Skip to content
Snippets Groups Projects
Commit 831447a7 authored by J. M. Bailey's avatar J. M. Bailey
Browse files

Redo threading, add stop, pause methods

parent df08db7a
No related branches found
No related tags found
No related merge requests found
/* /**
* To change this license header, choose License Headers in Project Properties. * Copyright (c) 2008-2019, MOVES Institute, Naval Postgraduate School. All rights reserved.
* To change this template file, choose Tools | Templates * This work is licensed under the BSD open source license, available at https://www.movesinstitute.org/licenses/bsd.html
* and open the template in the editor.
*/ */
package edu.nps.moves.dis7.util.playerrecorder; package edu.nps.moves.dis7.util.playerrecorder;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import java.io.*; import java.io.*;
...@@ -24,158 +22,217 @@ public class Player ...@@ -24,158 +22,217 @@ public class Player
private Path disLogDirectory; private Path disLogDirectory;
private String ip; private String ip;
private int port; private int port;
private Thread thrd;
public Player(String ip, int port, Path disLogDirectory)
public Player(String ip, int port, Path disLogDirectory) throws IOException
{ {
this.disLogDirectory = disLogDirectory; this.disLogDirectory = disLogDirectory;
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
thrd = new Thread(() -> begin());
thrd.setPriority(Thread.NORM_PRIORITY);
thrd.setName("PlayerThread");
thrd.setDaemon(true);
thrd.start();
} }
private Integer scenarioPduCount = null; private Integer scenarioPduCount = null;
private boolean showPduCountsOneTime = false; private boolean showPduCountsOneTime = false;
private int pduCount = 0; private int pduCount = 0;
private DatagramSocket dsock; private DatagramSocket dsock;
private BufferedReader brdr; private BufferedReader brdr;
private Long startNanoTime=null; private Long startNanoTime = null;
private boolean paused = false;
@SuppressWarnings("StatementWithEmptyBody") @SuppressWarnings("StatementWithEmptyBody")
public void startResume() throws IOException public void begin()
{ {
System.out.println("Replaying DIS logs."); try {
InetAddress addr = InetAddress.getByName(ip); System.out.println("Replaying DIS logs.");
InetAddress addr = InetAddress.getByName(ip);
FilenameFilter filter = (dir, name) -> name.endsWith(Recorder.DISLOG_FILE_TAIL) && !name.startsWith(".");
FilenameFilter filter = (dir, name) -> name.endsWith(Recorder.DISLOG_FILE_TAIL) && !name.startsWith(".");
File[] fs = disLogDirectory.toFile().listFiles(filter);
if(fs == null) File[] fs = disLogDirectory.toFile().listFiles(filter);
fs = new File[0]; if (fs == null)
fs = new File[0];
Arrays.sort(fs,(f1,f2)->{return f1.getName().compareTo(f2.getName());});
//Arrays.sort(fs, Comparator.comparing(File::getName)); Arrays.sort(fs, (f1, f2) -> {
return f1.getName().compareTo(f2.getName());
dsock = new DatagramSocket(); });
Base64.Decoder decdr = Base64.getDecoder(); //Arrays.sort(fs, Comparator.comparing(File::getName));
for(File f : fs) { dsock = new DatagramSocket();
System.out.println("Replaying "+f.getAbsolutePath()); Base64.Decoder decdr = Base64.getDecoder();
brdr = new BufferedReader(new FileReader(f), 1024 * 200); // 200kb buffer
for (File f : fs) {
String line = brdr.readLine(); System.out.println("Replaying " + f.getAbsolutePath());
while(line != null) { brdr = new BufferedReader(new FileReader(f), 1024 * 200); // 200kb buffer
if(line.length()<=0)
; // blank lines ok String line = brdr.readLine();
else if (line.startsWith(COMMENT_MARKER)) { while (line != null && !Thread.interrupted()) {
if(handleComment(line,f)) { while (paused) {
// here if we got an end comment sleep(500l); // half sec
break; // out of read loop
} }
} if (line.length() <= 0)
else { ; // blank lines ok
else if (line.startsWith(COMMENT_MARKER)) {
String[] sa = line.split(","); if (handleComment(line, f)) {
if (sa.length != 2) { // here if we got an end comment
System.err.println("Error: parsing error. Line follows."); break; // out of read loop
System.err.println(line); }
byebye();
} }
else {
if (startNanoTime == null)
startNanoTime = System.nanoTime(); String[] sa = line.split(",");
if (sa.length != 2) {
byte[] pduTimeBytes = decdr.decode(sa[0]); System.err.println("Error: parsing error. Line follows.");
long pduTimeInterval = Longs.fromByteArray(pduTimeBytes); System.err.println(line);
// This is a relative number in nanoseconds of the time of packet reception minus first packet reception for scenario. byebye();
}
long targetSimTime = startNanoTime + pduTimeInterval; // when we should send the packet
long now = System.nanoTime(); if (startNanoTime == null)
long sleepTime = targetSimTime - now; //System.nanoTime(); // the difference between then and now startNanoTime = System.nanoTime();
if (sleepTime > 20000000) { // 20 ms // byte[] pduTimeBytes = decdr.decode(sa[0]);
//System.out.println("sim interval = " + pduTimeInterval + ", sleeping for " + sleepTime/1000000l + " ms"); long pduTimeInterval = Longs.fromByteArray(pduTimeBytes);
sleep(sleepTime/1000000L, (int)(sleepTime % 1000000L)); // This is a relative number in nanoseconds of the time of packet reception minus first packet reception for scenario.
long targetSimTime = startNanoTime + pduTimeInterval; // when we should send the packet
long now = System.nanoTime();
long sleepTime = targetSimTime - now; //System.nanoTime(); // the difference between then and now
if (sleepTime > 20000000) { // 20 ms //
//System.out.println("sim interval = " + pduTimeInterval + ", sleeping for " + sleepTime/1000000l + " ms");
sleep(sleepTime / 1000000L, (int) (sleepTime % 1000000L));
}
byte[] buffer = decdr.decode(sa[1]);
DatagramPacket packet = new DatagramPacket(buffer, buffer.length, addr, port);
dsock.send(packet);
pduCount++;
if (scenarioPduCount != null)
scenarioPduCount++;
if (showPduCountsOneTime || pduCount % 5 == 0)
showCounts();
} }
byte[] buffer = decdr.decode(sa[1]); line = brdr.readLine();
DatagramPacket packet = new DatagramPacket(buffer, buffer.length, addr, port);
dsock.send(packet);
pduCount++;
if(scenarioPduCount != null)
scenarioPduCount++;
if(showPduCountsOneTime || pduCount % 5 == 0)
showCounts();
} }
brdr.close();
line = brdr.readLine();
} }
brdr.close(); }
catch (Exception ex) {
System.err.println("Exception reading/writing pdus: "+ex.getClass().getSimpleName()+": "+ex.getLocalizedMessage());
thrd = null;
closer();
} }
} }
private void sleep(long ms)
{
try {
Thread.sleep(ms);
}
catch (InterruptedException ex) {
}
}
private void showCounts() private void showCounts()
{ {
if(scenarioPduCount != null) if (scenarioPduCount != null)
System.out.print(pduCount + " " + ++scenarioPduCount+"\r"); System.out.print(pduCount + " " + ++scenarioPduCount + "\r");
else else
System.out.print(pduCount+"\r"); System.out.print(pduCount + "\r");
showPduCountsOneTime = false; showPduCountsOneTime = false;
} }
private void byebye() throws IOException private void byebye() throws IOException
{ {
System.out.println("Replay stopped."); System.out.println("Replay stopped.");
closer();
// @formatter:off
try { dsock.close();brdr.close(); } catch (Exception ioex) {System.err.println("IOException closing reader in Player"); }
// @formatter:on
throw new IOException("Dis Replayer parsing error"); throw new IOException("Dis Replayer parsing error");
} }
private void closer()
{
try {
if (dsock != null) {
dsock.close();
dsock = null;
}
if (brdr != null) {
brdr.close();
brdr = null;
}
}
catch (Exception ioex) {
System.err.println("IOException closing reader in Player");
}
}
private boolean handleComment(String s, File f) //true if we're done private boolean handleComment(String s, File f) //true if we're done
{ {
boolean ret=false; boolean ret = false;
if(s.startsWith(START_COMMENT_MARKER)) { if (s.startsWith(START_COMMENT_MARKER)) {
//System.out.println(); //System.out.println();
s = s.substring(START_COMMENT_MARKER.length()); s = s.substring(START_COMMENT_MARKER.length());
System.out.println(s+" "); System.out.println(s + " ");
showPduCountsOneTime = true; // get the first one in there showPduCountsOneTime = true; // get the first one in there
} }
else if (s.startsWith(STOP_COMMENT_MARKER)) { else if (s.startsWith(STOP_COMMENT_MARKER)) {
showCounts(); showCounts();
System.out.println(); System.out.println();
System.out.println("End of replay from "+f.getName()); System.out.println("End of replay from " + f.getName());
System.out.println(s.substring(STOP_COMMENT_MARKER.length())); System.out.println(s.substring(STOP_COMMENT_MARKER.length()));
scenarioPduCount = 0; scenarioPduCount = 0;
startNanoTime=null; startNanoTime = null;
ret = true; ret = true;
} }
return ret; return ret;
} }
@SuppressWarnings("unused") public void startResume()
{
paused = false;
}
public void stopPause() public void stopPause()
{ {
paused = false;
}
public void end()
{
thrd.interrupt();
closer();
} }
public static void main(String[] args) public static void main(String[] args)
{ {
try { try {
//new Player("230.0.0.0", 3000, new File("./pdulog").toPath()).startResume(); //new Player("230.0.0.0", 3000, new File("./pdulog").toPath()).startResume();
new Player("230.0.0.0", 3000, new File("/Users/mike/NetbeansProjects/open-dis7-java/examples/pdulog").toPath()).startResume(); new Player("230.0.0.0", 3000, new File("/Users/mike/NetbeansProjects/open-dis7-java/examples/pdulog").toPath());
} }
catch(Exception ex) { catch (Exception ex) {
ex.printStackTrace(); ex.printStackTrace();
} }
} }
private static void sleep(long ms, int ns) private static void sleep(long ms, int ns)
{ {
// @formatter:off // @formatter:off
try { Thread.sleep(ms,ns); } catch (InterruptedException ex) { System.out.println("InterruptedException"); } try {
Thread.sleep(ms, ns);
}
catch (InterruptedException ex) {
System.out.println("InterruptedException");
}
// @formatter:on // @formatter:on
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment