Skip to content
Snippets Groups Projects
Commit 66c06ea6 authored by Brutzman, Don's avatar Brutzman, Don
Browse files

Numerous threading complications unraveled, more stable operation, watch out...

Numerous threading complications unraveled, more stable operation, watch out for senderThread closing
parent fcd2c577
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,7 @@ package edu.nps.moves.dis7.utilities; ...@@ -6,6 +6,7 @@ package edu.nps.moves.dis7.utilities;
import edu.nps.moves.dis7.enumerations.DisPduType; import edu.nps.moves.dis7.enumerations.DisPduType;
import edu.nps.moves.dis7.pdus.DisTime; import edu.nps.moves.dis7.pdus.DisTime;
import edu.nps.moves.dis7.pdus.EntityStatePdu;
import edu.nps.moves.dis7.pdus.Pdu; import edu.nps.moves.dis7.pdus.Pdu;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
...@@ -107,8 +108,7 @@ public class DisThreadedNetworkInterface ...@@ -107,8 +108,7 @@ public class DisThreadedNetworkInterface
// independently available parameters for each DisThreadedNetworkInterface object // independently available parameters for each DisThreadedNetworkInterface object
private String disAddress; private String disAddress;
private int disPort; private int disPort;
private boolean killed = false; private boolean killed = false; // thread loop sentinel
private int killCounter = 0;
private InetAddress inetAddress; private InetAddress inetAddress;
private InetSocketAddress inetSocket; private InetSocketAddress inetSocket;
...@@ -116,13 +116,20 @@ public class DisThreadedNetworkInterface ...@@ -116,13 +116,20 @@ public class DisThreadedNetworkInterface
private DatagramSocket datagramSocket; private DatagramSocket datagramSocket;
/** /**
* Object constructor using default multicast address and port * Object constructor using default multicast address and port, no descriptor
*/ */
public DisThreadedNetworkInterface() public DisThreadedNetworkInterface()
{ {
this(DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT); this(DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT, "");
}
/**
* Object constructor with descriptor, using default multicast address and port
* @param newDescriptor simple descriptor name for this interface
*/
public DisThreadedNetworkInterface(String newDescriptor)
{
this(DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT, newDescriptor);
} }
/** /**
* Object constructor using specified multicast address and port * Object constructor using specified multicast address and port
* @param address the multicast group or unicast address to utilize * @param address the multicast group or unicast address to utilize
...@@ -130,6 +137,22 @@ public class DisThreadedNetworkInterface ...@@ -130,6 +137,22 @@ public class DisThreadedNetworkInterface
*/ */
public DisThreadedNetworkInterface(String address, int port) public DisThreadedNetworkInterface(String address, int port)
{ {
this(address, port, "");
}
/**
* Object constructor using specified multicast address and port, plus descriptor.
* @param address the multicast group or unicast address to utilize
* @param port the multicast port to utilize
* @param newDescriptor simple descriptor name for this interface
*/
public DisThreadedNetworkInterface(String address, int port, String newDescriptor)
{
if (newDescriptor == null)
descriptor = "";
else descriptor = newDescriptor;
TRACE_PREFIX = "[" + (DisThreadedNetworkInterface.class.getSimpleName() + " " + descriptor).trim() + "] ";
disAddress = address; disAddress = address;
disPort = port; disPort = port;
try try
...@@ -142,27 +165,41 @@ public class DisThreadedNetworkInterface ...@@ -142,27 +165,41 @@ public class DisThreadedNetworkInterface
} }
inetSocket = new InetSocketAddress(inetAddress, disPort); // tests that accessor methods are working as set inetSocket = new InetSocketAddress(inetAddress, disPort); // tests that accessor methods are working as set
networkInterface = findIpv4Interface(); networkInterface = findIpv4Interface();
initializeSocketSenderReceiverThreads();
start();
} }
/* *********** queues and lists and public methods ************** */ /* *********** queues and lists and public methods ************** */
private final List<PduListener> everyTypeListeners = new ArrayList<>(); private final List<PduListener> everyTypeListeners = new ArrayList<>();
private final Map<DisPduType, List<PduListener>> typeListeners = new HashMap<>(); private final Map<DisPduType, List<PduListener>> typeListeners = new HashMap<>();
private final List<RawPduListener> rawListeners = new ArrayList<>(); private final List<RawPduListener> rawListeners = new ArrayList<>();
private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); // https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/LinkedBlockingQueue.html
private final LinkedBlockingQueue<Pdu> pdus2sendLinkedBlockingQueue = new LinkedBlockingQueue<>(); // FIFO
/** /**
* Main method for testing. * Main method for testing.
* @see <a href="https://docs.oracle.com/javase/tutorial/getStarted/application/index.html">Java Tutorials: A Closer Look at the "Hello World!" Application</a> * @see <a href="https://docs.oracle.com/javase/tutorial/getStarted/application/index.html">Java Tutorials: A Closer Look at the "Hello World!" Application</a>
* @param args command-line arguments are an array of optional String parameters that are passed from execution environment during invocation * @param args [address, port, descriptor] command-line arguments are an array of optional String parameters that are passed from execution environment during invocation
*/ */
public static void main(String[] args) public static void main(String[] args)
{ {
System.out.println("*** DisThreadedNetworkInterface main() test started..."); System.out.println("*** DisThreadedNetworkInterface main() test started...");
DisThreadedNetworkInterface disThreadedNetworkInterface = new DisThreadedNetworkInterface(); DisThreadedNetworkInterface disThreadedNetworkInterface;
String selfTestDescriptor = "DisThreadedNetworkInterface main() self test";
if (args.length == 3)
selfTestDescriptor = args[2];
if ((args == null) || (args.length == 0))
disThreadedNetworkInterface = new DisThreadedNetworkInterface( /* default address, port */ selfTestDescriptor);
else disThreadedNetworkInterface = new DisThreadedNetworkInterface(args[0], Integer.parseInt(args[1]), selfTestDescriptor);
System.out.println("*** DisThreadedNetworkInterface main() self test created instance.");
disThreadedNetworkInterface.close(); disThreadedNetworkInterface.close();
System.out.println("*** DisThreadedNetworkInterface main() test complete."); System.out.flush();
System.err.flush();
System.out.println("*** DisThreadedNetworkInterface main() self test complete.");
} }
/** /**
...@@ -274,7 +311,7 @@ public class DisThreadedNetworkInterface ...@@ -274,7 +311,7 @@ public class DisThreadedNetworkInterface
*/ */
public void send(Pdu pdu) public void send(Pdu pdu)
{ {
pdus2send.add(pdu); pdus2sendLinkedBlockingQueue.add(pdu);
} }
/* *************** networking i/o ************* */ /* *************** networking i/o ************* */
...@@ -286,12 +323,11 @@ public class DisThreadedNetworkInterface ...@@ -286,12 +323,11 @@ public class DisThreadedNetworkInterface
/** Initialization of threads, or confirmation that they remain running */ /** Initialization of threads, or confirmation that they remain running */
private synchronized void initializeSocketSenderReceiverThreads() private synchronized void initializeSocketSenderReceiverThreads()
{ {
createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry
if (receiverThread == null) if (receiverThread == null)
{ {
receiverThread = new Thread(receiverThreadRunnable, TRACE_PREFIX + "receiverThread"); receiverThread = new Thread(receiverThreadRunnable, TRACE_PREFIX + "receiverThread");
receiverThread.setDaemon(true); // https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java
receiverThread.setDaemon(false); // user thread, not system thread
receiverThread.setPriority(Thread.NORM_PRIORITY); receiverThread.setPriority(Thread.NORM_PRIORITY);
receiverThread.start(); receiverThread.start();
} }
...@@ -299,7 +335,8 @@ public class DisThreadedNetworkInterface ...@@ -299,7 +335,8 @@ public class DisThreadedNetworkInterface
if (senderThread == null) if (senderThread == null)
{ {
senderThread = new Thread(senderThreadRunnable, TRACE_PREFIX + "senderThread"); senderThread = new Thread(senderThreadRunnable, TRACE_PREFIX + "senderThread");
senderThread.setDaemon(true); // https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java
senderThread.setDaemon(false); // user thread, not system thread
senderThread.setPriority(Thread.NORM_PRIORITY); senderThread.setPriority(Thread.NORM_PRIORITY);
senderThread.start(); senderThread.start();
} }
...@@ -309,9 +346,11 @@ public class DisThreadedNetworkInterface ...@@ -309,9 +346,11 @@ public class DisThreadedNetworkInterface
* Create datagram socket if not already available; can also be invoked by * Create datagram socket if not already available; can also be invoked by
* either sender or receiver thread to ensure datagram socket is open. * either sender or receiver thread to ensure datagram socket is open.
*/ */
public void start () public void start() // TODO rename begin()
{ {
createDatagramSocket(); // usually automatic, might be needed if previous connection was closed createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry
initializeSocketSenderReceiverThreads();
} }
/** /**
* Create datagram socket if not already available; can also be invoked by * Create datagram socket if not already available; can also be invoked by
...@@ -331,11 +370,6 @@ public class DisThreadedNetworkInterface ...@@ -331,11 +370,6 @@ public class DisThreadedNetworkInterface
datagramSocket = null; datagramSocket = null;
System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was available but closed before createDatagramSocket(), resetting..."); System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was available but closed before createDatagramSocket(), resetting...");
} }
else
{
// System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was not closed before createDatagramSocket(), ignoring..."); // debug
return;
}
} }
if ((datagramSocket == null) || closedSocket) if ((datagramSocket == null) || closedSocket)
{ {
...@@ -343,6 +377,8 @@ public class DisThreadedNetworkInterface ...@@ -343,6 +377,8 @@ public class DisThreadedNetworkInterface
{ {
// The initial value of the SO_BROADCAST socket option is FALSE // The initial value of the SO_BROADCAST socket option is FALSE
datagramSocket = new MulticastSocket(getPort()); datagramSocket = new MulticastSocket(getPort());
datagramSocket.setSoTimeout(1000); // msec timeout on reading and then continue looping in order to avoid blocking
((MulticastSocket) datagramSocket).joinGroup(inetSocket, networkInterface); ((MulticastSocket) datagramSocket).joinGroup(inetSocket, networkInterface);
if (hasVerboseOutput()) if (hasVerboseOutput())
...@@ -354,7 +390,6 @@ public class DisThreadedNetworkInterface ...@@ -354,7 +390,6 @@ public class DisThreadedNetworkInterface
System.out.println(message); System.out.println(message);
System.out.flush(); System.out.flush();
} }
initializeSocketSenderReceiverThreads();
Thread.sleep (100L); // allow threads, streams to catch up Thread.sleep (100L); // allow threads, streams to catch up
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
...@@ -377,13 +412,10 @@ public class DisThreadedNetworkInterface ...@@ -377,13 +412,10 @@ public class DisThreadedNetworkInterface
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_TRANSMISSION_UNIT_SIZE); ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_TRANSMISSION_UNIT_SIZE);
DatagramPacket packet = new DatagramPacket(byteBuffer.array(), byteBuffer.capacity()); DatagramPacket packet = new DatagramPacket(byteBuffer.array(), byteBuffer.capacity());
while (!killed) // keep trying even if error occured while (!killed) // keep trying even if error occurred
{ {
// If something trips up with the socket, this thread will attempt to try
// re-establish socket for both send/receive threads {
try {
createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first
while (!killed) // loop until terminated while (!killed) // loop until terminated
{ {
datagramSocket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port datagramSocket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port
...@@ -402,36 +434,29 @@ public class DisThreadedNetworkInterface ...@@ -402,36 +434,29 @@ public class DisThreadedNetworkInterface
String message = TRACE_PREFIX + "[receipt " + pad + pduReceiptCounter + "] " + nextPdu.getPduType().toString(); String message = TRACE_PREFIX + "[receipt " + pad + pduReceiptCounter + "] " + nextPdu.getPduType().toString();
if (hasVerboseOutputIncludesTimestamp()) if (hasVerboseOutputIncludesTimestamp())
message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp());
if (nextPdu.getPduType() == DisPduType.ENTITY_STATE)
message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString());
message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; message += ", size " + nextPdu.getMarshalledSize() + " bytes)";
System.err.flush();
System.out.println(message); System.out.println(message);
System.out.flush(); System.out.flush();
System.err.flush();
} }
toListeners(nextPdu); toListeners(nextPdu);
} }
byteBuffer.clear(); byteBuffer.clear();
} }
Thread.sleep (100L); Thread.sleep (100L); // loop successfully killed
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
// continue // continue
} }
catch (IOException ex) { catch (IOException ex)
if (!killed)
{
System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiverThread: " + ex.getLocalizedMessage());
System.err.println(TRACE_PREFIX + "Retrying new socket...");
}
}
finally
{ {
close(); // read commonly times out while waiting for PDUs to arrive, and so this trace statement is for debugging
} // System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiverThread: " + ex.getLocalizedMessage());
}
} }
if (killed) // returning kills thread, do not put any other steps here
close(); // retry now that threads are killed
System.err.flush();
System.out.flush();
}; };
private final Runnable senderThreadRunnable = () -> { private final Runnable senderThreadRunnable = () -> {
...@@ -443,16 +468,13 @@ public class DisThreadedNetworkInterface ...@@ -443,16 +468,13 @@ public class DisThreadedNetworkInterface
DataOutputStream dos = new DataOutputStream(baos); DataOutputStream dos = new DataOutputStream(baos);
DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket); DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket);
while (!killed) // keep trying even if error occured while (!killed) // keep trying even if error occurred
{ {
// If something trips up with the socket, this thread will attempt to try
// re-establish socket for both send/receive threads {
try {
createDatagramSocket (); // ensure socket open, recreate if needed, other thread may occur first
while (!killed) // loop until terminated while (!killed) // loop until terminated
{ {
Pdu nextPdu = pdus2send.take(); Pdu nextPdu = pdus2sendLinkedBlockingQueue.take();
nextPdu.marshal(dos); nextPdu.marshal(dos);
packet.setData(baos.toByteArray()); packet.setData(baos.toByteArray());
...@@ -468,34 +490,27 @@ public class DisThreadedNetworkInterface ...@@ -468,34 +490,27 @@ public class DisThreadedNetworkInterface
if (hasVerboseOutputIncludesTimestamp()) if (hasVerboseOutputIncludesTimestamp())
message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp()); message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp());
message += ", size " + nextPdu.getMarshalledSize() + " bytes)"; message += ", size " + nextPdu.getMarshalledSize() + " bytes)";
System.err.flush();
System.out.println(message); System.out.println(message);
System.out.flush(); System.out.flush();
System.err.flush();
} }
dos.flush(); // immediately force pdu write dos.flush(); // immediately force pdu write
baos.reset(); // prepare for next send baos.reset(); // prepare for next send
} }
} if (killed) // operations are finished
catch (Exception ex)
{
if (!killed)
{ {
System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface senderThread: " + ex.getLocalizedMessage()); // check if looping complete, cleanup here to ensure thread exit is clean
baos.close();
dos.close();
System.out.println("*** DisThreadedNetworkInterface senderThreadRunnable killed=true, close ByteArrayOutputStream baos and DataOutputStream dos\n");
} }
} }
catch (Exception ex)
{
System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface senderThread: " + ex.getLocalizedMessage());
}
} }
try { // returning kills thread, do not put any other steps here
// operations are finished
pdus2send.clear();
dos.close();
this.close();
}
catch (IOException e) {} // shutting down, no need to report exception
if (killed)
close(); // retry now that threads are killed
System.err.flush();
System.out.flush();
}; };
private void toListeners(Pdu pdu) private void toListeners(Pdu pdu)
...@@ -518,81 +533,106 @@ public class DisThreadedNetworkInterface ...@@ -518,81 +533,106 @@ public class DisThreadedNetworkInterface
} }
} }
private void toRawListeners(byte[] data, int len) private void toRawListeners(byte[] data, int len)
{ {
if(rawListeners.isEmpty()) if(rawListeners.isEmpty())
return; return;
ByteArrayBufferAndLength bl = new ByteArrayBufferAndLength(data, len);
rawListeners.forEach(lis->lis.incomingPdu(bl));
}
/** Method renamed as <code>close()</code> so use that method instead.
*/
@Deprecated
public void kill()
{
setKillSentinel();
}
/** Finish pending send/receive activity and then close. */ ByteArrayBufferAndLength bl = new ByteArrayBufferAndLength(data, len);
public void setKillSentinel() rawListeners.forEach(lis->lis.incomingPdu(bl));
{ }
killed = true; // set loop sentinel for threads to finish
}
/** Terminate the instance after completion of pending send/receive activity. /** Method renamed as <code>close()</code> so use that method instead.
* Synchronized to prevent interleaved invocation. */ */
public synchronized void close() @Deprecated
{ public void kill()
if (!killed || ((killCounter > 0) && (killCounter < 2)))
{
killCounter++; // repeat for all threads
setKillSentinel();
return; // allow network operations to finish, then killed threads are expected to reenter here
}
try
{ {
if (senderThread != null) setKillSentinel();
senderThread.join(4000); // wait for thread to die, msec max duration
if (receiverThread != null)
receiverThread.join(4000); // wait for thread to die, msec max duration
senderThread = null; // make sure
receiverThread = null; // make sure
} }
catch (InterruptedException ie)
/** Finish pending send/receive activity and then close. */
public void setKillSentinel()
{ {
System.err.println ("*** DisThreadedNetworkInterface thread join() failed to wait for threads to die"); killed = true; // set loop sentinel for threads to finish
System.err.flush();
ie.printStackTrace(System.err);
} }
if (datagramSocket != null && !datagramSocket.isClosed()) /** Terminate the instance after completion of pending send/receive activity.
* Synchronized to prevent interleaved invocation. */
public synchronized void close()
{ {
try { try
((MulticastSocket)datagramSocket).leaveGroup(inetSocket, networkInterface); {
} catch (IOException ex) { pdus2sendLinkedBlockingQueue.clear(); // all stop
Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex); if (!killed)
} {
datagramSocket.close(); setKillSentinel();
sleep(100l); // TODO needed? Thread.sleep(100l); // let senderThread and receiverThread stop
}
try { // join and kill senderThread
if (senderThread != null)
{
senderThread.join(4000); // wait for thread to die, msec max duration
Thread.sleep(100l);
}
}
catch (InterruptedException ie)
{
System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die");
System.err.flush();
ie.printStackTrace(System.err);
}
// separate blocks since second join statement otherwise gets skipped if first thread timeout occurs
try { // join and kill receiverThread
if (receiverThread != null)
{
receiverThread.join(4000); // wait for thread to die, msec max duration
Thread.sleep(100l);
}
}
catch (InterruptedException ie)
{
System.err.println ("*** DisThreadedNetworkInterface receiverThread join() failed to wait for threads to die");
System.err.flush();
ie.printStackTrace(System.err);
}
// now close socket
if (datagramSocket != null && !datagramSocket.isClosed())
{
try {
((MulticastSocket)datagramSocket).leaveGroup(inetSocket, networkInterface);
} catch (IOException ex) {
Logger.getLogger(DisThreadedNetworkInterface.class.getName()).log(Level.SEVERE, null, ex);
}
datagramSocket.close();
sleep(100l); // TODO needed?
if (hasVerboseOutput()) if (hasVerboseOutput())
{
String message = TRACE_PREFIX;
// if (hasVerboseOutputIncludesTimestamp())
// message += " (timestamp " + getTimestamp()); // TODO
message += "datagramSocket.leaveGroup address=" + getAddress() + " port=" + getPort() + " close() complete";
System.out.println(message);
System.out.flush();
}
}
}
catch (Exception e)
{ {
String message = TRACE_PREFIX; System.err.println ("*** DisThreadedNetworkInterface close()() unexpected exception!");
// if (hasVerboseOutputIncludesTimestamp())
// message += " (timestamp " + getTimestamp()); // TODO
message += "datagramSocket.leaveGroup address=" + getAddress() + " port=" + getPort() + " stop() complete";
System.out.println(message);
System.out.flush();
} }
// report if successful
String senderThreadAlive = "null";
String receiverThreadAlive = "null";
if (senderThread != null)
senderThreadAlive = String.valueOf( senderThread.isAlive());
if (receiverThread != null)
receiverThreadAlive = String.valueOf(receiverThread.isAlive());
System.out.println ("*** Thread closing status: senderThread.isAlive()=" + senderThreadAlive +
" receiverThread.isAlive()=" + receiverThreadAlive);
System.err.flush(); // ensure all output sent
System.out.flush(); // ensure all output sent
} }
datagramSocket = null; // make sure
killCounter = 0; // prepare for next run
System.err.flush(); // ensure all output sent
System.out.flush(); // ensure all output sent
}
/** Thread sleep for indicated interval /** Thread sleep for indicated interval
* @param duration milliseconds */ * @param duration milliseconds */
...@@ -760,7 +800,7 @@ public class DisThreadedNetworkInterface ...@@ -760,7 +800,7 @@ public class DisThreadedNetworkInterface
/** /**
* Set new simple descriptor (such as parent class name) for this network interface, used in trace statements * Set new simple descriptor (such as parent class name) for this network interface, used in trace statements
* @param newDescriptor simple descriptor name * @param newDescriptor simple descriptor name for this interface
*/ */
public void setDescriptor(String newDescriptor) public void setDescriptor(String newDescriptor)
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment