From 2407d4bb08ff7e028ada4c3605d7c4b6d4fb9ef5 Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Wed, 5 Apr 2023 13:44:58 -0400 Subject: [PATCH] Integrate internalized heartbeat protocol --- .../java/org/out/handler/OutputManager.java | 1 - .../java/org/out/socket/SocketManager.java | 35 +++++++++++++++++-- .../src/main/java/org/properties/Config.java | 4 +-- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java index fc834178..ea376f9a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java @@ -117,7 +117,6 @@ public final Response send(String tag, String sub_tag, String... data) { public final synchronized void add(OutputDestination destination) { destinations.put(destination.getKey(), destination); - //destination.send(Packet.packet(handler, "", "", "Connected: " + destination.getKey())); } public final synchronized void remove(String key) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index cfca09cc..0751cb62 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -24,6 +24,8 @@ public class SocketManager { private static final HashMap inflow = new HashMap(); private static final HashMap outflow = new HashMap(); + private static final long HEARTBEAT_OFFSET = 5000L; + public synchronized static boolean createServer(int port) { if(servers.containsKey(port)) return true; @@ -83,16 +85,38 @@ public void run() { String sub_tag = input[1]; // retrieve destination - String destination = ""; + String temp_destination = ""; for(int i = 0; i < data.length; i++) { if(data[i].equals("destination") && data.length - 1 != i) - destination = data[i + 1]; + temp_destination = data[i + 1]; } // if no destination found then continue - if(destination.equals("")) + if(temp_destination.equals("")) continue; + final String destination = temp_destination; + + // create heartbeat connection + Thread heartbeat = new Thread() { + public void run() { + while(true) { + try { + Thread.sleep(HEARTBEAT_OFFSET); + producer.send("OUT", "EDAT", + "data", "<<>>", + "destination", destination); + } catch(Exception e) { + Logger.log(String.format("Heartbeat connection terminated for Socket with key <%s>", key)); + break; + } + } + } + }; + + // start heartbeat + heartbeat.start(); + // execute valid response to engine Response response = producer.send(tag, sub_tag, data); @@ -110,6 +134,11 @@ public void run() { .toString(), "destination", destination); + // terminate heartbeat + try { + heartbeat.interrupt(); + } catch(Exception e) {} + } catch(Exception e) { break; } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index cbf67676..2c273ca8 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -16,8 +16,8 @@ public class Config { app_properties.put("general.collection.delim", "="); app_properties.put("general.transfer.delim", "&&&"); app_properties.put("general.data.dateformat", "yyyy-MM-dd"); - app_properties.put("general.logging.packets", "false"); - app_properties.put("general.logging.responses", "false"); + app_properties.put("general.logging.packets", "true"); + app_properties.put("general.logging.responses", "true"); properties.put("app", app_properties); Properties stream_properties = new Properties();