diff --git a/src/edu/nps/moves/dis7/utilities/DisThreadedNetIF.java b/src/edu/nps/moves/dis7/utilities/DisThreadedNetIF.java index 6fa1553398c4b9f8b8a86ca307d36ef947e0c6f1..c6885aef4e12d0e039252803827f5b334152b086 100644 --- a/src/edu/nps/moves/dis7/utilities/DisThreadedNetIF.java +++ b/src/edu/nps/moves/dis7/utilities/DisThreadedNetIF.java @@ -10,6 +10,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; @@ -72,10 +73,10 @@ public class DisThreadedNetIF } /* *********** queues and lists and public methods ************** */ - private final ArrayList<PduListener> everyTypeListeners = new ArrayList<>(); - private final HashMap<DISPDUType, ArrayList<PduListener>> typeListeners = new HashMap<>(); + private final List<PduListener> everyTypeListeners = new ArrayList<>(); + private final Map<DISPDUType, List<PduListener>> typeListeners = new HashMap<>(); - private final ArrayList<RawPduListener> rawListeners = new ArrayList<>(); + private final List<RawPduListener> rawListeners = new ArrayList<>(); private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); @@ -89,7 +90,7 @@ public class DisThreadedNetIF if (typ == null) addListener(lis); else { - ArrayList<PduListener> arLis = typeListeners.get(typ); + List<PduListener> arLis = typeListeners.get(typ); if (arLis == null) { arLis = new ArrayList<>(); typeListeners.put(typ, arLis); @@ -116,7 +117,7 @@ public class DisThreadedNetIF everyTypeListeners.remove(lis); typeListeners.entrySet().stream().forEach(entry -> { - ArrayList<PduListener> arLis = entry.getValue(); + List<PduListener> arLis = entry.getValue(); if (arLis.contains(lis)) arLis.remove(lis); }); @@ -182,22 +183,31 @@ public class DisThreadedNetIF int counter = 0; private Runnable receiveThread = () -> { - DatagramPacket packet; + + byte buffer[] = new byte[MAX_DIS_PDU_SIZE]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + InetAddress maddr; + InetSocketAddress group; + Pdu pdu; + ByteBuffer byteBuffer; + while (!killed) { // keep trying on error try { socket = new MulticastSocket(disPort); - InetAddress maddr = InetAddress.getByName(mcastGroup); - socket.setNetworkInterface(findIp4Interface()); - socket.joinGroup(maddr); + maddr = InetAddress.getByName(mcastGroup); + group = new InetSocketAddress(maddr, disPort); + socket.joinGroup(group, findIp4Interface()); while (!killed) { - byte buffer[] = new byte[MAX_DIS_PDU_SIZE]; - packet = new DatagramPacket(buffer, buffer.length); - + socket.receive(packet); //blocks here waiting for next DIS pdu to be received on multicast IP and specified port toRawListeners(packet.getData(), packet.getLength()); - Pdu pdu = pduFactory.createPdu(packet.getData()); + // Uses the NIO byte buffer class--wrap a ByteBuffer instance around + // the data we get from the packet + byteBuffer = ByteBuffer.wrap(packet.getData()); + pdu = pduFactory.createPdu(byteBuffer); + if (pdu != null) { counter++; // TODO experimental, add to generator as a commented-out diagnostic; consider adding diagnostic mode @@ -221,21 +231,31 @@ public class DisThreadedNetIF }; private final Runnable sendThread = () -> { + + InetAddress maddr; + Pdu pdu; + byte[] data; + DatagramPacket packet; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + while (!killed) { try { - InetAddress maddr = InetAddress.getByName(mcastGroup); + maddr = InetAddress.getByName(mcastGroup); while (!killed) { - Pdu pdu = pdus2send.take(); + pdu = pdus2send.take(); // turn object into byte stream - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); pdu.marshal(dos); - byte[] data = baos.toByteArray(); - // load byte buffer into packet and send - DatagramPacket packet = new DatagramPacket(data, data.length, maddr, disPort); + data = baos.toByteArray(); + + // Since we don't know the size of the PDU, reluctantly, we create + // new packet object each listen cycle + packet = new DatagramPacket(data, data.length, maddr, disPort); socket.send(packet); + + baos.reset(); } } @@ -259,7 +279,7 @@ public class DisThreadedNetIF { everyTypeListeners.stream().forEach(lis -> lis.incomingPdu(pdu)); if (pdu != null) { - ArrayList<PduListener> arLis = typeListeners.get(pdu.getPduType()); + List<PduListener> arLis = typeListeners.get(pdu.getPduType()); if (arLis != null) arLis.stream().forEach(lis -> lis.incomingPdu(pdu)); } @@ -294,11 +314,15 @@ public class DisThreadedNetIF private static NetworkInterface findIp4Interface() throws SocketException { Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces(); + NetworkInterface nif; + Enumeration<InetAddress> addresses; + InetAddress addr; + while (ifaces.hasMoreElements()) { - NetworkInterface nif = ifaces.nextElement(); - Enumeration<InetAddress> addresses = nif.getInetAddresses(); + nif = ifaces.nextElement(); + addresses = nif.getInetAddresses(); while (addresses.hasMoreElements()) { - InetAddress addr = addresses.nextElement(); + addr = addresses.nextElement(); if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) { //System.out.println("Using network interface " + nif.getDisplayName()); return nif;