Skip to content
Snippets Groups Projects
Commit 9b3c987e authored by terry-norbraten's avatar terry-norbraten
Browse files

further reduce object creation in loops and consolidate to one socket

for both send/received. Modeled after DMg's efficient 
BehaviorProducerUDP
parent 809a9b93
No related branches found
No related tags found
No related merge requests found
......@@ -6,11 +6,12 @@ package edu.nps.moves.dis7.utilities;
import edu.nps.moves.dis7.Pdu;
import edu.nps.moves.dis7.enumerations.DISPDUType;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
......@@ -67,6 +68,9 @@ public class DisThreadedNetIF
/** 8192: This has actually been superseded by a larger buffer size, but good enough for now */
public static final int MAX_DIS_PDU_SIZE = 8192;
/** 1500: Rough size of an ethernet frame */
public static final int MAX_TRANSMISSION_UNIT_SIZE = 1500;
private int disPort;
private String mcastGroup;
private boolean killed = false;
......@@ -74,8 +78,7 @@ public class DisThreadedNetIF
private InetAddress maddr;
private InetSocketAddress group;
private NetworkInterface ni;
private MulticastSocket ssocket = null;
private MulticastSocket rsocket = null;
private MulticastSocket socket = null;
/**
* Default constructor using default port 3000 and multicast address 225.4.5.6
......@@ -209,102 +212,86 @@ public class DisThreadedNetIF
sender.start();
}
private Runnable receiveThread = () -> {
int counter = 0;
byte buffer[] = new byte[MAX_DIS_PDU_SIZE];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
Pdu pdu;
while (!killed) { // keep trying on error
try {
rsocket = new MulticastSocket(disPort);
rsocket.joinGroup(group, ni);
while (!killed) {
rsocket.receive(packet); //blocks here waiting for next DIS pdu to be received on multicast IP and specified port
toRawListeners(packet.getData(), packet.getLength());
// the PduFactory will wrap data in a ByteBuffer
pdu = pduFactory.createPdu(packet.getData());
if (pdu != null)
{
counter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode
System.err.println(counter + ". received " + pdu.getPduType().name());
toListeners(pdu);
}
}
}
catch (IOException ex) {
System.err.println("Exception in DISThreadedNetIF receive thread: " + ex.getLocalizedMessage());
System.err.println("Retrying in 1 second");
} finally {
if (rsocket != null && !rsocket.isClosed()) {
try {
rsocket.leaveGroup(group, ni);
} catch (IOException ex) {
Logger.getLogger(DisThreadedNetIF.class.getName()).log(Level.SEVERE, null, ex);
}
rsocket.close();
rsocket = null;
}
}
private Runnable receiveThread = () -> {
int counter = 0;
// The capacity could go up to MAX_DIS_PDU_SIZE, but this should be good for now
// The raw listeners will strip off any extra padding and process what is
// required
ByteBuffer buffer = ByteBuffer.allocate(MAX_TRANSMISSION_UNIT_SIZE);
DatagramPacket packet = new DatagramPacket(buffer.array(), buffer.capacity());
Pdu pdu;
while (!killed) { // keep trying on error
// If something trips up with the socket, this thread will
// re-establish for both send/receive threads
try {
socket = new MulticastSocket(disPort);
socket.joinGroup(group, ni);
while (!killed) {
socket.receive(packet); // blocks here waiting for next DIS pdu to be received on multicast IP and specified port
toRawListeners(packet.getData(), packet.getLength());
pdu = pduFactory.createPdu(buffer);
if (pdu != null) {
counter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode
System.err.println(counter + ". received " + pdu.getPduType().name());
toListeners(pdu);
}
buffer.clear();
}
} catch (IOException ex) {
System.err.println("Exception in DISThreadedNetIF receive thread: " + ex.getLocalizedMessage());
System.err.println("Retrying in 1 second");
} finally {
if (socket != null && !socket.isClosed()) {
try {
socket.leaveGroup(group, ni);
} catch (IOException ex) {
Logger.getLogger(DisThreadedNetIF.class.getName()).log(Level.SEVERE, null, ex);
}
socket.close();
socket = null;
}
}
// if (!killed)
// sleep(250);
}
};
private final Runnable sendThread = () -> {
Pdu pdu;
byte[] data;
DatagramPacket packet;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
while (!killed) { // keep trying on error
try {
ssocket = new MulticastSocket(disPort);
ssocket.joinGroup(group, ni);
while (!killed) {
pdu = pdus2send.take();
// turn object into byte stream
pdu.marshal(dos);
data = baos.toByteArray();
// Since we don't know the size of the PDU, reluctantly, we create a
// new packet here when we finally know
packet = new DatagramPacket(data, data.length, maddr, disPort);
ssocket.send(packet);
baos.reset();
}
}
catch (Exception ex) {
System.err.println("Exception in DISThreadedNetIF send thread: " + ex.getLocalizedMessage());
System.err.println("Retrying in 1 second");
} finally {
if (ssocket != null && !ssocket.isClosed()) {
try {
ssocket.leaveGroup(group, ni);
} catch (IOException ex) {
Logger.getLogger(DisThreadedNetIF.class.getName()).log(Level.SEVERE, null, ex);
}
ssocket.close();
ssocket = null;
}
}
// if (!killed)
// sleep(250);
}
try {
dos.close();
} catch (IOException e) {}
};
};
private final Runnable sendThread = () -> {
Pdu pdu;
// The capacity could go up to MAX_DIS_PDU_SIZE, but this should be good for now
ByteArrayOutputStream baos = new ByteArrayOutputStream(MAX_TRANSMISSION_UNIT_SIZE);
DataOutputStream dos = new DataOutputStream(baos);
DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(), maddr, disPort);
while (!killed) { // keep trying on error
try {
while (!killed) {
pdu = pdus2send.take();
pdu.marshal(dos);
packet.setData(baos.toByteArray());
socket.send(packet);
baos.reset();
}
} catch (Exception ex) {
System.err.println("Exception in DISThreadedNetIF send thread: " + ex.getLocalizedMessage());
}
}
try {
dos.close();
} catch (IOException e) {}
};
private void toListeners(Pdu pdu)
{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment