Skip to content
Snippets Groups Projects
Commit 56b2a580 authored by J. M. Bailey's avatar J. M. Bailey
Browse files

Remove singleton plumbing, add raw listener code

parent d1e72b85
No related branches found
No related tags found
No related merge requests found
...@@ -25,30 +25,32 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -25,30 +25,32 @@ import java.util.concurrent.LinkedBlockingQueue;
*/ */
public class DisThreadedNetIF public class DisThreadedNetIF
{ {
/* ********** listener interface *********** */ /* ********** Pdu listener interface *********** */
public interface PduListener public interface PduListener
{ {
void incomingPdu(Pdu pdu); void incomingPdu(Pdu pdu);
} }
/* ********** singleton plumbing *************** */ /* ***** Raw pdu listener class and interface ******** */
private static DisThreadedNetIF instance; public class BuffAndLength
public static DisThreadedNetIF inst()
{ {
if (instance == null) public byte[] buff;
instance = new DisThreadedNetIF(); public int length;
return instance;
}
public static DisThreadedNetIF inst(int port, String mcastgroup) public BuffAndLength(byte[] buff, int length)
{
this.buff = buff;
this.length = length;
}
}
public interface RawPduListener
{ {
if (instance == null) void incomingPdu(BuffAndLength bAndL);
instance = new DisThreadedNetIF(port, mcastgroup);
return instance;
} }
/* *********** class instanciation ************* */ /************ Begin class ***************/
public static int DEFAULT_DIS_PORT = 3000; public static int DEFAULT_DIS_PORT = 3000;
public static String DEFAULT_MCAST_GROUP = "225.4.5.6"; public static String DEFAULT_MCAST_GROUP = "225.4.5.6";
private static final int MAX_DIS_PDU_SIZE = 8192; private static final int MAX_DIS_PDU_SIZE = 8192;
...@@ -57,12 +59,12 @@ public class DisThreadedNetIF ...@@ -57,12 +59,12 @@ public class DisThreadedNetIF
private String mcastGroup; private String mcastGroup;
private boolean killed = false; private boolean killed = false;
private DisThreadedNetIF() public DisThreadedNetIF()
{ {
this(DEFAULT_DIS_PORT, DEFAULT_MCAST_GROUP); this(DEFAULT_DIS_PORT, DEFAULT_MCAST_GROUP);
} }
private DisThreadedNetIF(int port, String mcastgroup) public DisThreadedNetIF(int port, String mcastgroup)
{ {
disPort = port; disPort = port;
mcastGroup = mcastgroup; mcastGroup = mcastgroup;
...@@ -72,8 +74,16 @@ public class DisThreadedNetIF ...@@ -72,8 +74,16 @@ public class DisThreadedNetIF
/* *********** queues and lists and public methods ************** */ /* *********** queues and lists and public methods ************** */
private final ArrayList<PduListener> everyTypeListeners = new ArrayList<>(); private final ArrayList<PduListener> everyTypeListeners = new ArrayList<>();
private final HashMap<DISPDUType, ArrayList<PduListener>> typeListeners = new HashMap<>(); private final HashMap<DISPDUType, ArrayList<PduListener>> typeListeners = new HashMap<>();
private final ArrayList<RawPduListener> rawListeners = new ArrayList<>();
private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<Pdu> pdus2send = new LinkedBlockingQueue<>();
/**
* Add a listener to accept only pdus of the given typ
* @param lis instance of PduListener
* @param typ Pdu type
*/
public void addListener(PduListener lis, DISPDUType typ) public void addListener(PduListener lis, DISPDUType typ)
{ {
if (typ == null) if (typ == null)
...@@ -87,12 +97,20 @@ public class DisThreadedNetIF ...@@ -87,12 +97,20 @@ public class DisThreadedNetIF
arLis.add(lis); arLis.add(lis);
} }
} }
/**
* Add a listener to accept all pdu types
* @param lis instance implementing the PduListener interface
*/
public void addListener(PduListener lis) public void addListener(PduListener lis)
{ {
everyTypeListeners.add(lis); everyTypeListeners.add(lis);
} }
/**
* Remove previously added listener
* @param lis instance implementing the PduListener interface
*/
public void removeListener(PduListener lis) public void removeListener(PduListener lis)
{ {
everyTypeListeners.remove(lis); everyTypeListeners.remove(lis);
...@@ -103,7 +121,25 @@ public class DisThreadedNetIF ...@@ -103,7 +121,25 @@ public class DisThreadedNetIF
arLis.remove(lis); arLis.remove(lis);
}); });
} }
/**
* Add a listener to accept pdus of all types in the form of a byte array
* @param lis instance implementing the RawPduListener interface
*/
public void addRawListener(RawPduListener lis)
{
rawListeners.add(lis);
}
/**
* Remove previously add raw listener
* @param lis
*/
public void removeRawListener(RawPduListener lis)
{
rawListeners.remove(lis);
}
public int getDisPort() public int getDisPort()
{ {
return disPort; return disPort;
...@@ -114,6 +150,10 @@ public class DisThreadedNetIF ...@@ -114,6 +150,10 @@ public class DisThreadedNetIF
return mcastGroup; return mcastGroup;
} }
/**
* Sent the given pdu to the network, using the ip and port given to the constructor
* @param pdu
*/
public void send(Pdu pdu) public void send(Pdu pdu)
{ {
pdus2send.add(pdu); pdus2send.add(pdu);
...@@ -139,7 +179,7 @@ public class DisThreadedNetIF ...@@ -139,7 +179,7 @@ public class DisThreadedNetIF
sender.setPriority(Thread.NORM_PRIORITY); sender.setPriority(Thread.NORM_PRIORITY);
sender.start(); sender.start();
} }
private Runnable receiveThread = () -> { private Runnable receiveThread = () -> {
DatagramPacket packet; DatagramPacket packet;
while (!killed) { // keep trying on error while (!killed) { // keep trying on error
...@@ -148,11 +188,14 @@ public class DisThreadedNetIF ...@@ -148,11 +188,14 @@ public class DisThreadedNetIF
InetAddress maddr = InetAddress.getByName(mcastGroup); InetAddress maddr = InetAddress.getByName(mcastGroup);
socket.setNetworkInterface(findIp4Interface()); socket.setNetworkInterface(findIp4Interface());
socket.joinGroup(maddr); socket.joinGroup(maddr);
while (!killed) { while (!killed) {
byte buffer[] = new byte[MAX_DIS_PDU_SIZE]; byte buffer[] = new byte[MAX_DIS_PDU_SIZE];
packet = new DatagramPacket(buffer, buffer.length); packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet); //blocks here waiting for next DIS pdu to be received on multicast IP and specified port socket.receive(packet); //blocks here waiting for next DIS pdu to be received on multicast IP and specified port
toRawListeners(packet.getData(), packet.getLength());
Pdu pdu = pduFactory.createPdu(packet.getData()); Pdu pdu = pduFactory.createPdu(packet.getData());
if (pdu != null) if (pdu != null)
toListeners(pdu); toListeners(pdu);
...@@ -210,12 +253,22 @@ public class DisThreadedNetIF ...@@ -210,12 +253,22 @@ public class DisThreadedNetIF
private void toListeners(Pdu pdu) private void toListeners(Pdu pdu)
{ {
everyTypeListeners.stream().forEach(lis -> lis.incomingPdu(pdu)); everyTypeListeners.stream().forEach(lis -> lis.incomingPdu(pdu));
if (pdu != null) {
ArrayList<PduListener> arLis = typeListeners.get(pdu.getPduType()); ArrayList<PduListener> arLis = typeListeners.get(pdu.getPduType());
if (arLis != null) if (arLis != null)
arLis.stream().forEach(lis -> lis.incomingPdu(pdu)); arLis.stream().forEach(lis -> lis.incomingPdu(pdu));
}
} }
private void toRawListeners(byte[]data, int len)
{
if(rawListeners.isEmpty())
return;
BuffAndLength bl = new BuffAndLength(data,len);
rawListeners.forEach(lis->lis.incomingPdu(bl));
}
public void kill() public void kill()
{ {
killed = true; killed = true;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment