diff --git a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java index ae8e09e5260cfd4bd384646d593b40cee25ba890..90a59bee4e0c90c517ba68fc0d60fec62a50a000 100644 --- a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java +++ b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java @@ -6,6 +6,7 @@ package edu.nps.moves.dis7.utilities; import edu.nps.moves.dis7.enumerations.DisPduType; import edu.nps.moves.dis7.pdus.DisTime; +import edu.nps.moves.dis7.pdus.EntityStatePdu; import edu.nps.moves.dis7.pdus.Pdu; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -107,8 +108,7 @@ public class DisThreadedNetworkInterface // independently available parameters for each DisThreadedNetworkInterface object private String disAddress; private int disPort; - private boolean killed = false; - private int killCounter = 0; + private boolean killed = false; // thread loop sentinel private InetAddress inetAddress; private InetSocketAddress inetSocket; @@ -116,13 +116,20 @@ public class DisThreadedNetworkInterface private DatagramSocket datagramSocket; /** - * Object constructor using default multicast address and port + * Object constructor using default multicast address and port, no descriptor */ public DisThreadedNetworkInterface() { - this(DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT); + this(DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT, ""); + } + /** + * Object constructor with descriptor, using default multicast address and port + * @param newDescriptor simple descriptor name for this interface + */ + public DisThreadedNetworkInterface(String newDescriptor) + { + this(DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT, newDescriptor); } - /** * Object constructor using specified multicast address and port * @param address the multicast group or unicast address to utilize @@ -130,6 +137,22 @@ public class DisThreadedNetworkInterface */ public DisThreadedNetworkInterface(String address, int port) { + + this(address, port, ""); + } + /** + * Object constructor using specified multicast address and port, plus descriptor. + * @param address the multicast group or unicast address to utilize + * @param port the multicast port to utilize + * @param newDescriptor simple descriptor name for this interface + */ + public DisThreadedNetworkInterface(String address, int port, String newDescriptor) + { + if (newDescriptor == null) + descriptor = ""; + else descriptor = newDescriptor; + TRACE_PREFIX = "[" + (DisThreadedNetworkInterface.class.getSimpleName() + " " + descriptor).trim() + "] "; + disAddress = address; disPort = port; try @@ -142,27 +165,41 @@ public class DisThreadedNetworkInterface } inetSocket = new InetSocketAddress(inetAddress, disPort); // tests that accessor methods are working as set networkInterface = findIpv4Interface(); - initializeSocketSenderReceiverThreads(); + + start(); } /* *********** queues and lists and public methods ************** */ private final List<PduListener> everyTypeListeners = new ArrayList<>(); private final Map<DisPduType, List<PduListener>> typeListeners = new HashMap<>(); private final List<RawPduListener> rawListeners = new ArrayList<>(); - private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); + // https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/LinkedBlockingQueue.html + private final LinkedBlockingQueue<Pdu> pdus2sendLinkedBlockingQueue = new LinkedBlockingQueue<>(); // FIFO + /** * Main method for testing. * @see <a href="https://docs.oracle.com/javase/tutorial/getStarted/application/index.html">Java Tutorials: A Closer Look at the "Hello World!" Application</a> - * @param args command-line arguments are an array of optional String parameters that are passed from execution environment during invocation + * @param args [address, port, descriptor] command-line arguments are an array of optional String parameters that are passed from execution environment during invocation */ public static void main(String[] args) { System.out.println("*** DisThreadedNetworkInterface main() test started..."); - DisThreadedNetworkInterface disThreadedNetworkInterface = new DisThreadedNetworkInterface(); + DisThreadedNetworkInterface disThreadedNetworkInterface; + String selfTestDescriptor = "DisThreadedNetworkInterface main() self test"; + if (args.length == 3) + selfTestDescriptor = args[2]; + if ((args == null) || (args.length == 0)) + disThreadedNetworkInterface = new DisThreadedNetworkInterface( /* default address, port */ selfTestDescriptor); + else disThreadedNetworkInterface = new DisThreadedNetworkInterface(args[0], Integer.parseInt(args[1]), selfTestDescriptor); + + System.out.println("*** DisThreadedNetworkInterface main() self test created instance."); + disThreadedNetworkInterface.close(); - System.out.println("*** DisThreadedNetworkInterface main() test complete."); + System.out.flush(); + System.err.flush(); + System.out.println("*** DisThreadedNetworkInterface main() self test complete."); } /** @@ -274,7 +311,7 @@ public class DisThreadedNetworkInterface */ public void send(Pdu pdu) { - pdus2send.add(pdu); + pdus2sendLinkedBlockingQueue.add(pdu); } /* *************** networking i/o ************* */ @@ -286,12 +323,11 @@ public class DisThreadedNetworkInterface /** Initialization of threads, or confirmation that they remain running */ private synchronized void initializeSocketSenderReceiverThreads() { - createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry - if (receiverThread == null) { receiverThread = new Thread(receiverThreadRunnable, TRACE_PREFIX + "receiverThread"); - receiverThread.setDaemon(true); + // https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java + receiverThread.setDaemon(false); // user thread, not system thread receiverThread.setPriority(Thread.NORM_PRIORITY); receiverThread.start(); } @@ -299,7 +335,8 @@ public class DisThreadedNetworkInterface if (senderThread == null) { senderThread = new Thread(senderThreadRunnable, TRACE_PREFIX + "senderThread"); - senderThread.setDaemon(true); + // https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java + senderThread.setDaemon(false); // user thread, not system thread senderThread.setPriority(Thread.NORM_PRIORITY); senderThread.start(); } @@ -309,9 +346,11 @@ public class DisThreadedNetworkInterface * Create datagram socket if not already available; can also be invoked by * either sender or receiver thread to ensure datagram socket is open. */ - public void start () + public void start() // TODO rename begin() { - createDatagramSocket(); // usually automatic, might be needed if previous connection was closed + createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry + + initializeSocketSenderReceiverThreads(); } /** * Create datagram socket if not already available; can also be invoked by @@ -331,11 +370,6 @@ public class DisThreadedNetworkInterface datagramSocket = null; System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was available but closed before createDatagramSocket(), resetting..."); } - else - { -// System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was not closed before createDatagramSocket(), ignoring..."); // debug - return; - } } if ((datagramSocket == null) || closedSocket) { @@ -343,6 +377,8 @@ public class DisThreadedNetworkInterface { // The initial value of the SO_BROADCAST socket option is FALSE datagramSocket = new MulticastSocket(getPort()); + datagramSocket.setSoTimeout(1000); // msec timeout on reading and then continue looping in order to avoid blocking + ((MulticastSocket) datagramSocket).joinGroup(inetSocket, networkInterface); if (hasVerboseOutput()) @@ -354,7 +390,6 @@ public class DisThreadedNetworkInterface System.out.println(message); System.out.flush(); } - initializeSocketSenderReceiverThreads(); Thread.sleep (100L); // allow threads, streams to catch up } catch (InterruptedException ex) { @@ -377,13 +412,10 @@ public class DisThreadedNetworkInterface ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_TRANSMISSION_UNIT_SIZE); DatagramPacket packet = new DatagramPacket(byteBuffer.array(), byteBuffer.capacity()); - while (!killed) // keep trying even if error occured + while (!killed) // keep trying even if error occurred { - // If something trips up with the socket, this thread will attempt to - // re-establish socket for both send/receive threads - try { - createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first - + try + { while (!killed) // loop until terminated { datagramSocket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port @@ -402,36 +434,29 @@ public class DisThreadedNetworkInterface String message = TRACE_PREFIX + "[receipt " + pad + pduReceiptCounter + "] " + nextPdu.getPduType().toString(); if (hasVerboseOutputIncludesTimestamp()) message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); + if (nextPdu.getPduType() == DisPduType.ENTITY_STATE) + message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString()); message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; + System.err.flush(); System.out.println(message); System.out.flush(); - System.err.flush(); } toListeners(nextPdu); } byteBuffer.clear(); } - Thread.sleep (100L); + Thread.sleep (100L); // loop successfully killed } catch (InterruptedException ex) { // continue } - catch (IOException ex) { - if (!killed) - { - System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiverThread: " + ex.getLocalizedMessage()); - System.err.println(TRACE_PREFIX + "Retrying new socket..."); - } - } - finally + catch (IOException ex) { - close(); - } + // read commonly times out while waiting for PDUs to arrive, and so this trace statement is for debugging +// System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiverThread: " + ex.getLocalizedMessage()); + } } - if (killed) - close(); // retry now that threads are killed - System.err.flush(); - System.out.flush(); + // returning kills thread, do not put any other steps here }; private final Runnable senderThreadRunnable = () -> { @@ -443,16 +468,13 @@ public class DisThreadedNetworkInterface DataOutputStream dos = new DataOutputStream(baos); DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); - while (!killed) // keep trying even if error occured + while (!killed) // keep trying even if error occurred { - // If something trips up with the socket, this thread will attempt to - // re-establish socket for both send/receive threads - try { - createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first - + try + { while (!killed) // loop until terminated { - Pdu nextPdu = pdus2send.take(); + Pdu nextPdu = pdus2sendLinkedBlockingQueue.take(); nextPdu.marshal(dos); packet.setData(baos.toByteArray()); @@ -468,34 +490,27 @@ public class DisThreadedNetworkInterface if (hasVerboseOutputIncludesTimestamp()) message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; + System.err.flush(); System.out.println(message); System.out.flush(); - System.err.flush(); } dos.flush(); // immediately force pdu write baos.reset(); // prepare for next send } - } - catch (Exception ex) - { - if (!killed) + if (killed) // operations are finished { - System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface senderThread: " + ex.getLocalizedMessage()); + // check if looping complete, cleanup here to ensure thread exit is clean + baos.close(); + dos.close(); + System.out.println("*** DisThreadedNetworkInterface senderThreadRunnable killed=true, close ByteArrayOutputStream baos and DataOutputStream dos\n"); } } + catch (Exception ex) + { + System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface senderThread: " + ex.getLocalizedMessage()); + } } - try { - // operations are finished - pdus2send.clear(); - dos.close(); - this.close(); - - } - catch (IOException e) {} // shutting down, no need to report exception - if (killed) - close(); // retry now that threads are killed - System.err.flush(); - System.out.flush(); + // returning kills thread, do not put any other steps here }; private void toListeners(Pdu pdu) @@ -518,81 +533,106 @@ public class DisThreadedNetworkInterface } } - private void toRawListeners(byte[] data, int len) - { - if(rawListeners.isEmpty()) - return; - - ByteArrayBufferAndLength bl = new ByteArrayBufferAndLength(data, len); - rawListeners.forEach(lis->lis.incomingPdu(bl)); - } - - /** Method renamed as <code>close()</code> so use that method instead. - */ - @Deprecated - public void kill() - { - setKillSentinel(); - } + private void toRawListeners(byte[] data, int len) + { + if(rawListeners.isEmpty()) + return; - /** Finish pending send/receive activity and then close. */ - public void setKillSentinel() - { - killed = true; // set loop sentinel for threads to finish - } + ByteArrayBufferAndLength bl = new ByteArrayBufferAndLength(data, len); + rawListeners.forEach(lis->lis.incomingPdu(bl)); + } - /** Terminate the instance after completion of pending send/receive activity. - * Synchronized to prevent interleaved invocation. */ - public synchronized void close() - { - if (!killed || ((killCounter > 0) && (killCounter < 2))) - { - killCounter++; // repeat for all threads - setKillSentinel(); - return; // allow network operations to finish, then killed threads are expected to reenter here - } - - try + /** Method renamed as <code>close()</code> so use that method instead. + */ + @Deprecated + public void kill() { - if (senderThread != null) - senderThread.join(4000); // wait for thread to die, msec max duration - if (receiverThread != null) - receiverThread.join(4000); // wait for thread to die, msec max duration - senderThread = null; // make sure - receiverThread = null; // make sure + setKillSentinel(); } - catch (InterruptedException ie) + + /** Finish pending send/receive activity and then close. */ + public void setKillSentinel() { - System.err.println ("*** DisThreadedNetworkInterface thread join() failed to wait for threads to die"); - System.err.flush(); - ie.printStackTrace(System.err); + killed = true; // set loop sentinel for threads to finish } - - if (datagramSocket != null && !datagramSocket.isClosed()) + + /** Terminate the instance after completion of pending send/receive activity. + * Synchronized to prevent interleaved invocation. */ + public synchronized void close() { - try { - ((MulticastSocket)datagramSocket).leaveGroup(inetSocket, networkInterface); - } catch (IOException ex) { - Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex); - } - datagramSocket.close(); - sleep(100l); // TODO needed? + try + { + pdus2sendLinkedBlockingQueue.clear(); // all stop + if (!killed) + { + setKillSentinel(); + Thread.sleep(100l); // let senderThread and receiverThread stop + } + try { // join and kill senderThread + if (senderThread != null) + { + senderThread.join(4000); // wait for thread to die, msec max duration + Thread.sleep(100l); + } + } + catch (InterruptedException ie) + { + System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die"); + System.err.flush(); + ie.printStackTrace(System.err); + } + // separate blocks since second join statement otherwise gets skipped if first thread timeout occurs + try { // join and kill receiverThread + if (receiverThread != null) + { + receiverThread.join(4000); // wait for thread to die, msec max duration + Thread.sleep(100l); + } + } + catch (InterruptedException ie) + { + System.err.println ("*** DisThreadedNetworkInterface receiverThread join() failed to wait for threads to die"); + System.err.flush(); + ie.printStackTrace(System.err); + } + // now close socket + if (datagramSocket != null && !datagramSocket.isClosed()) + { + try { + ((MulticastSocket)datagramSocket).leaveGroup(inetSocket, networkInterface); + } catch (IOException ex) { + Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex); + } + datagramSocket.close(); + sleep(100l); // TODO needed? - if (hasVerboseOutput()) + if (hasVerboseOutput()) + { + String message = TRACE_PREFIX; + // if (hasVerboseOutputIncludesTimestamp()) + // message += " (timestamp " + getTimestamp()); // TODO + message += "datagramSocket.leaveGroup address=" + getAddress() + " port=" + getPort() + " close() complete"; + System.out.println(message); + System.out.flush(); + } + } + } + catch (Exception e) { - String message = TRACE_PREFIX; -// if (hasVerboseOutputIncludesTimestamp()) -// message += " (timestamp " + getTimestamp()); // TODO - message += "datagramSocket.leaveGroup address=" + getAddress() + " port=" + getPort() + " stop() complete"; - System.out.println(message); - System.out.flush(); + System.err.println ("*** DisThreadedNetworkInterface close()() unexpected exception!"); } + // report if successful + String senderThreadAlive = "null"; + String receiverThreadAlive = "null"; + if (senderThread != null) + senderThreadAlive = String.valueOf( senderThread.isAlive()); + if (receiverThread != null) + receiverThreadAlive = String.valueOf(receiverThread.isAlive()); + System.out.println ("*** Thread closing status: senderThread.isAlive()=" + senderThreadAlive + + " receiverThread.isAlive()=" + receiverThreadAlive); + System.err.flush(); // ensure all output sent + System.out.flush(); // ensure all output sent } - datagramSocket = null; // make sure - killCounter = 0; // prepare for next run - System.err.flush(); // ensure all output sent - System.out.flush(); // ensure all output sent - } /** Thread sleep for indicated interval * @param duration milliseconds */ @@ -760,7 +800,7 @@ public class DisThreadedNetworkInterface /** * Set new simple descriptor (such as parent class name) for this network interface, used in trace statements - * @param newDescriptor simple descriptor name + * @param newDescriptor simple descriptor name for this interface */ public void setDescriptor(String newDescriptor) {