Skip to content
Snippets Groups Projects
Commit 877f44af authored by Brutzman, Don's avatar Brutzman, Don
Browse files

improved threading shutdown of DisThreadedNetworkingInterface, PduRecorder self test

parent 2f15a86a
No related branches found
No related tags found
No related merge requests found
......@@ -4,18 +4,14 @@
*/
package edu.nps.moves.dis7.utilities;
import edu.nps.moves.dis7.pdus.Pdu;
import edu.nps.moves.dis7.pdus.DisTime;
import edu.nps.moves.dis7.enumerations.DisPduType;
import edu.nps.moves.dis7.pdus.DisTime;
import edu.nps.moves.dis7.pdus.Pdu;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
......@@ -484,6 +480,7 @@ public class DisThreadedNetworkInterface
try {
// operations are finished
dos.close();
this.close();
}
catch (IOException e) {} // shutting down, no need to report exception
};
......@@ -522,11 +519,11 @@ public class DisThreadedNetworkInterface
@Deprecated
public void kill()
{
finishOperations();
setKillSentinel();
}
/** Finish pending send/receive activity and then close. */
public void finishOperations()
public void setKillSentinel()
{
killed = true; // set loop sentinel for threads to finish
}
......@@ -535,7 +532,19 @@ public class DisThreadedNetworkInterface
* Synchronized to prevent interleaved invocation. */
public synchronized void close()
{
finishOperations();
setKillSentinel();
try
{
senderThread.join(2000); // wait for thread to die, msec max duration
receiverThread.join(2000); // wait for thread to die, msec max duration
}
catch (InterruptedException ie)
{
System.err.println ("*** DisThreadedNetworkInterface thread join() failed to wait for threads to die");
System.err.flush();
ie.printStackTrace(System.err);
}
if (datagramSocket != null && !datagramSocket.isClosed())
{
......@@ -569,8 +578,8 @@ public class DisThreadedNetworkInterface
}
catch (InterruptedException ie)
{
System.err.flush();
System.err.println ("*** " + getClass().getName() + ".sleep(" + duration + ") failed to sleep");
System.err.flush();
ie.printStackTrace(System.err);
}
}
......
......@@ -2,6 +2,7 @@ package edu.nps.moves.dis7.utilities.stream;
import com.google.common.primitives.Longs;
import edu.nps.moves.dis7.enumerations.DisPduType;
import edu.nps.moves.dis7.pdus.Pdu;
import edu.nps.moves.dis7.utilities.DisThreadedNetworkInterface;
import edu.nps.moves.dis7.utilities.PduFactory;
import java.io.BufferedWriter;
......@@ -12,9 +13,11 @@ import java.io.PrintWriter;
import java.io.Writer;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.FilenameUtils;
......@@ -45,25 +48,30 @@ public class PduRecorder implements PduReceiver
public static final String COMMENT_MARKER = "#";
static final String OUTPUT_DIRECTORY_DEFAULT = "./pduLog";
String outputDirectory = OUTPUT_DIRECTORY_DEFAULT;
Path outputDirectoryPath;
private String outputDirectory = OUTPUT_DIRECTORY_DEFAULT;
private Path outputDirectoryPath;
static final String DEFAULT_FILE_PREFIX = "PduCaptureLog";
static final String DISLOG_FILE_EXTENSION = ".dislog";
static final String DEFAULT_FILE_NAME = DEFAULT_FILE_PREFIX + DISLOG_FILE_EXTENSION;
static final String START_COMMENT_MARKER = COMMENT_MARKER + " Start, ";
static final String FINISH_COMMENT_MARKER = COMMENT_MARKER + " Finish, ";
static final String ENCODING_BASE64 = "ENCODING_BASE64";
static final String ENCODING_PLAINTEXT = "ENCODING_PLAINTEXT";
// not yet implemented
static final String ENCODING_BINARY = "ENCODING_BINARY"; // TODO likely requires different code path
static final String ENCODING_XML = "ENCODING_XML"; // TODO, repeat Open-DIS version 4 effort
static final String ENCODING_CDIS = "ENCODING_CDIS"; // future work based on new SISO standard
static final String ENCODING_EXI = "ENCODING_EXI"; // TODO, use Exificient or Nagasena libraries
static final String ENCODING_JSON = "ENCODING_JSON"; // TODO, repeat Open-DIS version 4 effort
static final String ENCODING_MAK_DATA_LOGGER = "ENCODING_MAK_DATA_LOGGER"; // verbose pretty-print. perhaps output only (MAK format itself is binary)
static final String ENCODING_WIRESHARK_DATA_LOGGER = "ENCODING_WIRESHARK_DATA_LOGGER"; //
static final String ENCODING_CDIS = "ENCODING_CDIS"; // future work based on new SISO standard
static final List<String> ENCODING_OPTIONS_LIST = new ArrayList<>();
static final List<String> ENCODING_OPTIONS_TODO = new ArrayList<>();
private static String pduLogEncoding = ENCODING_PLAINTEXT; // TODO use Java enumerations, generalize/share across library
private static String encodingPduLog = ENCODING_PLAINTEXT; // default
private String TRACE_PREFIX = ("[pduRecorder " + getDescriptor()).trim() + "] ";
private String descriptor = new String();
......@@ -80,6 +88,23 @@ public class PduRecorder implements PduReceiver
private final int pduCount = 0; // debug
private boolean headerWritten = false;
private boolean running = true; // starts recording by default
private void initialize()
{
if (ENCODING_OPTIONS_LIST.isEmpty())
{
ENCODING_OPTIONS_LIST.add(ENCODING_PLAINTEXT );
ENCODING_OPTIONS_LIST.add(ENCODING_BASE64 );
// not yet implemented
ENCODING_OPTIONS_TODO.add(ENCODING_BINARY ); // IEEE DIS format
ENCODING_OPTIONS_TODO.add(ENCODING_XML );
ENCODING_OPTIONS_TODO.add(ENCODING_CDIS );
ENCODING_OPTIONS_TODO.add(ENCODING_EXI );
ENCODING_OPTIONS_TODO.add(ENCODING_JSON );
ENCODING_OPTIONS_TODO.add(ENCODING_MAK_DATA_LOGGER );
ENCODING_OPTIONS_TODO.add(ENCODING_WIRESHARK_DATA_LOGGER);
}
}
/**
* Default constructor that uses default values for output directory, DIS address and port.
......@@ -89,7 +114,7 @@ public class PduRecorder implements PduReceiver
*/
public PduRecorder() throws IOException
{
this (OUTPUT_DIRECTORY_DEFAULT, DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT);
this (OUTPUT_DIRECTORY_DEFAULT, DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT);
}
/**
......@@ -103,7 +128,7 @@ public class PduRecorder implements PduReceiver
*/
public PduRecorder(String initialOutputDirectory) throws IOException
{
this(initialOutputDirectory, DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT);
this(initialOutputDirectory, DEFAULT_DIS_ADDRESS, DEFAULT_DIS_PORT);
}
/** Constructor to let the user specify all required parameters.
......@@ -117,6 +142,7 @@ public class PduRecorder implements PduReceiver
@SuppressWarnings("Convert2Lambda")
public PduRecorder(String initialOutputDirectory, String initialAddress, int initialPort)
{
initialize();
outputDirectoryPath = new File(initialOutputDirectory).toPath();
setDescriptor("PduRecorder"); // default
setAddress(initialAddress);
......@@ -128,16 +154,35 @@ public class PduRecorder implements PduReceiver
*
* @return the pduLogEncoding
*/
public static String getPduLogEncoding()
public static String getEncodingPduLog()
{
return pduLogEncoding;
return encodingPduLog;
}
/**
* @param newPduLogEncoding the pduLogEncoding to set
* @param newEncodingPduLog the pduLogEncoding to set
*/
public static void setPduLogEncoding(String newPduLogEncoding) {
pduLogEncoding = newPduLogEncoding;
public void setEncodingPduLog(String newEncodingPduLog)
{
newEncodingPduLog = newEncodingPduLog.trim();
String errorMessage = "*** setEncodingPduLog(" + newEncodingPduLog + ") ";
if (ENCODING_OPTIONS_LIST.contains(newEncodingPduLog))
{
encodingPduLog = newEncodingPduLog;
return;
}
else if (ENCODING_OPTIONS_TODO.contains(newEncodingPduLog))
{
errorMessage += " is not yet a supported encoding";
}
else
{
errorMessage += " is not yet a recognizeded encoding";
}
errorMessage += ", encodingPduLog=" + encodingPduLog + " is unchanged";
System.err.println (errorMessage);
System.err.flush(); // since network threads may be occurring
// return
}
/** Resume instance operation
......@@ -154,7 +199,7 @@ public class PduRecorder implements PduReceiver
else running = true;
}
/** Start instance operation
/** Start instance operation, resetting network interface and file recording
* @see stop()
* @see pause()
* @see resume()
......@@ -162,12 +207,12 @@ public class PduRecorder implements PduReceiver
public void start()
{
try {
logFile = createUniquePduLogFile(outputDirectoryPath, getLogFileName());
logFile = createUniquePduLogFile(getOutputDirectoryPath(), getLogFileName());
logFileWriter = new PrintWriter(new BufferedWriter(new FileWriter(logFile)));
}
catch (IOException ex)
{
System.err.println("Exception when creating log file in directory=" + outputDirectoryPath.toAbsolutePath() + "\n" +
System.err.println("Exception when creating log file in directory=" + getOutputDirectoryPath().toAbsolutePath() + "\n" +
" " + ex.getClass().getSimpleName() + ": " + ex.getLocalizedMessage());
}
if (disThreadedNetworkInterface == null)
......@@ -207,7 +252,8 @@ public class PduRecorder implements PduReceiver
if (disThreadedNetworkInterface != null)
{
disThreadedNetworkInterface.removeRawListener(disRawPduListener);
disThreadedNetworkInterface.finishOperations();
disThreadedNetworkInterface.setKillSentinel();
disThreadedNetworkInterface = null; // ensure reset upon re-start
}
writeFooter();
......@@ -247,7 +293,7 @@ public class PduRecorder implements PduReceiver
byte[] buffsized = Arrays.copyOf(newBuffer, newLength);
DisPduType pduType;
switch (pduLogEncoding)
switch (encodingPduLog)
{
case ENCODING_BASE64:
sb.append(base64Encoder.encodeToString(timeByteArray));
......@@ -266,7 +312,7 @@ public class PduRecorder implements PduReceiver
break;
default:
System.err.println ("Encoding " + pduLogEncoding + " not recognized or supported");
System.err.println ("Encoding " + encodingPduLog + " not recognized or supported");
}
if (!headerWritten) {
writeHeader();
......@@ -316,7 +362,7 @@ public class PduRecorder implements PduReceiver
try
{
logFileWriter.write(START_COMMENT_MARKER + pduLogEncoding + ", " + TRACE_PREFIX + timeStamp + ", DIS capture file, " + logFile.getPath());
logFileWriter.write(START_COMMENT_MARKER + encodingPduLog + ", " + TRACE_PREFIX + timeStamp + ", DIS capture file, " + logFile.getPath());
((PrintWriter) logFileWriter).println();
}
catch (IOException ex)
......@@ -331,7 +377,7 @@ public class PduRecorder implements PduReceiver
try
{
logFileWriter.write(FINISH_COMMENT_MARKER + pduLogEncoding + ", " + TRACE_PREFIX + timeStamp + ", DIS capture file, " + logFile.getPath());
logFileWriter.write(FINISH_COMMENT_MARKER + encodingPduLog + ", " + TRACE_PREFIX + timeStamp + ", DIS capture file, " + logFile.getPath());
((PrintWriter) logFileWriter).println();
}
catch (IOException ex)
......@@ -392,56 +438,89 @@ public class PduRecorder implements PduReceiver
return running;
}
/** Entry point invocation. Saves a PDU output log to ./pduLog. Invoking the
* edu.nps.moves.dis7.examples.PduReaderPlayer will playback all logs written
* to the log directory
/**
* Entry point invocation, runs selfTest().
*
* @param args none supported, TODO offer path/filename
*/
public static void main(String[] args)
{
try
{
PduRecorder pduRecorder = new PduRecorder();
pduRecorder.selfTest(args);
}
catch (IOException ioe)
{
ioe.printStackTrace();
}
}
/** This selfTest() method saves a PDU output log to project pduLog/ directory.
* Separately Invoking the edu.nps.moves.dis7.examples.PduReaderPlayer will playback all logs
* written to that log directory
*
* @param args none supported, TODO offer path/filename
*/
public static void main(String[] args)
public void selfTest(String[] args)
{
initialize();
System.out.println("dis7.utilities.stream.PduRecorder main() performs self-test by sending full set of PDUs");
PduFactory factory = new PduFactory(); //default appid, country, etc.
PduRecorder pduRecorder;
try {
PduFactory factory = new PduFactory(); // default appid, country, etc.
PduRecorder pduRecorder;
DisThreadedNetworkInterface disNetworkInterface;
try
{
pduRecorder = new PduRecorder(); // default address, port, output directory path
pduRecorder.setDescriptor("PduRecorder main() self test");
// pduRecorder.setPort(1); // option to avoid listening to other PDU streams during self test
pduRecorder.start();
}
catch(IOException ex) {
System.err.println("Exception creating recorder: " + ex.getLocalizedMessage());
catch(IOException ex)
{
System.err.println("Exception creating PduRecorder: " + ex.getLocalizedMessage());
System.err.println(ex.getStackTrace());
return;
}
System.out.println("dis7.utilities.stream.PduRecorder pduRecorder started... isRunning()=" + pduRecorder.isRunning());
DisPduType allPDUTypesArray[] = DisPduType.values();
System.out.println("dis7.utilities.stream.PduRecorder allPDUTypesArray created, length=" + allPDUTypesArray.length + " ...");
System.out.flush(); // ensure all output sent
Arrays.stream(allPDUTypesArray).forEach((DisPduType pduTypeValue)->
for (String currentEncoding : ENCODING_OPTIONS_LIST)
{
if (pduTypeValue != DisPduType.OTHER)
{
try {
pduRecorder.getDisThreadedNetworkInterface().send(factory.createPdu(pduTypeValue));
Thread.sleep (50L); // let send/receive threads and streams catch up
}
catch (InterruptedException ex) {
System.err.println("Exception sending Pdu: "+ex.getLocalizedMessage());
}
}
else
{
System.err.println("Found pduTypeValue=DisPduType.OTHER=" + pduTypeValue);
}
});
System.out.flush();
System.err.flush(); // ensure all output sent
pduRecorder.stop();
System.out.println("dis7.utilities.stream.PduRecorder pduRecorder complete... isRunning()=" + pduRecorder.isRunning());
System.out.println("=================================================");
System.out.println("Test PduRecorder encoding " + currentEncoding);
pduRecorder.setEncodingPduLog(currentEncoding);
pduRecorder.setDescriptor("PduRecorder main() self test");
// pduRecorder.setPort(1); // option to avoid listening to other PDU streams during self test
pduRecorder.setLogFileName("PduRecorderSelfTest" + currentEncoding + "_" + DEFAULT_FILE_PREFIX + DISLOG_FILE_EXTENSION);
pduRecorder.start();
disNetworkInterface = pduRecorder.getDisThreadedNetworkInterface(); // must reinitialize after each start
System.out.println("dis7.utilities.stream.PduRecorder pduRecorder started... isRunning()=" + pduRecorder.isRunning());
for (int i=1; i < allPDUTypesArray.length; i = i + 1)
{
DisPduType pduTypeValue = allPDUTypesArray[i];
if (pduTypeValue != DisPduType.OTHER)
{
try {
Pdu nextPdu = factory.createPdu(allPDUTypesArray[i]);
disNetworkInterface.send(nextPdu);
Thread.sleep (50L); // let send/receive threads and streams catch up
}
catch (InterruptedException ex) {
System.err.println("Exception sending Pdu " + pduTypeValue + ": " + ex.getLocalizedMessage());
}
}
else
{
System.err.println("Found pduTypeValue=DisPduType.OTHER=" + pduTypeValue);
}
};
System.out.flush();
System.err.flush(); // ensure all output sent
pduRecorder.stop();
System.out.println("dis7.utilities.stream.PduRecorder pduRecorder complete... isRunning()=" + pduRecorder.isRunning());
}
// end of loop ENCODING_OPTIONS_LIST
}
/**
* Get current multicast (or unicast) network address for send and receive connections.
......@@ -458,10 +537,10 @@ public class PduRecorder implements PduReceiver
* @param newAddress the new network address to set
*/
public void setAddress(String newAddress) {
this.disAddress = newAddress;
if (isRunning())
if (isRunning() && (this.disAddress != newAddress))
System.out.println(TRACE_PREFIX + "*** warning, attempting to change network address while running...");
// TODO warn if netIF already created
this.disAddress = newAddress;
}
/** Get network port used, multicast or unicast.
* @see <a href="https://en.wikipedia.org/wiki/Port_(computer_networking)">https://en.wikipedia.org/wiki/Port_(computer_networking)</a>
......@@ -478,10 +557,10 @@ public class PduRecorder implements PduReceiver
*/
public void setPort(int newPortValue)
{
this.disPort = newPortValue;
if (isRunning())
if (isRunning() && (this.disPort != newPortValue))
System.out.println(TRACE_PREFIX + "*** warning, attempting to change network port while running...");
// TODO warn if netIF already created
this.disPort = newPortValue;
}
/**
* Get simple descriptor (such as parent class name) for this network interface, used in trace statements
......@@ -502,7 +581,7 @@ public class PduRecorder implements PduReceiver
}
/**
* Set whether or not trace statements are provided when packets are sent or received.
* @param newValue the verbose status to set. Also resets verboseReceipt and verboseSending to match.
* @param newValue the verbose status to set, also resets verboseReceipt and verboseSending to match.
* @see verboseReceipt
* @see verboseSending
*/
......@@ -609,4 +688,31 @@ public class PduRecorder implements PduReceiver
public void setLogFileName(String logFileName) {
this.logFileName = logFileName;
}
/**
* Get output directory for this PduRecorder
* @return the outputDirectory
*/
public String getOutputDirectory() {
return outputDirectory;
}
/**
* Set output directory for this PduRecorder
* @param outputDirectory the outputDirectory to set
*/
public void setOutputDirectory(String outputDirectory) {
this.outputDirectory = outputDirectory;
outputDirectoryPath = new File(outputDirectory).toPath();
}
/**
* Get output directory path for this PduRecorder.
* To change outputDirectoryPath, invoke setOutputDirectory().
* @see setOutputDirectory(String)
* @return the outputDirectoryPath
*/
public Path getOutputDirectoryPath() {
return outputDirectoryPath;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment