diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java index fc834178..ea376f9a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java @@ -117,7 +117,6 @@ public final Response send(String tag, String sub_tag, String... data) { public final synchronized void add(OutputDestination destination) { destinations.put(destination.getKey(), destination); - //destination.send(Packet.packet(handler, "", "", "Connected: " + destination.getKey())); } public final synchronized void remove(String key) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index cfca09cc..0751cb62 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -24,6 +24,8 @@ public class SocketManager { private static final HashMap inflow = new HashMap(); private static final HashMap outflow = new HashMap(); + private static final long HEARTBEAT_OFFSET = 5000L; + public synchronized static boolean createServer(int port) { if(servers.containsKey(port)) return true; @@ -83,16 +85,38 @@ public void run() { String sub_tag = input[1]; // retrieve destination - String destination = ""; + String temp_destination = ""; for(int i = 0; i < data.length; i++) { if(data[i].equals("destination") && data.length - 1 != i) - destination = data[i + 1]; + temp_destination = data[i + 1]; } // if no destination found then continue - if(destination.equals("")) + if(temp_destination.equals("")) continue; + final String destination = temp_destination; + + // create heartbeat connection + Thread heartbeat = new Thread() { + public void run() { + while(true) { + try { + Thread.sleep(HEARTBEAT_OFFSET); + producer.send("OUT", "EDAT", + "data", "<<>>", + "destination", destination); + } catch(Exception e) { + Logger.log(String.format("Heartbeat connection terminated for Socket with key <%s>", key)); + break; + } + } + } + }; + + // start heartbeat + heartbeat.start(); + // execute valid response to engine Response response = producer.send(tag, sub_tag, data); @@ -110,6 +134,11 @@ public void run() { .toString(), "destination", destination); + // terminate heartbeat + try { + heartbeat.interrupt(); + } catch(Exception e) {} + } catch(Exception e) { break; }