Skip to content
Snippets Groups Projects
Commit 8487d371 authored by Brutzman, Don's avatar Brutzman, Don
Browse files

using Thread.interrupt() methods external to switch, along with corresponding...

using Thread.interrupt() methods external to switch, along with  corresponding while test, finally killed threads
parent 9cf956ca
No related branches found
No related tags found
No related merge requests found
......@@ -296,33 +296,33 @@ public class DisThreadedNetworkInterface
/* *************** networking i/o ************* */
private PduFactory pduFactory = new PduFactory();
private Thread senderThread;
private Thread receiverThread;
private Thread sendingThread;
private Thread receiveThread;
/** Initialization of threads, otherwise no action that while they remain running */
private synchronized void createSenderReceiverThreads()
private synchronized void createThreads()
{
if (receiverThread == null)
if (receiveThread == null)
{
System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() starting receiverThread...");
receiverThread = new Thread(receiverThreadRunnable, "receiverThread");
// System.out.println("DisThreadedNetworkInterface createThreads() start receiveThread...");
receiveThread = new Thread(receiveThreadRunnable, "receiveThread");
// https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java
receiverThread.setDaemon(false); // user thread, not system thread
receiverThread.setPriority(Thread.NORM_PRIORITY);
receiverThread.start();
receiveThread.setDaemon(false); // user thread, not system thread
receiveThread.setPriority(Thread.NORM_PRIORITY);
receiveThread.start();
}
System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() receiverThread.isAlive()=" + receiverThread.isAlive());
System.out.println("DisThreadedNetworkInterface createThreads() receiveThread.isAlive()=" + receiveThread.isAlive());
if (senderThread == null)
if (sendingThread == null)
{
System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() starting senderThread...");
senderThread = new Thread(senderThreadRunnable, "senderThread");
// System.out.println("DisThreadedNetworkInterface createThreads() start sendingThread...");
sendingThread = new Thread(sendingThreadRunnable, "sendingThread");
// https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java
senderThread.setDaemon(false); // user thread, not system thread
senderThread.setPriority(Thread.NORM_PRIORITY);
senderThread.start();
sendingThread.setDaemon(false); // user thread, not system thread
sendingThread.setPriority(Thread.NORM_PRIORITY);
sendingThread.start();
}
System.out.println("DisThreadedNetworkInterface initializeSocketSenderReceiverThreads() senderThread.isAlive()=" + receiverThread.isAlive());
System.out.println("DisThreadedNetworkInterface createThreads() sendingThread.isAlive()=" + receiveThread.isAlive());
}
/**
* Can be used to restart DisThreadedNetworkInterface if closed.
......@@ -334,7 +334,7 @@ public class DisThreadedNetworkInterface
{
createDatagramSocket(); // common asset, synchronized to prevent interleaved reentry
createSenderReceiverThreads();
createThreads();
}
/**
* Create datagram socket if not already available; can also be invoked by
......@@ -358,17 +358,17 @@ public class DisThreadedNetworkInterface
{
System.out.println(message + "datagramSocket is already connected...");
}
if (datagramSocket.isClosed())
else if (datagramSocket.isClosed())
{
datagramSocket = null;
System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was available but closed before createDatagramSocket(), resetting...");
System.err.println(" *** " + TRACE_PREFIX + "datagramSocket was previously used but closed, resetting...");
}
}
else // (datagramSocket == null)
if (datagramSocket == null) // may have just been reset
{
try
{
System.out.println(message + "datagramSocket is getting connected...");
// System.out.println(message + "datagramSocket is getting connected...");
// The initial value of the SO_BROADCAST socket option is FALSE
datagramSocket = new MulticastSocket(getPort());
datagramSocket.setSoTimeout(1000); // msec timeout on reading and then continue looping in order to avoid blocking
......@@ -398,7 +398,7 @@ public class DisThreadedNetworkInterface
}
@SuppressWarnings("SleepWhileInLoop") // intentional
private Runnable receiverThreadRunnable = () -> {
private Runnable receiveThreadRunnable = () -> {
int pduReceiptCounter = 0;
......@@ -407,51 +407,47 @@ public class DisThreadedNetworkInterface
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_TRANSMISSION_UNIT_SIZE);
DatagramPacket receivedPacket = new DatagramPacket(byteBuffer.array(), byteBuffer.capacity());
while (!killed) // keep trying even if error occurred
while (!killed && !Thread.currentThread().isInterrupted()) // loop until terminated
{
try
{
// while (!killed) // loop until terminated
// {
datagramSocket.receive(receivedPacket); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port
toRawListeners(receivedPacket.getData(), receivedPacket.getLength());
datagramSocket.receive(receivedPacket); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port
toRawListeners(receivedPacket.getData(), receivedPacket.getLength());
Pdu nextPdu = pduFactory.createPdu(byteBuffer);
Pdu nextPdu = pduFactory.createPdu(byteBuffer);
if (nextPdu != null)
if (nextPdu != null)
{
pduReceiptCounter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode
if (hasVerboseOutput() && hasVerboseReceipt())
{
pduReceiptCounter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode
if (hasVerboseOutput() && hasVerboseReceipt())
{
String pad = "";
if (pduReceiptCounter < 10)
pad = " ";
String message = TRACE_PREFIX + "[receipt " + pad + pduReceiptCounter + "] " + nextPdu.getPduType().toString();
if (hasVerboseOutputIncludesTimestamp())
message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp());
if (nextPdu.getPduType() == DisPduType.ENTITY_STATE)
message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString());
message += ", size " + nextPdu.getMarshalledSize() + " bytes)";
System.err.flush();
System.out.println(message);
System.out.flush();
}
toListeners(nextPdu);
String pad = "";
if (pduReceiptCounter < 10)
pad = " ";
String message = TRACE_PREFIX + "[receipt " + pad + pduReceiptCounter + "] " + nextPdu.getPduType().toString();
if (hasVerboseOutputIncludesTimestamp())
message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp());
if (nextPdu.getPduType() == DisPduType.ENTITY_STATE)
message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString());
message += ", size " + nextPdu.getMarshalledSize() + " bytes)";
System.err.flush();
System.out.println(message);
System.out.flush();
}
byteBuffer.clear();
// }
// Thread.sleep (100L); // loop successfully killed
}
toListeners(nextPdu);
}
byteBuffer.clear();
}
catch (IOException ex)
{
// read commonly times out while waiting for PDUs to arrive, and so this trace statement is for debugging
// System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiverThread: " + ex.getLocalizedMessage());
// System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface receiveThread: " + ex.getLocalizedMessage());
}
}
// returning kills thread, do not put any other steps here
};
private final Runnable senderThreadRunnable = () -> {
private final Runnable sendingThreadRunnable = () -> {
int pduSentCounter = 0;
......@@ -460,47 +456,43 @@ public class DisThreadedNetworkInterface
dos = new DataOutputStream(baos);
packet = new DatagramPacket(baos.toByteArray(), baos.size(), inetSocket);
while (!killed) // keep trying even if error occurred
while (!killed && !Thread.currentThread().isInterrupted()) // loop until terminated
{
try
{
// while (!killed) // loop until terminated
// {
Pdu nextPdu = pdus2send.take();
nextPdu.marshal(dos);
packet.setData(baos.toByteArray());
datagramSocket.send(packet);
pduSentCounter++;
if (hasVerboseOutput() && hasVerboseSending())
{
String pad = "";
if (pduSentCounter < 10)
pad = " ";
String message = TRACE_PREFIX + "[sending " + pad + pduSentCounter + "] " + nextPdu.getPduType().toString();
if (hasVerboseOutputIncludesTimestamp())
message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp());
if (nextPdu.getPduType() == DisPduType.ENTITY_STATE)
message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString());
message += ", size " + nextPdu.getMarshalledSize() + " bytes)";
System.err.flush();
System.out.println(message);
System.out.flush();
}
dos.flush(); // immediately force pdu write
baos.reset(); // prepare for next send
// }
// if (killed) // operations are finished
// {
// // check if looping complete, cleanup here to ensure thread exit is clean
// System.out.println("*** DisThreadedNetworkInterface senderThreadRunnable killed=true");
// }
// Thread.sleep (100L); // loop successfully killed
Pdu nextPdu = pdus2send.take();
nextPdu.marshal(dos);
packet.setData(baos.toByteArray());
datagramSocket.send(packet);
pduSentCounter++;
if (hasVerboseOutput() && hasVerboseSending())
{
String pad = "";
if (pduSentCounter < 10)
pad = " ";
String message = TRACE_PREFIX + "[sending " + pad + pduSentCounter + "] " + nextPdu.getPduType().toString();
if (hasVerboseOutputIncludesTimestamp())
message += " (timestamp " + DisTime.timeStampToString(nextPdu.getTimestamp());
if (nextPdu.getPduType() == DisPduType.ENTITY_STATE)
message += " " + String.format("%11s", ((EntityStatePdu)nextPdu).getMarkingString());
message += ", size " + nextPdu.getMarshalledSize() + " bytes)";
System.err.flush();
System.out.println(message);
System.out.flush();
}
dos.flush(); // immediately force pdu write
baos.reset(); // prepare for next send
}
catch (InterruptedException ex)
{
// no response, expected termination
}
catch (Exception ex)
{
System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface senderThread: " + ex.getLocalizedMessage());
System.err.println(TRACE_PREFIX + "Exception in DisThreadedNetworkInterface sendingThread: " + ex.getLocalizedMessage());
ex.printStackTrace();
}
}
// returning kills thread, do not put any other steps here
......@@ -540,13 +532,20 @@ public class DisThreadedNetworkInterface
@Deprecated
public void kill()
{
setKillSentinel();
setKillSentinelAndInterrupts();
}
/** Finish pending send/receive activity and then close. */
public void setKillSentinel()
/** Tell sendingThread and receiveThread to stop. */
public void setKillSentinelAndInterrupts() // TODO rename AndInterrupts
{
killed = true; // set loop sentinel for threads to finish
// https://stackoverflow.com/questions/26647840/how-do-i-interrupt-kill-a-hung-thread-in-java
sendingThread.interrupt();
receiveThread.interrupt();
System.out.println ("*** setKillSentinelAndInterrupts() killed=" + killed +
" sendingThread.isInterrupted()=" + sendingThread.isInterrupted() +
" receiveThread.isInterrupted()=" + receiveThread.isInterrupted());
}
/** Terminate the instance after completion of pending send/receive activity.
......@@ -557,43 +556,17 @@ public class DisThreadedNetworkInterface
{
if (!killed)
{
setKillSentinel(); // killed = true;
Thread.sleep(100l); // let senderThread and receiverThread stop
setKillSentinelAndInterrupts(); // killed = true;
Thread.sleep(100l); // let sendingThread and receiveThread stop
}
pdus2send.clear(); // all stop
dos.flush(); // immediately force pdu write
dos.flush(); // immediately force pdu write, if any remain
baos.close();
dos.close();
System.out.println ("*** Thread close(): pdus2send.size()=" + pdus2send.size() +
" baos.size()=" + baos.size() + " dos.size()=" + dos.size());
System.out.println ("*** DisThreadedNetworkInterface close():" +
" pdus2send.size()=" + pdus2send.size() +
" baos.size()=" + baos.size() + " dos.size()=" + dos.size());
// try { // join and kill senderThread
// if (senderThread != null)
// {
// senderThread.join(4000); // wait for thread to die, msec max duration
// Thread.sleep(100l);
// }
// }
// catch (InterruptedException ie)
// {
// System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die");
// System.err.flush();
// ie.printStackTrace(System.err);
// }
// separate blocks since second join statement otherwise gets skipped if first thread timeout occurs
// try { // join and kill receiverThread
// if (receiverThread != null)
// {
// receiverThread.join(4000); // wait for thread to die, msec max duration
// Thread.sleep(100l);
// }
// }
// catch (InterruptedException ie)
// {
// System.err.println ("*** DisThreadedNetworkInterface receiverThread join() failed to wait for threads to die");
// System.err.flush();
// ie.printStackTrace(System.err);
// }
// now close socket (after killing threads so that hopefully the socket doesn't lock them)
if (datagramSocket != null && !datagramSocket.isClosed())
{
......@@ -616,26 +589,10 @@ public class DisThreadedNetworkInterface
System.out.flush();
}
}
killThread ( senderThread);
killThread (receiverThread);
Thread.sleep(1000l);
killThread (sendingThread); // making sure
killThread (receiveThread); // making sure
Thread.sleep(500l);
reportThreadStatus();
// try again!! to close sender thread, TODO what is blocking?
// try { // join and kill senderThread
// if (senderThread != null)
// {
// senderThread.join(6000); // wait for thread to die, msec max duration
// Thread.sleep(100l);
// }
// }
// catch (InterruptedException ie)
// {
// System.err.println ("*** DisThreadedNetworkInterface senderThread join() failed to wait for senderThread to die");
// System.err.flush();
// ie.printStackTrace(System.err);
// }
}
catch (Exception e)
{
......@@ -647,6 +604,7 @@ public class DisThreadedNetworkInterface
try { // join and kill threadToKill
if (threadToKill != null)
{
threadToKill.interrupt();
threadToKill.join(4000); // wait for thread to die, msec max duration
Thread.sleep(100l);
}
......@@ -661,23 +619,24 @@ public class DisThreadedNetworkInterface
String threadAlive = "null";
if (threadToKill != null)
threadAlive = String.valueOf(threadToKill.isAlive());
System.out.println ("*** killThread() status: " + threadToKill.getName() + ".isAlive()=" + threadAlive);
System.out.println ("*** killThread() status: " + threadToKill.getName() + ".isAlive()=" + threadAlive +
" " + threadToKill.getName() + ".isInterrupted()=" + threadToKill.isInterrupted());
System.err.flush(); // ensure all output sent
System.out.flush(); // ensure all output sent
}
/** Report whether senderThread and receiverThread are alive */
/** Report whether sendingThread and receiveThread are alive */
public void reportThreadStatus()
{
// report if successful
String senderThreadAlive = "null";
String receiverThreadAlive = "null";
if (senderThread != null)
senderThreadAlive = String.valueOf( senderThread.isAlive());
if (receiverThread != null)
receiverThreadAlive = String.valueOf(receiverThread.isAlive());
System.out.println ("*** Thread closing status: senderThread.isAlive()=" + senderThreadAlive +
" receiverThread.isAlive()=" + receiverThreadAlive);
String sendingThreadAlive = "null";
String receiveThreadAlive = "null";
if (sendingThread != null)
sendingThreadAlive = String.valueOf( sendingThread.isAlive());
if (receiveThread != null)
receiveThreadAlive = String.valueOf(receiveThread.isAlive());
System.out.println ("*** Thread close status: sendingThread.isAlive()=" + sendingThreadAlive +
" receiveThread.isAlive()=" + receiveThreadAlive);
System.err.flush(); // ensure all output sent
System.out.flush(); // ensure all output sent
}
......@@ -865,7 +824,7 @@ public class DisThreadedNetworkInterface
{
DisThreadedNetworkInterface disThreadedNetworkInterface;
System.out.println("*** DisThreadedNetworkInterface main() test started...");
System.out.println("*** DisThreadedNetworkInterface main() self test started...");
String selfTestDescriptor = "main() self test";
if ((args != null) && args.length == 3)
......@@ -874,7 +833,7 @@ public class DisThreadedNetworkInterface
disThreadedNetworkInterface = new DisThreadedNetworkInterface( /* default address, port */ selfTestDescriptor);
else disThreadedNetworkInterface = new DisThreadedNetworkInterface(args[0], Integer.parseInt(args[1]), selfTestDescriptor);
System.out.println("*** DisThreadedNetworkInterface main() self test created instance.");
System.out.println("*** DisThreadedNetworkInterface main() self test initialized...");
try
{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment