From 7956e9e6356a665ca8aa9d7e28993f6a1c91b36b Mon Sep 17 00:00:00 2001 From: brutzman <brutzman@nps.edu> Date: Sat, 28 Aug 2021 22:15:19 -0700 Subject: [PATCH] improved logging and network threading, release version update --- .../DisThreadedNetworkInterface.java | 311 +++++++++++------- 1 file changed, 199 insertions(+), 112 deletions(-) diff --git a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java index 3d3f298b99..865be79492 100644 --- a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java +++ b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java @@ -29,18 +29,19 @@ import java.util.logging.Logger; */ public class DisThreadedNetworkInterface { - /** Default multicast group address <code>229.1.2.3</code> for send and receive connections. + /** Default multicast group address <code>239.1.2.3</code> for send and receive connections. * @see <a href="https://en.wikipedia.org/wiki/Multicast_address">https://en.wikipedia.org/wiki/Multicast_address</a> */ - public static String DEFAULT_MULTICAST_ADDRESS = "229.1.2.3"; + public static String DEFAULT_DIS_ADDRESS = "239.1.2.3"; /** Default socket port <code>3000</code>, matches Wireshark DIS capture default * @see <a href="https://en.wikipedia.org/wiki/Port_(computer_networking)">https://en.wikipedia.org/wiki/Port_(computer_networking)</a> */ public static int DEFAULT_DIS_PORT = 3000; - private static final String TRACE_PREFIX = "[" + DisThreadedNetworkInterface.class.getName() + "] "; - private boolean verbose = true; - private boolean verboseReceipt = true; - private boolean verboseSending = true; + private String descriptor = new String(); + private String TRACE_PREFIX = "[" + (this.getClass().getSimpleName() + " " + getDescriptor()).trim() + "] "; + private boolean verbose = true; + private boolean verboseReceipt = true; + private boolean verboseSending = true; private boolean verboseIncludesTimestamp = false; /** @@ -107,43 +108,44 @@ public class DisThreadedNetworkInterface */ public static final int MAX_TRANSMISSION_UNIT_SIZE = 1500; - private int disPort; - private String multicastAddress; + // independently available parameters for each DisThreadedNetworkInterface object + private String disAddress; + private int disPort; private boolean killed = false; - private InetAddress inetAddress; + private InetAddress inetAddress; private InetSocketAddress inetSocket; - private NetworkInterface networkInterface; - private DatagramSocket datagramSocket = null; + private NetworkInterface networkInterface; + private DatagramSocket datagramSocket; /** * Object constructor using default multicast address and port */ public DisThreadedNetworkInterface() { - this(DEFAULT_MULTICAST_ADDRESS, DEFAULT_DIS_PORT); + this(DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT); } /** * Object constructor using specified multicast address and port - * @param multicastGroup the multicast group address to utilize + * @param address the multicast group or unicast address to utilize * @param port the multicast port to utilize */ - public DisThreadedNetworkInterface(String multicastGroup, int port) + public DisThreadedNetworkInterface(String address, int port) { - disPort = port; - multicastAddress = multicastGroup; + disAddress = address; + disPort = port; try { - inetAddress = InetAddress.getByName(multicastAddress); + inetAddress = InetAddress.getByName(disAddress); } catch (UnknownHostException ex) { Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex); } - inetSocket = new InetSocketAddress(inetAddress, disPort); + inetSocket = new InetSocketAddress(disAddress, disPort); // tests that accessor methods are working as set networkInterface = findIpv4Interface(); - init(); + initializeSocketSenderReceiverThreads(); } /* *********** queues and lists and public methods ************** */ @@ -151,6 +153,20 @@ public class DisThreadedNetworkInterface private final Map<DisPduType, List<PduListener>> typeListeners = new HashMap<>(); private final List<RawPduListener> rawListeners = new ArrayList<>(); private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); + /** + * Main method for testing. + * @see <a href="https://docs.oracle.com/javase/tutorial/getStarted/application/index.html">Java Tutorials: A Closer Look at the "Hello World!" Application</a> + * @param args command-line arguments are an array of optional String parameters that are passed from execution environment during invocation + */ + public static void main(String[] args) + { + System.out.println("*** DisThreadedNetworkInterface main() test started..."); + + DisThreadedNetworkInterface disThreadedNetworkInterface = new DisThreadedNetworkInterface(); + disThreadedNetworkInterface.close(); + + System.out.println("*** DisThreadedNetworkInterface main() test complete."); + } /** * Add a listener to accept only pdus of a given type @@ -227,23 +243,6 @@ public class DisThreadedNetworkInterface sleep(100l); // TODO needed? } - /** Deprecated, replaced by getPort() - * @return current port value - */ - @Deprecated - public int getDisPort() - { - return getPort(); - } - /** Get network port used, multicast or unicast. - * @see <a href="https://en.wikipedia.org/wiki/Port_(computer_networking)">https://en.wikipedia.org/wiki/Port_(computer_networking)</a> - * @return current port value - */ - public int getPort() - { - return disPort; - } - /** * Deprecated, replaced by getAddress() * @return current multicast address value @@ -255,12 +254,21 @@ public class DisThreadedNetworkInterface } /** - * Get current multicast (or unicast) address value + * Get current multicast (or unicast) network address for send and receive connections. + * @see <a href="https://en.wikipedia.org/wiki/Multicast_address">https://en.wikipedia.org/wiki/Multicast_address</a> * @return current multicast address value */ public String getAddress() { - return multicastAddress; + return this.disAddress; + } + /** + * Network address for send and receive connections. + * @see <a href="https://en.wikipedia.org/wiki/Multicast_address">https://en.wikipedia.org/wiki/Multicast_address</a> + * @param newAddress the new network address to set + */ + public void setAddress(String newAddress) { + this.disAddress = newAddress; } /** @@ -275,25 +283,39 @@ public class DisThreadedNetworkInterface /* *************** networking i/o ************* */ private PduFactory pduFactory = new PduFactory(); - private Thread sender; - private Thread receiver; + private Thread senderThread; + private Thread receiverThread; - /** Initialization */ - private void init() + /** Initialization of threads, or confirmation that they remain running */ + private synchronized void initializeSocketSenderReceiverThreads() { createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry - receiver = new Thread(receiveThread, TRACE_PREFIX + "receive thread"); - receiver.setDaemon(true); - receiver.setPriority(Thread.NORM_PRIORITY); - receiver.start(); + if (receiverThread == null) + { + receiverThread = new Thread(receiverThreadRunnable, TRACE_PREFIX + "receiverThread"); + receiverThread.setDaemon(true); + receiverThread.setPriority(Thread.NORM_PRIORITY); + receiverThread.start(); + } - sender = new Thread(sendThread, TRACE_PREFIX + "send thread"); - sender.setDaemon(true); - sender.setPriority(Thread.NORM_PRIORITY); - sender.start(); + if (senderThread == null) + { + senderThread = new Thread(senderThreadRunnable, TRACE_PREFIX + "senderThread"); + senderThread.setDaemon(true); + senderThread.setPriority(Thread.NORM_PRIORITY); + senderThread.start(); + } + } + /** + * Can be used to restart DisThreadedNetworkInterface if closed. + * Create datagram socket if not already available; can also be invoked by + * either sender or receiver thread to ensure datagram socket is open. + */ + public void start () + { + createDatagramSocket(); // usually automatic, might be needed if previous connection was closed } - /** * Create datagram socket if not already available; can also be invoked by * either sender or receiver thread to ensure datagram socket is open. @@ -303,10 +325,19 @@ public class DisThreadedNetworkInterface private synchronized void createDatagramSocket() { boolean closedSocket = false; - if ((datagramSocket != null) && datagramSocket.isClosed()) + if ((datagramSocket != null)) { - closedSocket = true; - System.err.println(" *** " + TRACE_PREFIX + "datagramSocket.isClosed() unexpectedly, retrying..."); + if (datagramSocket.isClosed()) + { + closedSocket = true; + datagramSocket = null; + System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was available but closed before createDatagramSocket(), resetting..."); + } + else + { +// System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was not closed before createDatagramSocket(), ignoring..."); // debug + return; + } } if ((datagramSocket == null) || closedSocket) { @@ -321,76 +352,85 @@ public class DisThreadedNetworkInterface String message = TRACE_PREFIX; // if (hasVerboseOutputIncludesTimestamp()) // message += " (timestamp " + getTimestamp()); // TODO - message += "datagramSocket.joinGroup address=" + getAddress() + " port=" + getPort(); + message += "datagramSocket.joinGroup address=" + inetSocket.getHostString() + " port=" + inetSocket.getPort() + " start() complete"; System.out.println(message); System.out.flush(); } - } + initializeSocketSenderReceiverThreads(); + Thread.sleep (100L); // allow threads, streams to catch up + } + catch (InterruptedException ex) { + // continue + } catch (IOException ex) { - System.err.println(" *** " + TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receive thread: " + ex.getLocalizedMessage()); + System.err.println(" *** " + TRACE_PREFIX + "Exception in DisThreadedNetworkInterface createDatagramSocket(): " + ex.getLocalizedMessage()); } } } - private Runnable receiveThread = () -> { + private Runnable receiverThreadRunnable = () -> { int pduReceiptCounter = 0; // The capacity could go up to MAX_DIS_PDU_SIZE, but this should be good for now - // The raw listeners will strip off any extra padding and process what is - // required + // The raw listeners will strip off any extra padding and process what is required ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_TRANSMISSION_UNIT_SIZE); DatagramPacket packet = new DatagramPacket(byteBuffer.array(), byteBuffer.capacity()); - Pdu pdu; - while (!killed) { // keep trying on error - + while (!killed) // keep trying even if error occured + { // If something trips up with the socket, this thread will attempt to - // re-establish for both send/receive threads + // re-establish socket for both send/receive threads try { createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first - while (!killed) + while (!killed) // loop until terminated { datagramSocket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port toRawListeners(packet.getData(), packet.getLength()); - pdu = pduFactory.createPdu(byteBuffer); + Pdu nextPdu = pduFactory.createPdu(byteBuffer); - if (pdu != null) + if (nextPdu != null) { pduReceiptCounter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode if (hasVerboseOutput() && hasVerboseReceipt()) { - String message = TRACE_PREFIX + "[receipt " + pduReceiptCounter + "] " + pdu.getPduType().toString(); + String pad = ""; + if (pduReceiptCounter < 10) + pad = " "; + String message = TRACE_PREFIX + "[receipt " + pad + pduReceiptCounter + "] " + nextPdu.getPduType().toString(); if (hasVerboseOutputIncludesTimestamp()) - message += " (timestamp " + DisTime.timeStampToString(pdu.getTimestamp()); - message +=", size " + pdu.getMarshalledSize() + " bytes)"; + message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); + message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; System.out.println(message); System.out.flush(); } - toListeners(pdu); + toListeners(nextPdu); } byteBuffer.clear(); } + Thread.sleep (100L); + } + catch (InterruptedException ex) { + // continue } catch (IOException ex) { - System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receive thread: " + ex.getLocalizedMessage()); - System.err.println(TRACE_PREFIX + "Retrying new socket..."); + if (!killed) + { + System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiverThread: " + ex.getLocalizedMessage()); + System.err.println(TRACE_PREFIX + "Retrying new socket..."); + } } finally { close(); } -// if (!killed) -// sleep(250); } }; - private final Runnable sendThread = () -> { - - Pdu pdu; + private final Runnable senderThreadRunnable = () -> { int pduSentCounter = 0; @@ -399,37 +439,44 @@ public class DisThreadedNetworkInterface DataOutputStream dos = new DataOutputStream(baos); DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); - while (!killed) { // keep trying on error - + while (!killed) // keep trying even if error occured + { // If something trips up with the socket, this thread will attempt to - // re-establish for both send/receive threads + // re-establish socket for both send/receive threads try { createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first - while (!killed) { - pdu = pdus2send.take(); + while (!killed) // loop until terminated + { + Pdu nextPdu = pdus2send.take(); - pdu.marshal(dos); + nextPdu.marshal(dos); packet.setData(baos.toByteArray()); datagramSocket.send(packet); pduSentCounter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode if (hasVerboseOutput() && hasVerboseSending()) { - String message = TRACE_PREFIX + "[sending " + pduSentCounter + "] " + pdu.getPduType().toString(); + String pad = ""; + if (pduSentCounter < 10) + pad = " "; + String message = TRACE_PREFIX + "[sending " + pad + pduSentCounter + "] " + nextPdu.getPduType().toString(); if (hasVerboseOutputIncludesTimestamp()) - message += " (timestamp " + DisTime.timeStampToString(pdu.getTimestamp()); - message +=", size " + pdu.getMarshalledSize() + " bytes)"; + message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); + message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; System.out.println(message); System.out.flush(); } dos.flush(); // immediately force pdu write - baos.reset(); + baos.reset(); // prepare for next send } } catch (Exception ex) { - System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface send thread: " + ex.getLocalizedMessage()); + if (!killed) + { + System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface senderThread: " + ex.getLocalizedMessage()); + } } } try { @@ -439,7 +486,8 @@ public class DisThreadedNetworkInterface catch (IOException e) {} // shutting down, no need to report exception }; - private void toListeners(Pdu pdu) { + private void toListeners(Pdu pdu) + { if (everyTypeListeners.isEmpty()) { return; } @@ -478,15 +526,17 @@ public class DisThreadedNetworkInterface /** Finish pending send/receive activity and then close. */ public void finishOperations() { - killed = true; // set loop sentinel for threads + killed = true; // set loop sentinel for threads to finish } - /** Terminate the instance after completion of pending send/receive activity. */ - public void close() + /** Terminate the instance after completion of pending send/receive activity. + * Synchronized to prevent interleaved invocation. */ + public synchronized void close() { finishOperations(); - if (datagramSocket != null && !datagramSocket.isClosed()) { + if (datagramSocket != null && !datagramSocket.isClosed()) + { try { ((MulticastSocket)datagramSocket).leaveGroup(inetSocket, networkInterface); } catch (IOException ex) { @@ -494,18 +544,18 @@ public class DisThreadedNetworkInterface } datagramSocket.close(); sleep(100l); // TODO needed? - datagramSocket = null; if (hasVerboseOutput()) { String message = TRACE_PREFIX; // if (hasVerboseOutputIncludesTimestamp()) // message += " (timestamp " + getTimestamp()); // TODO - message += "datagramSocket.leaveGroup address=" + getAddress() + " port=" + getPort(); + message += "datagramSocket.leaveGroup address=" + getAddress() + " port=" + getPort() + " stop() complete"; System.out.println(message); System.out.flush(); } } + datagramSocket = null; // make sure } /** Thread sleep for indicated interval @@ -524,27 +574,30 @@ public class DisThreadedNetworkInterface } /** - * Find proper IPV4 interface + * Find proper IPV4 interface on this computer for use with the djs address * * @return a network interface to use to join a multicast group */ - public static NetworkInterface findIpv4Interface() { + public static NetworkInterface findIpv4Interface() + { try { - Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces(); - NetworkInterface nif; - Enumeration<InetAddress> addresses; - InetAddress addr; - - while (ifaces != null && ifaces.hasMoreElements()) { - nif = ifaces.nextElement(); - if (nif.isUp()) { - addresses = nif.getInetAddresses(); - while (addresses.hasMoreElements()) { - addr = addresses.nextElement(); - if (addr instanceof Inet4Address && !addr.isLoopbackAddress() && !addr.isLinkLocalAddress()) + Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); + + while (networkInterfaces != null && networkInterfaces.hasMoreElements()) + { + NetworkInterface networkInterface = networkInterfaces.nextElement(); + if (networkInterface.isUp()) + { + // now check available addresses available on this running interface + Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); + while (addresses.hasMoreElements()) + { + InetAddress nextAddress = addresses.nextElement(); + if (nextAddress instanceof Inet4Address && !nextAddress.isLoopbackAddress() && !nextAddress.isLinkLocalAddress()) { - System.out.println(TRACE_PREFIX + "Using network interface " + nif.getDisplayName()); - return nif; + // can't use object descriptor in static context + System.out.println("[" + DisThreadedNetworkInterface.class.getSimpleName() + "] " + "using network interface " + networkInterface.getDisplayName()); + return networkInterface; } } } @@ -637,6 +690,22 @@ public class DisThreadedNetworkInterface this.verboseIncludesTimestamp = verboseIncludesTimestamp; } + /** Deprecated, replaced by getPort() + * @return current port value + */ + @Deprecated + public int getDisPort() + { + return getPort(); + } + /** Get network port used, multicast or unicast. + * @see <a href="https://en.wikipedia.org/wiki/Port_(computer_networking)">https://en.wikipedia.org/wiki/Port_(computer_networking)</a> + * @return current port value + */ + public int getPort() + { + return this.disPort; + } /** /** Set network port used, multicast or unicast. * @see <a href="https://en.wikipedia.org/wiki/Port_(computer_networking)">https://en.wikipedia.org/wiki/Port_(computer_networking)</a> @@ -646,4 +715,22 @@ public class DisThreadedNetworkInterface { this.disPort = newPortValue; } + /** + * Get simple descriptor (such as parent class name) for this network interface, used in trace statements + * @return simple descriptor name + */ + public String getDescriptor() + { + return descriptor; + } + + /** + * Set new simple descriptor (such as parent class name) for this network interface, used in trace statements + * @param newDescriptor simple descriptor name + */ + public void setDescriptor(String newDescriptor) + { + this.descriptor = newDescriptor; + TRACE_PREFIX = "[" + (DisThreadedNetworkInterface.class.getSimpleName() + " " + descriptor).trim() + "] "; + } } -- GitLab