diff --git a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java index d2c913b3b9a78435e136786735f8ed75aef9fd10..744d4cacb1a2d0d0e34127086e5a35c0db89a491 100644 --- a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java +++ b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java @@ -296,33 +296,33 @@ public class DisThreadedNetworkInterface /* *************** networking i/o ************* */ private PduFactory pduFactory = new PduFactory(); - private Thread senderThread; - private Thread receiverThread; + private Thread sendingThread; + private Thread receiveThread; /** Initialization of threads, otherwise no action that while they remain running */ - private synchronized void createSenderReceiverThreads() + private synchronized void createThreads() { - if (receiverThread == null) + if (receiveThread == null) { - System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() starting receiverThread..."); - receiverThread = new Thread(receiverThreadRunnable, "receiverThread"); +// System.out.println("DisThreadedNetworkInterface createThreads() start receiveThread..."); + receiveThread = new Thread(receiveThreadRunnable, "receiveThread"); // https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java - receiverThread.setDaemon(false); // user thread, not system thread - receiverThread.setPriority(Thread.NORM_PRIORITY); - receiverThread.start(); + receiveThread.setDaemon(false); // user thread, not system thread + receiveThread.setPriority(Thread.NORM_PRIORITY); + receiveThread.start(); } - System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() receiverThread.isAlive()=" + receiverThread.isAlive()); + System.out.println("DisThreadedNetworkInterface createThreads() receiveThread.isAlive()=" + receiveThread.isAlive()); - if (senderThread == null) + if (sendingThread == null) { - System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() starting senderThread..."); - senderThread = new Thread(senderThreadRunnable, "senderThread"); +// System.out.println("DisThreadedNetworkInterface createThreads() start sendingThread..."); + sendingThread = new Thread(sendingThreadRunnable, "sendingThread"); // https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java - senderThread.setDaemon(false); // user thread, not system thread - senderThread.setPriority(Thread.NORM_PRIORITY); - senderThread.start(); + sendingThread.setDaemon(false); // user thread, not system thread + sendingThread.setPriority(Thread.NORM_PRIORITY); + sendingThread.start(); } - System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() senderThread.isAlive()=" + receiverThread.isAlive()); + System.out.println("DisThreadedNetworkInterface createThreads() sendingThread.isAlive()=" + receiveThread.isAlive()); } /** * Can be used to restart DisThreadedNetworkInterface if closed. @@ -334,7 +334,7 @@ public class DisThreadedNetworkInterface { createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry - createSenderReceiverThreads(); + createThreads(); } /** * Create datagram socket if not already available; can also be invoked by @@ -358,17 +358,17 @@ public class DisThreadedNetworkInterface { System.out.println(message + "datagramSocket is already connected..."); } - if (datagramSocket.isClosed()) + else if (datagramSocket.isClosed()) { datagramSocket = null; - System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was available but closed before createDatagramSocket(), resetting..."); + System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was previously used but closed, resetting..."); } } - else // (datagramSocket == null) + if (datagramSocket == null) // may have just been reset { try { - System.out.println(message + "datagramSocket is getting connected..."); +// System.out.println(message + "datagramSocket is getting connected..."); // The initial value of the SO_BROADCAST socket option is FALSE datagramSocket = new MulticastSocket(getPort()); datagramSocket.setSoTimeout(1000); // msec timeout on reading and then continue looping in order to avoid blocking @@ -398,7 +398,7 @@ public class DisThreadedNetworkInterface } @SuppressWarnings("SleepWhileInLoop") // intentional - private Runnable receiverThreadRunnable = () -> { + private Runnable receiveThreadRunnable = () -> { int pduReceiptCounter = 0; @@ -407,51 +407,47 @@ public class DisThreadedNetworkInterface ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_TRANSMISSION_UNIT_SIZE); DatagramPacket receivedPacket = new DatagramPacket(byteBuffer.array(), byteBuffer.capacity()); - while (!killed) // keep trying even if error occurred + while (!killed && !Thread.currentThread().isInterrupted()) // loop until terminated { try { -// while (!killed) // loop until terminated -// { - datagramSocket.receive(receivedPacket); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port - toRawListeners(receivedPacket.getData(), receivedPacket.getLength()); + datagramSocket.receive(receivedPacket); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port + toRawListeners(receivedPacket.getData(), receivedPacket.getLength()); - Pdu nextPdu = pduFactory.createPdu(byteBuffer); + Pdu nextPdu = pduFactory.createPdu(byteBuffer); - if (nextPdu != null) + if (nextPdu != null) + { + pduReceiptCounter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode + if (hasVerboseOutput() && hasVerboseReceipt()) { - pduReceiptCounter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode - if (hasVerboseOutput() && hasVerboseReceipt()) - { - String pad = ""; - if (pduReceiptCounter < 10) - pad = " "; - String message = TRACE_PREFIX + "[receipt " + pad + pduReceiptCounter + "] " + nextPdu.getPduType().toString(); - if (hasVerboseOutputIncludesTimestamp()) - message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); - if (nextPdu.getPduType() == DisPduType.ENTITY_STATE) - message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString()); - message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; - System.err.flush(); - System.out.println(message); - System.out.flush(); - } - toListeners(nextPdu); + String pad = ""; + if (pduReceiptCounter < 10) + pad = " "; + String message = TRACE_PREFIX + "[receipt " + pad + pduReceiptCounter + "] " + nextPdu.getPduType().toString(); + if (hasVerboseOutputIncludesTimestamp()) + message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); + if (nextPdu.getPduType() == DisPduType.ENTITY_STATE) + message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString()); + message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; + System.err.flush(); + System.out.println(message); + System.out.flush(); } - byteBuffer.clear(); -// } -// Thread.sleep (100L); // loop successfully killed - } + toListeners(nextPdu); + } + byteBuffer.clear(); + } catch (IOException ex) { // read commonly times out while waiting for PDUs to arrive, and so this trace statement is for debugging -// System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiverThread: " + ex.getLocalizedMessage()); +// System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiveThread: " + ex.getLocalizedMessage()); } } // returning kills thread, do not put any other steps here }; - private final Runnable senderThreadRunnable = () -> { + private final Runnable sendingThreadRunnable = () -> { int pduSentCounter = 0; @@ -460,47 +456,43 @@ public class DisThreadedNetworkInterface dos = new DataOutputStream(baos); packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); - while (!killed) // keep trying even if error occurred + while (!killed && !Thread.currentThread().isInterrupted()) // loop until terminated { try { -// while (!killed) // loop until terminated -// { - Pdu nextPdu = pdus2send.take(); - - nextPdu.marshal(dos); - packet.setData(baos.toByteArray()); - datagramSocket.send(packet); - pduSentCounter++; - - if (hasVerboseOutput() && hasVerboseSending()) - { - String pad = ""; - if (pduSentCounter < 10) - pad = " "; - String message = TRACE_PREFIX + "[sending " + pad + pduSentCounter + "] " + nextPdu.getPduType().toString(); - if (hasVerboseOutputIncludesTimestamp()) - message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); - if (nextPdu.getPduType() == DisPduType.ENTITY_STATE) - message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString()); - message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; - System.err.flush(); - System.out.println(message); - System.out.flush(); - } - dos.flush(); // immediately force pdu write - baos.reset(); // prepare for next send -// } -// if (killed) // operations are finished -// { -// // check if looping complete, cleanup here to ensure thread exit is clean -// System.out.println("*** DisThreadedNetworkInterface senderThreadRunnable killed=true"); -// } -// Thread.sleep (100L); // loop successfully killed + Pdu nextPdu = pdus2send.take(); + + nextPdu.marshal(dos); + packet.setData(baos.toByteArray()); + datagramSocket.send(packet); + pduSentCounter++; + + if (hasVerboseOutput() && hasVerboseSending()) + { + String pad = ""; + if (pduSentCounter < 10) + pad = " "; + String message = TRACE_PREFIX + "[sending " + pad + pduSentCounter + "] " + nextPdu.getPduType().toString(); + if (hasVerboseOutputIncludesTimestamp()) + message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); + if (nextPdu.getPduType() == DisPduType.ENTITY_STATE) + message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString()); + message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; + System.err.flush(); + System.out.println(message); + System.out.flush(); + } + dos.flush(); // immediately force pdu write + baos.reset(); // prepare for next send + } + catch (InterruptedException ex) + { + // no response, expected termination } catch (Exception ex) { - System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface senderThread: " + ex.getLocalizedMessage()); + System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface sendingThread: " + ex.getLocalizedMessage()); + ex.printStackTrace(); } } // returning kills thread, do not put any other steps here @@ -540,13 +532,20 @@ public class DisThreadedNetworkInterface @Deprecated public void kill() { - setKillSentinel(); + setKillSentinelAndInterrupts(); } - /** Finish pending send/receive activity and then close. */ - public void setKillSentinel() + /** Tell sendingThread and receiveThread to stop. */ + public void setKillSentinelAndInterrupts() // TODO rename AndInterrupts { killed = true; // set loop sentinel for threads to finish + + // https://stackoverflow.com/questions/26647840/how-do-i-interrupt-kill-a-hung-thread-in-java + sendingThread.interrupt(); + receiveThread.interrupt(); + System.out.println ("*** setKillSentinelAndInterrupts() killed=" + killed + + " sendingThread.isInterrupted()=" + sendingThread.isInterrupted() + + " receiveThread.isInterrupted()=" + receiveThread.isInterrupted()); } /** Terminate the instance after completion of pending send/receive activity. @@ -557,43 +556,17 @@ public class DisThreadedNetworkInterface { if (!killed) { - setKillSentinel(); // killed = true; - Thread.sleep(100l); // let senderThread and receiverThread stop + setKillSentinelAndInterrupts(); // killed = true; + Thread.sleep(100l); // let sendingThread and receiveThread stop } pdus2send.clear(); // all stop - dos.flush(); // immediately force pdu write + dos.flush(); // immediately force pdu write, if any remain baos.close(); dos.close(); - System.out.println ("*** Thread close(): pdus2send.size()=" + pdus2send.size() + - " baos.size()=" + baos.size() + " dos.size()=" + dos.size()); + System.out.println ("*** DisThreadedNetworkInterface close():" + + " pdus2send.size()=" + pdus2send.size() + + " baos.size()=" + baos.size() + " dos.size()=" + dos.size()); -// try { // join and kill senderThread -// if (senderThread != null) -// { -// senderThread.join(4000); // wait for thread to die, msec max duration -// Thread.sleep(100l); -// } -// } -// catch (InterruptedException ie) -// { -// System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die"); -// System.err.flush(); -// ie.printStackTrace(System.err); -// } - // separate blocks since second join statement otherwise gets skipped if first thread timeout occurs -// try { // join and kill receiverThread -// if (receiverThread != null) -// { -// receiverThread.join(4000); // wait for thread to die, msec max duration -// Thread.sleep(100l); -// } -// } -// catch (InterruptedException ie) -// { -// System.err.println ("*** DisThreadedNetworkInterface receiverThread join() failed to wait for threads to die"); -// System.err.flush(); -// ie.printStackTrace(System.err); -// } // now close socket (after killing threads so that hopefully the socket doesn't lock them) if (datagramSocket != null && !datagramSocket.isClosed()) { @@ -616,26 +589,10 @@ public class DisThreadedNetworkInterface System.out.flush(); } } - killThread ( senderThread); - killThread (receiverThread); - Thread.sleep(1000l); + killThread (sendingThread); // making sure + killThread (receiveThread); // making sure + Thread.sleep(500l); reportThreadStatus(); - - - // try again!! to close sender thread, TODO what is blocking? -// try { // join and kill senderThread -// if (senderThread != null) -// { -// senderThread.join(6000); // wait for thread to die, msec max duration -// Thread.sleep(100l); -// } -// } -// catch (InterruptedException ie) -// { -// System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die"); -// System.err.flush(); -// ie.printStackTrace(System.err); -// } } catch (Exception e) { @@ -647,6 +604,7 @@ public class DisThreadedNetworkInterface try { // join and kill threadToKill if (threadToKill != null) { + threadToKill.interrupt(); threadToKill.join(4000); // wait for thread to die, msec max duration Thread.sleep(100l); } @@ -661,23 +619,24 @@ public class DisThreadedNetworkInterface String threadAlive = "null"; if (threadToKill != null) threadAlive = String.valueOf(threadToKill.isAlive()); - System.out.println ("*** killThread() status: " + threadToKill.getName() + ".isAlive()=" + threadAlive); + System.out.println ("*** killThread() status: " + threadToKill.getName() + ".isAlive()=" + threadAlive + + " " + threadToKill.getName() + ".isInterrupted()=" + threadToKill.isInterrupted()); System.err.flush(); // ensure all output sent System.out.flush(); // ensure all output sent } - /** Report whether senderThread and receiverThread are alive */ + /** Report whether sendingThread and receiveThread are alive */ public void reportThreadStatus() { // report if successful - String senderThreadAlive = "null"; - String receiverThreadAlive = "null"; - if (senderThread != null) - senderThreadAlive = String.valueOf( senderThread.isAlive()); - if (receiverThread != null) - receiverThreadAlive = String.valueOf(receiverThread.isAlive()); - System.out.println ("*** Thread closing status: senderThread.isAlive()=" + senderThreadAlive + - " receiverThread.isAlive()=" + receiverThreadAlive); + String sendingThreadAlive = "null"; + String receiveThreadAlive = "null"; + if (sendingThread != null) + sendingThreadAlive = String.valueOf( sendingThread.isAlive()); + if (receiveThread != null) + receiveThreadAlive = String.valueOf(receiveThread.isAlive()); + System.out.println ("*** Thread close status: sendingThread.isAlive()=" + sendingThreadAlive + + " receiveThread.isAlive()=" + receiveThreadAlive); System.err.flush(); // ensure all output sent System.out.flush(); // ensure all output sent } @@ -865,7 +824,7 @@ public class DisThreadedNetworkInterface { DisThreadedNetworkInterface disThreadedNetworkInterface; - System.out.println("*** DisThreadedNetworkInterface main() test started..."); + System.out.println("*** DisThreadedNetworkInterface main() self test started..."); String selfTestDescriptor = "main() self test"; if ((args != null) && args.length == 3) @@ -874,7 +833,7 @@ public class DisThreadedNetworkInterface disThreadedNetworkInterface = new DisThreadedNetworkInterface( /* default address, port */ selfTestDescriptor); else disThreadedNetworkInterface = new DisThreadedNetworkInterface(args[0], Integer.parseInt(args[1]), selfTestDescriptor); - System.out.println("*** DisThreadedNetworkInterface main() self test created instance."); + System.out.println("*** DisThreadedNetworkInterface main() self test initialized..."); try {