From 9cf956cac4ba4147c914e5318eca3b5e12c5309f Mon Sep 17 00:00:00 2001 From: brutzman <brutzman@nps.edu> Date: Sun, 26 Dec 2021 17:59:07 -0800 Subject: [PATCH] continued debugging, why isn't senderThread killed? --- .../DisThreadedNetworkInterface.java | 392 +++++++++++------- 1 file changed, 251 insertions(+), 141 deletions(-) diff --git a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java index 90a59bee4e..d2c913b3b9 100644 --- a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java +++ b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java @@ -26,6 +26,30 @@ import java.util.logging.Logger; */ public class DisThreadedNetworkInterface { + + /** ********** Begin class ************** */ + /** + * MTU 8192: TODO this has actually been superseded by a larger buffer size, + * but good enough for now + */ + public static final int MAX_DIS_PDU_SIZE = 8192; + + /** + * MTU 1500: size of an Ethernet frame, common value to avoid packet + * segmentation + */ + public static final int MAX_TRANSMISSION_UNIT_SIZE = 1500; + + // independently available parameters for each DisThreadedNetworkInterface object + private String disAddress; + private int disPort; + private boolean killed = false; // thread loop sentinel + + private InetAddress inetAddress; + private InetSocketAddress inetSocket; + private NetworkInterface networkInterface; + private DatagramSocket datagramSocket; + /** Default multicast group address <code>239.1.2.3</code> for send and receive connections. * @see <a href="https://en.wikipedia.org/wiki/Multicast_address">https://en.wikipedia.org/wiki/Multicast_address</a> */ public static String DEFAULT_DIS_ADDRESS = "239.1.2.3"; @@ -41,6 +65,17 @@ public class DisThreadedNetworkInterface private boolean verboseSending = true; private boolean verboseIncludesTimestamp = false; + /* *********** queues and lists and public methods ************** */ + private final List<PduListener> everyTypeListeners = new ArrayList<>(); + private final Map<DisPduType, List<PduListener>> typeListeners = new HashMap<>(); + private final List<RawPduListener> rawListeners = new ArrayList<>(); + // https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/LinkedBlockingQueue.html + private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); // FIFO + + ByteArrayOutputStream baos; + DataOutputStream dos; + DatagramPacket packet; + /** * Pdu listener class and interface */ @@ -92,29 +127,6 @@ public class DisThreadedNetworkInterface } } - /** ********** Begin class ************** */ - /** - * MTU 8192: TODO this has actually been superseded by a larger buffer size, - * but good enough for now - */ - public static final int MAX_DIS_PDU_SIZE = 8192; - - /** - * MTU 1500: size of an Ethernet frame, common value to avoid packet - * segmentation - */ - public static final int MAX_TRANSMISSION_UNIT_SIZE = 1500; - - // independently available parameters for each DisThreadedNetworkInterface object - private String disAddress; - private int disPort; - private boolean killed = false; // thread loop sentinel - - private InetAddress inetAddress; - private InetSocketAddress inetSocket; - private NetworkInterface networkInterface; - private DatagramSocket datagramSocket; - /** * Object constructor using default multicast address and port, no descriptor */ @@ -166,40 +178,7 @@ public class DisThreadedNetworkInterface inetSocket = new InetSocketAddress(inetAddress, disPort); // tests that accessor methods are working as set networkInterface = findIpv4Interface(); - start(); - } - - /* *********** queues and lists and public methods ************** */ - private final List<PduListener> everyTypeListeners = new ArrayList<>(); - private final Map<DisPduType, List<PduListener>> typeListeners = new HashMap<>(); - private final List<RawPduListener> rawListeners = new ArrayList<>(); - // https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/LinkedBlockingQueue.html - private final LinkedBlockingQueue<Pdu> pdus2sendLinkedBlockingQueue = new LinkedBlockingQueue<>(); // FIFO - - /** - * Main method for testing. - * @see <a href="https://docs.oracle.com/javase/tutorial/getStarted/application/index.html">Java Tutorials: A Closer Look at the "Hello World!" Application</a> - * @param args [address, port, descriptor] command-line arguments are an array of optional String parameters that are passed from execution environment during invocation - */ - public static void main(String[] args) - { - System.out.println("*** DisThreadedNetworkInterface main() test started..."); - - DisThreadedNetworkInterface disThreadedNetworkInterface; - String selfTestDescriptor = "DisThreadedNetworkInterface main() self test"; - if (args.length == 3) - selfTestDescriptor = args[2]; - if ((args == null) || (args.length == 0)) - disThreadedNetworkInterface = new DisThreadedNetworkInterface( /* default address, port */ selfTestDescriptor); - else disThreadedNetworkInterface = new DisThreadedNetworkInterface(args[0], Integer.parseInt(args[1]), selfTestDescriptor); - - System.out.println("*** DisThreadedNetworkInterface main() self test created instance."); - - disThreadedNetworkInterface.close(); - - System.out.flush(); - System.err.flush(); - System.out.println("*** DisThreadedNetworkInterface main() self test complete."); + begin(); } /** @@ -258,21 +237,21 @@ public class DisThreadedNetworkInterface /** * Add a listener to accept pdus of all types in the form of a byte array * - * @param lis listener instance implementing the RawPduListener interface + * @param rawPduListener listener instance implementing the RawPduListener interface */ - public void addRawListener(RawPduListener lis) + public void addRawListener(RawPduListener rawPduListener) { - rawListeners.add(lis); + rawListeners.add(rawPduListener); } /** * Remove previously added raw listener * - * @param lis listener instance implementing the RawPduListener interface + * @param rawPduListener listener instance implementing the RawPduListener interface */ - public void removeRawListener(RawPduListener lis) + public void removeRawListener(RawPduListener rawPduListener) { - rawListeners.remove(lis); + rawListeners.remove(rawPduListener); // additional sleep, hopefully allowing teardown to proceed to completion sleep(100l); // TODO needed? } @@ -311,7 +290,7 @@ public class DisThreadedNetworkInterface */ public void send(Pdu pdu) { - pdus2sendLinkedBlockingQueue.add(pdu); + pdus2send.add(pdu); } /* *************** networking i/o ************* */ @@ -320,37 +299,42 @@ public class DisThreadedNetworkInterface private Thread senderThread; private Thread receiverThread; - /** Initialization of threads, or confirmation that they remain running */ - private synchronized void initializeSocketSenderReceiverThreads() + /** Initialization of threads, otherwise no action that while they remain running */ + private synchronized void createSenderReceiverThreads() { if (receiverThread == null) { - receiverThread = new Thread(receiverThreadRunnable, TRACE_PREFIX + "receiverThread"); + System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() starting receiverThread..."); + receiverThread = new Thread(receiverThreadRunnable, "receiverThread"); // https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java receiverThread.setDaemon(false); // user thread, not system thread receiverThread.setPriority(Thread.NORM_PRIORITY); receiverThread.start(); } + System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() receiverThread.isAlive()=" + receiverThread.isAlive()); if (senderThread == null) { - senderThread = new Thread(senderThreadRunnable, TRACE_PREFIX + "senderThread"); + System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() starting senderThread..."); + senderThread = new Thread(senderThreadRunnable, "senderThread"); // https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java senderThread.setDaemon(false); // user thread, not system thread senderThread.setPriority(Thread.NORM_PRIORITY); senderThread.start(); } + System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() senderThread.isAlive()=" + receiverThread.isAlive()); } /** * Can be used to restart DisThreadedNetworkInterface if closed. * Create datagram socket if not already available; can also be invoked by * either sender or receiver thread to ensure datagram socket is open. + * Method was originally named start(). */ - public void start() // TODO rename begin() + public void begin() { createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry - initializeSocketSenderReceiverThreads(); + createSenderReceiverThreads(); } /** * Create datagram socket if not already available; can also be invoked by @@ -358,48 +342,59 @@ public class DisThreadedNetworkInterface * Synchronized method to prevent interleaved reentry. * @see <a href="https://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html">Java Tutorials: Synchronized Methods</a> */ - @SuppressWarnings("SleepWhileHoldingLock") // intentional +// @SuppressWarnings("SleepWhileHoldingLock") // intentional private synchronized void createDatagramSocket() { - boolean closedSocket = false; + // The capacity could go up to MAX_DIS_PDU_SIZE, but this should be good for now + baos = new ByteArrayOutputStream(MAX_TRANSMISSION_UNIT_SIZE); + dos = new DataOutputStream(baos); + packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); + + String message = TRACE_PREFIX; + if ((datagramSocket != null)) { + if (datagramSocket.isConnected()) + { + System.out.println(message + "datagramSocket is already connected..."); + } if (datagramSocket.isClosed()) { - closedSocket = true; datagramSocket = null; System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was available but closed before createDatagramSocket(), resetting..."); } } - if ((datagramSocket == null) || closedSocket) + else // (datagramSocket == null) { try { + System.out.println(message + "datagramSocket is getting connected..."); // The initial value of the SO_BROADCAST socket option is FALSE datagramSocket = new MulticastSocket(getPort()); datagramSocket.setSoTimeout(1000); // msec timeout on reading and then continue looping in order to avoid blocking ((MulticastSocket) datagramSocket).joinGroup(inetSocket, networkInterface); - if (hasVerboseOutput()) - { - String message = TRACE_PREFIX; -// if (hasVerboseOutputIncludesTimestamp()) -// message += " (timestamp " + getTimestamp()); // TODO - message += "datagramSocket.joinGroup address=" + inetSocket.getHostString() + " port=" + inetSocket.getPort() + " start() complete"; - System.out.println(message); - System.out.flush(); - } Thread.sleep (100L); // allow threads, streams to catch up } - catch (InterruptedException ex) { - // continue + catch (InterruptedException ex) + { + System.err.println(" *** " + TRACE_PREFIX + "InterruptedException in DisThreadedNetworkInterface createDatagramSocket(): " + ex.getLocalizedMessage()); } catch (IOException ex) { - System.err.println(" *** " + TRACE_PREFIX + "Exception in DisThreadedNetworkInterface createDatagramSocket(): " + ex.getLocalizedMessage()); + System.err.println(" *** " + TRACE_PREFIX + "IOException in DisThreadedNetworkInterface createDatagramSocket(): " + ex.getLocalizedMessage()); } } + if (hasVerboseOutput()) + { +// if (hasVerboseOutputIncludesTimestamp()) +// message += " (timestamp " + getTimestamp()); // TODO + message += "datagramSocket.joinGroup address=" + inetSocket.getHostString() + " port=" + inetSocket.getPort() + + " isConnected()=" + datagramSocket.isConnected() + " createDatagramSocket() complete."; + System.out.println(message); + System.out.flush(); + } } @SuppressWarnings("SleepWhileInLoop") // intentional @@ -410,16 +405,16 @@ public class DisThreadedNetworkInterface // The capacity could go up to MAX_DIS_PDU_SIZE, but this should be good for now // The raw listeners will strip off any extra padding and process what is required ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_TRANSMISSION_UNIT_SIZE); - DatagramPacket packet = new DatagramPacket(byteBuffer.array(), byteBuffer.capacity()); + DatagramPacket receivedPacket = new DatagramPacket(byteBuffer.array(), byteBuffer.capacity()); while (!killed) // keep trying even if error occurred { try { - while (!killed) // loop until terminated - { - datagramSocket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port - toRawListeners(packet.getData(), packet.getLength()); +// while (!killed) // loop until terminated +// { + datagramSocket.receive(receivedPacket); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port + toRawListeners(receivedPacket.getData(), receivedPacket.getLength()); Pdu nextPdu = pduFactory.createPdu(byteBuffer); @@ -444,11 +439,8 @@ public class DisThreadedNetworkInterface toListeners(nextPdu); } byteBuffer.clear(); - } - Thread.sleep (100L); // loop successfully killed - } - catch (InterruptedException ex) { - // continue +// } +// Thread.sleep (100L); // loop successfully killed } catch (IOException ex) { @@ -464,23 +456,23 @@ public class DisThreadedNetworkInterface int pduSentCounter = 0; // The capacity could go up to MAX_DIS_PDU_SIZE, but this should be good for now - ByteArrayOutputStream baos = new ByteArrayOutputStream(MAX_TRANSMISSION_UNIT_SIZE); - DataOutputStream dos = new DataOutputStream(baos); - DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); + baos = new ByteArrayOutputStream(MAX_TRANSMISSION_UNIT_SIZE); + dos = new DataOutputStream(baos); + packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); while (!killed) // keep trying even if error occurred { try { - while (!killed) // loop until terminated - { - Pdu nextPdu = pdus2sendLinkedBlockingQueue.take(); +// while (!killed) // loop until terminated +// { + Pdu nextPdu = pdus2send.take(); nextPdu.marshal(dos); packet.setData(baos.toByteArray()); datagramSocket.send(packet); + pduSentCounter++; - pduSentCounter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode if (hasVerboseOutput() && hasVerboseSending()) { String pad = ""; @@ -489,6 +481,8 @@ public class DisThreadedNetworkInterface String message = TRACE_PREFIX + "[sending " + pad + pduSentCounter + "] " + nextPdu.getPduType().toString(); if (hasVerboseOutputIncludesTimestamp()) message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); + if (nextPdu.getPduType() == DisPduType.ENTITY_STATE) + message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString()); message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; System.err.flush(); System.out.println(message); @@ -496,14 +490,13 @@ public class DisThreadedNetworkInterface } dos.flush(); // immediately force pdu write baos.reset(); // prepare for next send - } - if (killed) // operations are finished - { - // check if looping complete, cleanup here to ensure thread exit is clean - baos.close(); - dos.close(); - System.out.println("*** DisThreadedNetworkInterface senderThreadRunnable killed=true, close ByteArrayOutputStream baos and DataOutputStream dos\n"); - } +// } +// if (killed) // operations are finished +// { +// // check if looping complete, cleanup here to ensure thread exit is clean +// System.out.println("*** DisThreadedNetworkInterface senderThreadRunnable killed=true"); +// } +// Thread.sleep (100L); // loop successfully killed } catch (Exception ex) { @@ -562,40 +555,46 @@ public class DisThreadedNetworkInterface { try { - pdus2sendLinkedBlockingQueue.clear(); // all stop if (!killed) { - setKillSentinel(); + setKillSentinel(); // killed = true; Thread.sleep(100l); // let senderThread and receiverThread stop } - try { // join and kill senderThread - if (senderThread != null) - { - senderThread.join(4000); // wait for thread to die, msec max duration - Thread.sleep(100l); - } - } - catch (InterruptedException ie) - { - System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die"); - System.err.flush(); - ie.printStackTrace(System.err); - } + pdus2send.clear(); // all stop + dos.flush(); // immediately force pdu write + baos.close(); + dos.close(); + System.out.println ("*** Thread close(): pdus2send.size()=" + pdus2send.size() + + " baos.size()=" + baos.size() + " dos.size()=" + dos.size()); + +// try { // join and kill senderThread +// if (senderThread != null) +// { +// senderThread.join(4000); // wait for thread to die, msec max duration +// Thread.sleep(100l); +// } +// } +// catch (InterruptedException ie) +// { +// System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die"); +// System.err.flush(); +// ie.printStackTrace(System.err); +// } // separate blocks since second join statement otherwise gets skipped if first thread timeout occurs - try { // join and kill receiverThread - if (receiverThread != null) - { - receiverThread.join(4000); // wait for thread to die, msec max duration - Thread.sleep(100l); - } - } - catch (InterruptedException ie) - { - System.err.println ("*** DisThreadedNetworkInterface receiverThread join() failed to wait for threads to die"); - System.err.flush(); - ie.printStackTrace(System.err); - } - // now close socket +// try { // join and kill receiverThread +// if (receiverThread != null) +// { +// receiverThread.join(4000); // wait for thread to die, msec max duration +// Thread.sleep(100l); +// } +// } +// catch (InterruptedException ie) +// { +// System.err.println ("*** DisThreadedNetworkInterface receiverThread join() failed to wait for threads to die"); +// System.err.flush(); +// ie.printStackTrace(System.err); +// } + // now close socket (after killing threads so that hopefully the socket doesn't lock them) if (datagramSocket != null && !datagramSocket.isClosed()) { try { @@ -611,16 +610,65 @@ public class DisThreadedNetworkInterface String message = TRACE_PREFIX; // if (hasVerboseOutputIncludesTimestamp()) // message += " (timestamp " + getTimestamp()); // TODO - message += "datagramSocket.leaveGroup address=" + getAddress() + " port=" + getPort() + " close() complete"; + message += "datagramSocket.leaveGroup address=" + getAddress() + " port=" + getPort() + + " isClosed()=" + datagramSocket.isClosed() + " close() complete."; System.out.println(message); System.out.flush(); } } + killThread ( senderThread); + killThread (receiverThread); + Thread.sleep(1000l); + reportThreadStatus(); + + + // try again!! to close sender thread, TODO what is blocking? +// try { // join and kill senderThread +// if (senderThread != null) +// { +// senderThread.join(6000); // wait for thread to die, msec max duration +// Thread.sleep(100l); +// } +// } +// catch (InterruptedException ie) +// { +// System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die"); +// System.err.flush(); +// ie.printStackTrace(System.err); +// } } catch (Exception e) { System.err.println ("*** DisThreadedNetworkInterface close()() unexpected exception!"); } + } + public void killThread(Thread threadToKill) + { + try { // join and kill threadToKill + if (threadToKill != null) + { + threadToKill.join(4000); // wait for thread to die, msec max duration + Thread.sleep(100l); + } + } + catch (InterruptedException ie) + { + System.err.println ("*** DisThreadedNetworkInterface threadToKill join() failed to wait for threadToKill to die"); + System.err.flush(); + ie.printStackTrace(System.err); + } + + String threadAlive = "null"; + if (threadToKill != null) + threadAlive = String.valueOf(threadToKill.isAlive()); + System.out.println ("*** killThread() status: " + threadToKill.getName() + ".isAlive()=" + threadAlive); + System.err.flush(); // ensure all output sent + System.out.flush(); // ensure all output sent + } + + /** Report whether senderThread and receiverThread are alive */ + public void reportThreadStatus() + { // report if successful String senderThreadAlive = "null"; String receiverThreadAlive = "null"; @@ -807,4 +855,66 @@ public class DisThreadedNetworkInterface this.descriptor = newDescriptor; TRACE_PREFIX = "[" + (DisThreadedNetworkInterface.class.getSimpleName() + " " + descriptor).trim() + "] "; } + + /** + * Main method for testing. + * @see <a href="https://docs.oracle.com/javase/tutorial/getStarted/application/index.html">Java Tutorials: A Closer Look at the "Hello World!" Application</a> + * @param args [address, port, descriptor] command-line arguments are an array of optional String parameters that are passed from execution environment during invocation + */ + public static void main(String[] args) + { + DisThreadedNetworkInterface disThreadedNetworkInterface; + + System.out.println("*** DisThreadedNetworkInterface main() test started..."); + + String selfTestDescriptor = "main() self test"; + if ((args != null) && args.length == 3) + selfTestDescriptor = args[2]; + if ((args == null) || (args.length == 0)) + disThreadedNetworkInterface = new DisThreadedNetworkInterface( /* default address, port */ selfTestDescriptor); + else disThreadedNetworkInterface = new DisThreadedNetworkInterface(args[0], Integer.parseInt(args[1]), selfTestDescriptor); + + System.out.println("*** DisThreadedNetworkInterface main() self test created instance."); + + try + { + EntityStatePdu espdu = new EntityStatePdu(); + espdu.setMarking("self test"); + disThreadedNetworkInterface.setVerbose(true); + DisThreadedNetworkInterface.PduListener pduListener; + pduListener = new DisThreadedNetworkInterface.PduListener() + { + /** Callback handler for listener */ + @Override + public void incomingPdu(Pdu newPdu) + { + System.out.println( "received newPdu " + newPdu.getPduType().toString()); + System.out.flush(); + } + }; + disThreadedNetworkInterface.addListener(pduListener); + + System.out.println("*** DisThreadedNetworkInterface main() self test sending espdu..."); + disThreadedNetworkInterface.send(espdu); + // briefly wait get response from threaded receiver + Thread.sleep(500L); + System.out.flush(); + System.err.flush(); + // all done, close() in finally block + } + catch(InterruptedException ex) + { + System.out.println("*** DisThreadedNetworkInterface main() self test InterruptedException " + ex.getMessage()); + } + finally // clean up following regular completion and any interruptions + + { + if (disThreadedNetworkInterface != null) + disThreadedNetworkInterface.close(); + + System.out.flush(); + System.err.flush(); + System.out.println("*** DisThreadedNetworkInterface main() self test complete."); + } + } } -- GitLab