Skip to content

Commit

Permalink
Integrate internalized heartbeat protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Apr 5, 2023
1 parent 68e88e1 commit 70c90b5
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public final Response send(String tag, String sub_tag, String... data) {

public final synchronized void add(OutputDestination destination) {
destinations.put(destination.getKey(), destination);
//destination.send(Packet.packet(handler, "", "", "Connected: " + destination.getKey()));
}

public final synchronized void remove(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class SocketManager {
private static final HashMap<String, DataInputStream> inflow = new HashMap<String, DataInputStream>();
private static final HashMap<String, DataOutputStream> outflow = new HashMap<String, DataOutputStream>();

private static final long HEARTBEAT_OFFSET = 5000L;

public synchronized static boolean createServer(int port) {
if(servers.containsKey(port))
return true;
Expand Down Expand Up @@ -83,16 +85,38 @@ public void run() {
String sub_tag = input[1];

// retrieve destination
String destination = "";
String temp_destination = "";
for(int i = 0; i < data.length; i++) {
if(data[i].equals("destination") && data.length - 1 != i)
destination = data[i + 1];
temp_destination = data[i + 1];
}

// if no destination found then continue
if(destination.equals(""))
if(temp_destination.equals(""))
continue;

final String destination = temp_destination;

// create heartbeat connection
Thread heartbeat = new Thread() {
public void run() {
while(true) {
try {
Thread.sleep(HEARTBEAT_OFFSET);
producer.send("OUT", "EDAT",
"data", "<<<heartbeat>>>",
"destination", destination);
} catch(Exception e) {
Logger.log(String.format("Heartbeat connection terminated for Socket with key <%s>", key));
break;
}
}
}
};

// start heartbeat
heartbeat.start();

// execute valid response to engine
Response response = producer.send(tag, sub_tag, data);

Expand All @@ -110,6 +134,11 @@ public void run() {
.toString(),
"destination", destination);

// terminate heartbeat
try {
heartbeat.interrupt();
} catch(Exception e) {}

} catch(Exception e) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public class Config {
app_properties.put("general.collection.delim", "=");
app_properties.put("general.transfer.delim", "&&&");
app_properties.put("general.data.dateformat", "yyyy-MM-dd");
app_properties.put("general.logging.packets", "false");
app_properties.put("general.logging.responses", "false");
app_properties.put("general.logging.packets", "true");
app_properties.put("general.logging.responses", "true");
properties.put("app", app_properties);

Properties stream_properties = new Properties();
Expand Down

0 comments on commit 70c90b5

Please sign in to comment.