diff --git a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java index 091c9933b1a64b863a20634ec8f3c46840570441..ed9989b30b2823f14c1191593aff327ab591b7d1 100644 --- a/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java +++ b/src/edu/nps/moves/dis7/utilities/DisThreadedNetworkInterface.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2008-2020, MOVES Institute, Naval Postgraduate School (NPS). All rights reserved. + * Copyright (c) 2008-2021, MOVES Institute, Naval Postgraduate School (NPS). All rights reserved. * This work is provided under a BSD open-source license, see project license.html and license.txt */ package edu.nps.moves.dis7.utilities; @@ -26,206 +26,277 @@ import java.util.logging.Logger; */ public class DisThreadedNetworkInterface { - /** Default value */ - public static String DEFAULT_MULTICAST_ADDRESS = "225.4.5.6"; + /** Default value */ + public static String DEFAULT_MULTICAST_ADDRESS = "225.4.5.6"; - /** Default value */ - public static int DEFAULT_DIS_PORT = 3000; - - private static final String TRACE_PREFIX = "[" + DisThreadedNetworkInterface.class.getName() + "] "; - private boolean verbose = true; - private boolean verboseIncludesTimestamp = false; - - /** Pdu listener interface */ - public interface PduListener - { - /** Callback method - * @param pdu received pdu*/ - void incomingPdu(Pdu pdu); - } + /** Default value */ + public static int DEFAULT_DIS_PORT = 3000; - /** Raw pdu listener class and interface */ - public interface RawPduListener - { - /** Callback method - * @param bAndL exposed buffer to receive incoming pdu*/ - void incomingPdu(ByteArrayBufferAndLength bAndL); - } - - /** - * Stores data for further processing - */ - public class ByteArrayBufferAndLength - { - /** Active ByteArray buffer */ - public byte[] bufferByteArray; - /** Active ByteArray buffer length */ - public int length; + private static final String TRACE_PREFIX = "[" + DisThreadedNetworkInterface.class.getName() + "] "; + private boolean verbose = true; + private boolean verboseIncludesTimestamp = false; /** - * Default constructor for data storage - * @param bufferByteArray the data buffer to store - * @param length the length of the data buffer + * Pdu listener interface */ - public ByteArrayBufferAndLength(byte[] bufferByteArray, int length) + public interface PduListener { - this.bufferByteArray = bufferByteArray; - this.length = length; + /** + * Callback method + * @param pdu received pdu + */ + void incomingPdu(Pdu pdu); } - } - - /************ Begin class ***************/ - - /** MTU 8192: TODO this has actually been superseded by a larger buffer size, but good enough for now */ - public static final int MAX_DIS_PDU_SIZE = 8192; - - /** MTU 1500: size of an Ethernet frame, common value to avoid packet segmentation */ - public static final int MAX_TRANSMISSION_UNIT_SIZE = 1500; - - private int disPort; - private String multicastAddress; - private boolean killed = false; - - private InetAddress inetAddress; - private InetSocketAddress inetSocket; - private NetworkInterface networkInterface; - private DatagramSocket socket = null; - - /** - * Default constructor using default port 3000 and multicast address 225.4.5.6 - */ - public DisThreadedNetworkInterface() - { - this(DEFAULT_MULTICAST_ADDRESS, DEFAULT_DIS_PORT); - } - /** - * Constructor - * @param multicastGroup the multicast group address to utilize - * @param port the multicast port to utilize - */ - public DisThreadedNetworkInterface(String multicastGroup, int port) - { - disPort = port; - multicastAddress = multicastGroup; - try { - inetAddress = InetAddress.getByName(multicastAddress); - } catch (UnknownHostException ex) { - Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex); - } - inetSocket = new InetSocketAddress(inetAddress, disPort); - networkInterface = findIpv4Interface(); - init(); - } + /** + * Raw pdu listener class and interface + */ + public interface RawPduListener + { + /** + * Callback method + * @param bAndL exposed buffer to receive incoming pdu + */ + void incomingPdu(ByteArrayBufferAndLength bAndL); + } - /* *********** queues and lists and public methods ************** */ - private final List<PduListener> everyTypeListeners = new ArrayList<>(); - private final Map<DISPDUType, List<PduListener>> typeListeners = new HashMap<>(); - private final List<RawPduListener> rawListeners = new ArrayList<>(); - private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); - - /** - * Add a listener to accept only pdus of a given type - * @param lis listener instance implementing the RawPduListener interface - * @param typ Pdu type - */ - public void addListener(PduListener lis, DISPDUType typ) - { - if (typ == null) - addListener(lis); - else { - List<PduListener> arLis = typeListeners.get(typ); - if (arLis == null) { - arLis = new ArrayList<>(); - typeListeners.put(typ, arLis); - } - arLis.add(lis); + /** + * Stores data for further processing + */ + public class ByteArrayBufferAndLength + { + /** + * Active ByteArray buffer + */ + public byte[] bufferByteArray; + /** + * Active ByteArray buffer length + */ + public int length; + + /** + * Default constructor for data storage + * + * @param bufferByteArray the data buffer to store + * @param length the length of the data buffer + */ + public ByteArrayBufferAndLength(byte[] bufferByteArray, int length) + { + this.bufferByteArray = bufferByteArray; + this.length = length; + } } - } - - /** - * Add a listener to accept all pdu types - * @param lis listener instance implementing the RawPduListener interface - */ - public void addListener(PduListener lis) - { - everyTypeListeners.add(lis); - } - /** - * Remove previously added listener - * @param lis listener instance implementing the RawPduListener interface - */ - public void removeListener(PduListener lis) - { - everyTypeListeners.remove(lis); + /** ********** Begin class ************** */ + /** + * MTU 8192: TODO this has actually been superseded by a larger buffer size, + * but good enough for now + */ + public static final int MAX_DIS_PDU_SIZE = 8192; - typeListeners.entrySet().forEach(entry -> { - List<PduListener> arLis = entry.getValue(); - if (arLis.contains(lis)) - arLis.remove(lis); - }); - } - - /** - * Add a listener to accept pdus of all types in the form of a byte array - * @param lis listener instance implementing the RawPduListener interface - */ - public void addRawListener(RawPduListener lis) - { - rawListeners.add(lis); - } - - /** - * Remove previously added raw listener - * @param lis listener instance implementing the RawPduListener interface - */ - public void removeRawListener(RawPduListener lis) - { - rawListeners.remove(lis); - } + /** + * MTU 1500: size of an Ethernet frame, common value to avoid packet + * segmentation + */ + public static final int MAX_TRANSMISSION_UNIT_SIZE = 1500; - /** Get current port value - * @return current port value */ - public int getDisPort() - { - return disPort; - } + private int disPort; + private String multicastAddress; + private boolean killed = false; - /** Get current multicast address value - * @return current multicast address value */ - public String getMcastGroup() - { - return multicastAddress; - } + private InetAddress inetAddress; + private InetSocketAddress inetSocket; + private NetworkInterface networkInterface; + private DatagramSocket datagramSocket = null; - /** - * Send the given pdu to the network using the ip and port given to the constructor - * @param pdu the pdu to send - */ - public void send(Pdu pdu) - { - pdus2send.add(pdu); - } + /** + * Default constructor using default port and multicast address + */ + public DisThreadedNetworkInterface() + { + this(DEFAULT_MULTICAST_ADDRESS, DEFAULT_DIS_PORT); + } - /* *************** networking i/o ************* */ - private PduFactory pduFactory = new PduFactory(); + /** + * Constructor + * @param multicastGroup the multicast group address to utilize + * @param port the multicast port to utilize + */ + public DisThreadedNetworkInterface(String multicastGroup, int port) + { + disPort = port; + multicastAddress = multicastGroup; + try + { + inetAddress = InetAddress.getByName(multicastAddress); + } + catch (UnknownHostException ex) + { + Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex); + } + inetSocket = new InetSocketAddress(inetAddress, disPort); + networkInterface = findIpv4Interface(); + init(); + } - private Thread sender; - private Thread receiver; + /* *********** queues and lists and public methods ************** */ + private final List<PduListener> everyTypeListeners = new ArrayList<>(); + private final Map<DISPDUType, List<PduListener>> typeListeners = new HashMap<>(); + private final List<RawPduListener> rawListeners = new ArrayList<>(); + private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); - private void init() - { - receiver = new Thread(receiveThread, "DisThreadedNetIF receive thread"); - receiver.setDaemon(true); - receiver.setPriority(Thread.NORM_PRIORITY); - receiver.start(); - - sender = new Thread(sendThread, "DisThreadedNetIF send thread"); - sender.setDaemon(true); - sender.setPriority(Thread.NORM_PRIORITY); - sender.start(); - } + /** + * Add a listener to accept only pdus of a given type + * @param newListener listener instance implementing the RawPduListener interface + * @param disPduType Pdu type + */ + public void addListener(PduListener newListener, DISPDUType disPduType) + { + if (disPduType == null) + { + addListener(newListener); + } + else + { + List<PduListener> arLis = typeListeners.get(disPduType); + if (arLis == null) + { + arLis = new ArrayList<>(); + typeListeners.put(disPduType, arLis); + } + arLis.add(newListener); + } + } + + /** + * Add a listener to accept all pdu types + * @param newListener listener instance implementing the RawPduListener interface + */ + public void addListener(PduListener newListener) + { + everyTypeListeners.add(newListener); + } + + /** + * Remove previously added listener + * + * @param priorListener listener instance implementing the RawPduListener interface + */ + public void removeListener(PduListener priorListener) + { + everyTypeListeners.remove(priorListener); + + typeListeners.entrySet().forEach(entry -> + { + List<PduListener> arLis = entry.getValue(); + if (arLis.contains(priorListener)) + { + arLis.remove(priorListener); + } + }); + // additional sleep, hopefully allowing teardown to proceed to completion + sleep(100l); // TODO needed? + } + + /** + * Add a listener to accept pdus of all types in the form of a byte array + * + * @param lis listener instance implementing the RawPduListener interface + */ + public void addRawListener(RawPduListener lis) + { + rawListeners.add(lis); + } + + /** + * Remove previously added raw listener + * + * @param lis listener instance implementing the RawPduListener interface + */ + public void removeRawListener(RawPduListener lis) + { + rawListeners.remove(lis); + // additional sleep, hopefully allowing teardown to proceed to completion + sleep(100l); // TODO needed? + } + + /** + * Get current port value + * @return current port value + */ + public int getDisPort() + { + return disPort; + } + + /** + * Get current multicast address value + * @return current multicast address value + */ + public String getMulticastGroup() + { + return multicastAddress; + } + + /** + * Send the given pdu to the network using the IP address and port given to the constructor + * @param pdu the pdu to send + */ + public void send(Pdu pdu) + { + pdus2send.add(pdu); + } + + /* *************** networking i/o ************* */ + private PduFactory pduFactory = new PduFactory(); + + private Thread sender; + private Thread receiver; + + /** Initialization */ + private void init() + { + createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry + + receiver = new Thread(receiveThread, "DisThreadedNetworkInterface receive thread"); + receiver.setDaemon(true); + receiver.setPriority(Thread.NORM_PRIORITY); + receiver.start(); + + sender = new Thread(sendThread, "DisThreadedNetworkInterface send thread"); + sender.setDaemon(true); + sender.setPriority(Thread.NORM_PRIORITY); + sender.start(); + } + + /** + * Create datagram socket if not already available; can also be invoked by + * either sender or receiver thread to ensure datagram socket is open. + * Synchronized method to prevent interleaved reentry. + * @see <a href="https://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html">Java Tutorials: Synchronized Methods</a> + */ + private synchronized void createDatagramSocket() + { + boolean closedSocket = false; + if ((datagramSocket != null) && datagramSocket.isClosed()) + { + closedSocket = true; + System.err.println(" *** " + TRACE_PREFIX + "datagramSocket.isClosed() unexpectedly, retrying..."); + } + if ((datagramSocket == null) || closedSocket) + { + try + { + // The initial value of the SO_BROADCAST socket option is FALSE + datagramSocket = new MulticastSocket(getDisPort()); + ((MulticastSocket) datagramSocket).joinGroup(inetSocket, networkInterface); + } + catch (IOException ex) + { + System.err.println(" *** " + TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receive thread: " + ex.getLocalizedMessage()); + } + } + } private Runnable receiveThread = () -> { @@ -243,13 +314,11 @@ public class DisThreadedNetworkInterface // If something trips up with the socket, this thread will attempt to // re-establish for both send/receive threads try { - // The initial value of the SO_BROADCAST socket option is FALSE - socket = new MulticastSocket(getDisPort()); - ((MulticastSocket)socket).joinGroup(inetSocket, networkInterface); + createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first - while (!killed) { - - socket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port + while (!killed) + { + datagramSocket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port toRawListeners(packet.getData(), packet.getLength()); pdu = pduFactory.createPdu(byteBuffer); @@ -272,18 +341,20 @@ public class DisThreadedNetworkInterface } } catch (IOException ex) { - System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetIF receive thread: " + ex.getLocalizedMessage()); - System.err.println(TRACE_PREFIX + "Retrying new socket in 1 second"); + System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receive thread: " + ex.getLocalizedMessage()); + System.err.println(TRACE_PREFIX + "Retrying new socket..."); } - finally { - if (socket != null && !socket.isClosed()) { + finally + { + if (datagramSocket != null && !datagramSocket.isClosed()) { try { - ((MulticastSocket)socket).leaveGroup(inetSocket, networkInterface); + ((MulticastSocket)datagramSocket).leaveGroup(inetSocket, networkInterface); } catch (IOException ex) { Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex); } - socket.close(); - socket = null; + datagramSocket.close(); + sleep(100l); // TODO needed? + datagramSocket = null; } } // if (!killed) @@ -301,13 +372,18 @@ public class DisThreadedNetworkInterface DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); while (!killed) { // keep trying on error + + // If something trips up with the socket, this thread will attempt to + // re-establish for both send/receive threads try { + createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first + while (!killed) { pdu = pdus2send.take(); pdu.marshal(dos); packet.setData(baos.toByteArray()); - socket.send(packet); + datagramSocket.send(packet); dos.flush(); // immediately force pdu write baos.reset(); @@ -315,7 +391,7 @@ public class DisThreadedNetworkInterface } catch (Exception ex) { - System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetIF send thread: " + ex.getLocalizedMessage()); + System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface send thread: " + ex.getLocalizedMessage()); } } try { @@ -354,16 +430,22 @@ public class DisThreadedNetworkInterface /** Terminate the instance */ public void kill() { - killed = true; + killed = true; // set loop sentinel for threads } - /** Thread sleep for indicated interval */ - private void sleep(long ms) + /** Thread sleep for indicated interval + * @param duration milliseconds */ + private void sleep(long duration) { try { - Thread.sleep(ms); + Thread.sleep(duration); + } + catch (InterruptedException ie) + { + System.err.flush(); + System.err.println ("*** DisThreadedNetworkInterface.sleep(" + duration + ") failed to sleep"); + ie.printStackTrace(); } - catch (InterruptedException ex) {} } /**