Skip to content
Snippets Groups Projects
Commit 11182643 authored by Brutzman, Don's avatar Brutzman, Don
Browse files

fix threading/race condition causing intermittent socket errors by refactoring...

fix threading/race condition causing intermittent socket errors by refactoring socket creation from send/receive threads, synchronized to avoid interleaved reentry
parent 400a1427
No related branches found
No related tags found
No related merge requests found
/** /**
* Copyright (c) 2008-2020, MOVES Institute, Naval Postgraduate School (NPS). All rights reserved. * Copyright (c) 2008-2021, MOVES Institute, Naval Postgraduate School (NPS). All rights reserved.
* This work is provided under a BSD open-source license, see project license.html and license.txt * This work is provided under a BSD open-source license, see project license.html and license.txt
*/ */
package edu.nps.moves.dis7.utilities; package edu.nps.moves.dis7.utilities;
...@@ -26,206 +26,277 @@ import java.util.logging.Logger; ...@@ -26,206 +26,277 @@ import java.util.logging.Logger;
*/ */
public class DisThreadedNetworkInterface public class DisThreadedNetworkInterface
{ {
/** Default value */ /** Default value */
public static String DEFAULT_MULTICAST_ADDRESS = "225.4.5.6"; public static String DEFAULT_MULTICAST_ADDRESS = "225.4.5.6";
/** Default value */ /** Default value */
public static int DEFAULT_DIS_PORT = 3000; public static int DEFAULT_DIS_PORT = 3000;
private static final String TRACE_PREFIX = "[" + DisThreadedNetworkInterface.class.getName() + "] ";
private boolean verbose = true;
private boolean verboseIncludesTimestamp = false;
/** Pdu listener interface */
public interface PduListener
{
/** Callback method
* @param pdu received pdu*/
void incomingPdu(Pdu pdu);
}
/** Raw pdu listener class and interface */ private static final String TRACE_PREFIX = "[" + DisThreadedNetworkInterface.class.getName() + "] ";
public interface RawPduListener private boolean verbose = true;
{ private boolean verboseIncludesTimestamp = false;
/** Callback method
* @param bAndL exposed buffer to receive incoming pdu*/
void incomingPdu(ByteArrayBufferAndLength bAndL);
}
/**
* Stores data for further processing
*/
public class ByteArrayBufferAndLength
{
/** Active ByteArray buffer */
public byte[] bufferByteArray;
/** Active ByteArray buffer length */
public int length;
/** /**
* Default constructor for data storage * Pdu listener interface
* @param bufferByteArray the data buffer to store
* @param length the length of the data buffer
*/ */
public ByteArrayBufferAndLength(byte[] bufferByteArray, int length) public interface PduListener
{ {
this.bufferByteArray = bufferByteArray; /**
this.length = length; * Callback method
* @param pdu received pdu
*/
void incomingPdu(Pdu pdu);
} }
}
/************ Begin class ***************/
/** MTU 8192: TODO this has actually been superseded by a larger buffer size, but good enough for now */
public static final int MAX_DIS_PDU_SIZE = 8192;
/** MTU 1500: size of an Ethernet frame, common value to avoid packet segmentation */
public static final int MAX_TRANSMISSION_UNIT_SIZE = 1500;
private int disPort;
private String multicastAddress;
private boolean killed = false;
private InetAddress inetAddress;
private InetSocketAddress inetSocket;
private NetworkInterface networkInterface;
private DatagramSocket socket = null;
/**
* Default constructor using default port 3000 and multicast address 225.4.5.6
*/
public DisThreadedNetworkInterface()
{
this(DEFAULT_MULTICAST_ADDRESS, DEFAULT_DIS_PORT);
}
/** /**
* Constructor * Raw pdu listener class and interface
* @param multicastGroup the multicast group address to utilize */
* @param port the multicast port to utilize public interface RawPduListener
*/ {
public DisThreadedNetworkInterface(String multicastGroup, int port) /**
{ * Callback method
disPort = port; * @param bAndL exposed buffer to receive incoming pdu
multicastAddress = multicastGroup; */
try { void incomingPdu(ByteArrayBufferAndLength bAndL);
inetAddress = InetAddress.getByName(multicastAddress); }
} catch (UnknownHostException ex) {
Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex);
}
inetSocket = new InetSocketAddress(inetAddress, disPort);
networkInterface = findIpv4Interface();
init();
}
/* *********** queues and lists and public methods ************** */ /**
private final List<PduListener> everyTypeListeners = new ArrayList<>(); * Stores data for further processing
private final Map<DISPDUType, List<PduListener>> typeListeners = new HashMap<>(); */
private final List<RawPduListener> rawListeners = new ArrayList<>(); public class ByteArrayBufferAndLength
private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); {
/**
/** * Active ByteArray buffer
* Add a listener to accept only pdus of a given type */
* @param lis listener instance implementing the RawPduListener interface public byte[] bufferByteArray;
* @param typ Pdu type /**
*/ * Active ByteArray buffer length
public void addListener(PduListener lis, DISPDUType typ) */
{ public int length;
if (typ == null)
addListener(lis); /**
else { * Default constructor for data storage
List<PduListener> arLis = typeListeners.get(typ); *
if (arLis == null) { * @param bufferByteArray the data buffer to store
arLis = new ArrayList<>(); * @param length the length of the data buffer
typeListeners.put(typ, arLis); */
} public ByteArrayBufferAndLength(byte[] bufferByteArray, int length)
arLis.add(lis); {
this.bufferByteArray = bufferByteArray;
this.length = length;
}
} }
}
/**
* Add a listener to accept all pdu types
* @param lis listener instance implementing the RawPduListener interface
*/
public void addListener(PduListener lis)
{
everyTypeListeners.add(lis);
}
/** /** ********** Begin class ************** */
* Remove previously added listener /**
* @param lis listener instance implementing the RawPduListener interface * MTU 8192: TODO this has actually been superseded by a larger buffer size,
*/ * but good enough for now
public void removeListener(PduListener lis) */
{ public static final int MAX_DIS_PDU_SIZE = 8192;
everyTypeListeners.remove(lis);
typeListeners.entrySet().forEach(entry -> { /**
List<PduListener> arLis = entry.getValue(); * MTU 1500: size of an Ethernet frame, common value to avoid packet
if (arLis.contains(lis)) * segmentation
arLis.remove(lis); */
}); public static final int MAX_TRANSMISSION_UNIT_SIZE = 1500;
}
/**
* Add a listener to accept pdus of all types in the form of a byte array
* @param lis listener instance implementing the RawPduListener interface
*/
public void addRawListener(RawPduListener lis)
{
rawListeners.add(lis);
}
/**
* Remove previously added raw listener
* @param lis listener instance implementing the RawPduListener interface
*/
public void removeRawListener(RawPduListener lis)
{
rawListeners.remove(lis);
}
/** Get current port value private int disPort;
* @return current port value */ private String multicastAddress;
public int getDisPort() private boolean killed = false;
{
return disPort;
}
/** Get current multicast address value private InetAddress inetAddress;
* @return current multicast address value */ private InetSocketAddress inetSocket;
public String getMcastGroup() private NetworkInterface networkInterface;
{ private DatagramSocket datagramSocket = null;
return multicastAddress;
}
/** /**
* Send the given pdu to the network using the ip and port given to the constructor * Default constructor using default port and multicast address
* @param pdu the pdu to send */
*/ public DisThreadedNetworkInterface()
public void send(Pdu pdu) {
{ this(DEFAULT_MULTICAST_ADDRESS, DEFAULT_DIS_PORT);
pdus2send.add(pdu); }
}
/* *************** networking i/o ************* */ /**
private PduFactory pduFactory = new PduFactory(); * Constructor
* @param multicastGroup the multicast group address to utilize
* @param port the multicast port to utilize
*/
public DisThreadedNetworkInterface(String multicastGroup, int port)
{
disPort = port;
multicastAddress = multicastGroup;
try
{
inetAddress = InetAddress.getByName(multicastAddress);
}
catch (UnknownHostException ex)
{
Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex);
}
inetSocket = new InetSocketAddress(inetAddress, disPort);
networkInterface = findIpv4Interface();
init();
}
private Thread sender; /* *********** queues and lists and public methods ************** */
private Thread receiver; private final List<PduListener> everyTypeListeners = new ArrayList<>();
private final Map<DISPDUType, List<PduListener>> typeListeners = new HashMap<>();
private final List<RawPduListener> rawListeners = new ArrayList<>();
private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>();
private void init() /**
{ * Add a listener to accept only pdus of a given type
receiver = new Thread(receiveThread, "DisThreadedNetIF receive thread"); * @param newListener listener instance implementing the RawPduListener interface
receiver.setDaemon(true); * @param disPduType Pdu type
receiver.setPriority(Thread.NORM_PRIORITY); */
receiver.start(); public void addListener(PduListener newListener, DISPDUType disPduType)
{
sender = new Thread(sendThread, "DisThreadedNetIF send thread"); if (disPduType == null)
sender.setDaemon(true); {
sender.setPriority(Thread.NORM_PRIORITY); addListener(newListener);
sender.start(); }
} else
{
List<PduListener> arLis = typeListeners.get(disPduType);
if (arLis == null)
{
arLis = new ArrayList<>();
typeListeners.put(disPduType, arLis);
}
arLis.add(newListener);
}
}
/**
* Add a listener to accept all pdu types
* @param newListener listener instance implementing the RawPduListener interface
*/
public void addListener(PduListener newListener)
{
everyTypeListeners.add(newListener);
}
/**
* Remove previously added listener
*
* @param priorListener listener instance implementing the RawPduListener interface
*/
public void removeListener(PduListener priorListener)
{
everyTypeListeners.remove(priorListener);
typeListeners.entrySet().forEach(entry ->
{
List<PduListener> arLis = entry.getValue();
if (arLis.contains(priorListener))
{
arLis.remove(priorListener);
}
});
// additional sleep, hopefully allowing teardown to proceed to completion
sleep(100l); // TODO needed?
}
/**
* Add a listener to accept pdus of all types in the form of a byte array
*
* @param lis listener instance implementing the RawPduListener interface
*/
public void addRawListener(RawPduListener lis)
{
rawListeners.add(lis);
}
/**
* Remove previously added raw listener
*
* @param lis listener instance implementing the RawPduListener interface
*/
public void removeRawListener(RawPduListener lis)
{
rawListeners.remove(lis);
// additional sleep, hopefully allowing teardown to proceed to completion
sleep(100l); // TODO needed?
}
/**
* Get current port value
* @return current port value
*/
public int getDisPort()
{
return disPort;
}
/**
* Get current multicast address value
* @return current multicast address value
*/
public String getMulticastGroup()
{
return multicastAddress;
}
/**
* Send the given pdu to the network using the IP address and port given to the constructor
* @param pdu the pdu to send
*/
public void send(Pdu pdu)
{
pdus2send.add(pdu);
}
/* *************** networking i/o ************* */
private PduFactory pduFactory = new PduFactory();
private Thread sender;
private Thread receiver;
/** Initialization */
private void init()
{
createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry
receiver = new Thread(receiveThread, "DisThreadedNetworkInterface receive thread");
receiver.setDaemon(true);
receiver.setPriority(Thread.NORM_PRIORITY);
receiver.start();
sender = new Thread(sendThread, "DisThreadedNetworkInterface send thread");
sender.setDaemon(true);
sender.setPriority(Thread.NORM_PRIORITY);
sender.start();
}
/**
* Create datagram socket if not already available; can also be invoked by
* either sender or receiver thread to ensure datagram socket is open.
* Synchronized method to prevent interleaved reentry.
* @see <a href="https://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html">Java Tutorials: Synchronized Methods</a>
*/
private synchronized void createDatagramSocket()
{
boolean closedSocket = false;
if ((datagramSocket != null) && datagramSocket.isClosed())
{
closedSocket = true;
System.err.println(" *** " + TRACE_PREFIX + "datagramSocket.isClosed() unexpectedly, retrying...");
}
if ((datagramSocket == null) || closedSocket)
{
try
{
// The initial value of the SO_BROADCAST socket option is FALSE
datagramSocket = new MulticastSocket(getDisPort());
((MulticastSocket) datagramSocket).joinGroup(inetSocket, networkInterface);
}
catch (IOException ex)
{
System.err.println(" *** " + TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receive thread: " + ex.getLocalizedMessage());
}
}
}
private Runnable receiveThread = () -> { private Runnable receiveThread = () -> {
...@@ -243,13 +314,11 @@ public class DisThreadedNetworkInterface ...@@ -243,13 +314,11 @@ public class DisThreadedNetworkInterface
// If something trips up with the socket, this thread will attempt to // If something trips up with the socket, this thread will attempt to
// re-establish for both send/receive threads // re-establish for both send/receive threads
try { try {
// The initial value of the SO_BROADCAST socket option is FALSE createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first
socket = new MulticastSocket(getDisPort());
((MulticastSocket)socket).joinGroup(inetSocket, networkInterface);
while (!killed) { while (!killed)
{
socket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port datagramSocket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port
toRawListeners(packet.getData(), packet.getLength()); toRawListeners(packet.getData(), packet.getLength());
pdu = pduFactory.createPdu(byteBuffer); pdu = pduFactory.createPdu(byteBuffer);
...@@ -272,18 +341,20 @@ public class DisThreadedNetworkInterface ...@@ -272,18 +341,20 @@ public class DisThreadedNetworkInterface
} }
} }
catch (IOException ex) { catch (IOException ex) {
System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetIF receive thread: " + ex.getLocalizedMessage()); System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receive thread: " + ex.getLocalizedMessage());
System.err.println(TRACE_PREFIX + "Retrying new socket in 1 second"); System.err.println(TRACE_PREFIX + "Retrying new socket...");
} }
finally { finally
if (socket != null && !socket.isClosed()) { {
if (datagramSocket != null && !datagramSocket.isClosed()) {
try { try {
((MulticastSocket)socket).leaveGroup(inetSocket, networkInterface); ((MulticastSocket)datagramSocket).leaveGroup(inetSocket, networkInterface);
} catch (IOException ex) { } catch (IOException ex) {
Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex); Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex);
} }
socket.close(); datagramSocket.close();
socket = null; sleep(100l); // TODO needed?
datagramSocket = null;
} }
} }
// if (!killed) // if (!killed)
...@@ -301,13 +372,18 @@ public class DisThreadedNetworkInterface ...@@ -301,13 +372,18 @@ public class DisThreadedNetworkInterface
DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket);
while (!killed) { // keep trying on error while (!killed) { // keep trying on error
// If something trips up with the socket, this thread will attempt to
// re-establish for both send/receive threads
try { try {
createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first
while (!killed) { while (!killed) {
pdu = pdus2send.take(); pdu = pdus2send.take();
pdu.marshal(dos); pdu.marshal(dos);
packet.setData(baos.toByteArray()); packet.setData(baos.toByteArray());
socket.send(packet); datagramSocket.send(packet);
dos.flush(); // immediately force pdu write dos.flush(); // immediately force pdu write
baos.reset(); baos.reset();
...@@ -315,7 +391,7 @@ public class DisThreadedNetworkInterface ...@@ -315,7 +391,7 @@ public class DisThreadedNetworkInterface
} }
catch (Exception ex) catch (Exception ex)
{ {
System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetIF send thread: " + ex.getLocalizedMessage()); System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface send thread: " + ex.getLocalizedMessage());
} }
} }
try { try {
...@@ -354,16 +430,22 @@ public class DisThreadedNetworkInterface ...@@ -354,16 +430,22 @@ public class DisThreadedNetworkInterface
/** Terminate the instance */ /** Terminate the instance */
public void kill() public void kill()
{ {
killed = true; killed = true; // set loop sentinel for threads
} }
/** Thread sleep for indicated interval */ /** Thread sleep for indicated interval
private void sleep(long ms) * @param duration milliseconds */
private void sleep(long duration)
{ {
try { try {
Thread.sleep(ms); Thread.sleep(duration);
}
catch (InterruptedException ie)
{
System.err.flush();
System.err.println ("*** DisThreadedNetworkInterface.sleep(" + duration + ") failed to sleep");
ie.printStackTrace();
} }
catch (InterruptedException ex) {}
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment