Skip to content

Commit

Permalink
Integrate ESH and LSH to communicate properly
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Sep 25, 2022
1 parent c390be4 commit 3372fa5
Show file tree
Hide file tree
Showing 21 changed files with 446 additions and 89 deletions.
Binary file modified Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx
Binary file not shown.
8 changes: 7 additions & 1 deletion DeFi-Data-Engine/DeFi Data Engine/config/app.properties
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# === GENERAL PROPERTIES ===

# delimiter used for internal processing
general.internal.delim=,.,
general.internal.delim=:::

# data delimiter
general.data.delim=,

# collection delimiter
general.collection.delim==

# date time formatter for data intake
general.data.dateformat=dd-MM-yyyy

# enable all packet logging
general.logging.packets=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ public static final void log(Response response) {
System.out.println(responseFormat(response));
}

public static final void warn(Packet packet) {
System.err.println(packetFormat(packet));
}

public static final void warn(Response response) {
System.err.println(responseFormat(response));
}

public static final void terminate(Packet packet) {
System.err.println(packetFormat(packet));
System.exit(1);
Expand All @@ -38,10 +46,11 @@ private static final String packetFormat(Packet packet) {
}

private static final String responseFormat(Response response) {
return String.format("[%s] [%-10s] RESPONSE - [%3d] [%s]",
return String.format("[%s] [%-10s] RESPONSE - [%3d] [%s] [%s]",
time(),
Thread.currentThread().getName(),
response.code(),
response.message(),
response.data());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static Response response471(String key) {
}

public static Response response472(String key) {
return Response.create(472, String.format("Output producer <%s> failed to send data to external connection <%s>.", key));
return Response.create(472, String.format("Output producer failed to send data to external connection <%s>.", key));
}

public static Response response500(String loc, String parameter) {
Expand All @@ -154,7 +154,15 @@ public static Response response501() {
return Response.create(501, String.format("Fatal error occurred. This response should not be displayed."));
}

public static Response response501(String message) {
return Response.create(501, String.format("Fatal error occurred. This response should not be displayed. Message: <%s>", message));
}

public static Response response502() {
return Response.create(502, String.format("Internal language failure. This error is commonly causes by a static protocol being treated as a live protocol or vice versa."));
}

public static Response response503(String format, String... dates) {
return Response.create(503, String.format("Local data stream failed to process date of the format <%s> from given strings <%s>", format, Arrays.toString(dates)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public Response processSTRT(Packet packet) {
}

public Response processEDAT(Packet packet) {
if(packet.getSubData().equals("null"))
return ResponseFactory.response200();

if(!manager.containsDestination(packet.getSubData()))
return ResponseFactory.response471(packet.getSubData());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ public static final void setProperty(String name, String property, String value)

public static final void validate(String name, String... keys) {
if(!properties.containsKey(name)) {
System.err.println(String.format("Property file <%s> does not exist. Program terminating.", name));
new IllegalArgumentException(String.format("Property file <%s> does not exist. Program terminating.", name)).printStackTrace();
System.exit(1);
}

for(String key : keys)
if(!properties.get(name).containsKey(key)) {
System.err.println(String.format("Missing property <%s> in file <%s>. Program terminating.", key, name));
new IllegalArgumentException(String.format("Missing property <%s> in file <%s>. Program terminating.", key, name)).printStackTrace();
System.exit(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,10 @@ public boolean stop() {
public Object[] request(String destination, String data) {
return AmberDataRequestHandler.request(this.data, data);
}

@Override
public Object[] request(String destination, String request, String date) {
// TODO Auto-generated method stub
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.stream.external.connected.connections;

import java.time.format.DateTimeFormatter;

import org.properties.Config;
import org.stream.external.handler.ExternalStreamConnection;
import org.stream.external.handler.ExternalStreamManager;

Expand All @@ -9,6 +12,8 @@ public class TemplateExternalConnection extends ExternalStreamConnection {
private boolean authorized = false;
private boolean override = false;

private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.getProperty("app", "general.data.dateformat"));

public TemplateExternalConnection(ExternalStreamManager manager, String data) {
super(manager, data.split(",")[0]);

Expand Down Expand Up @@ -50,6 +55,7 @@ public void defineRequestTypes() {
addRequestType("correct");
addRequestType("irregular");
addRequestType("external");
addRequestType("template-external-request");
}

@Override
Expand Down Expand Up @@ -85,19 +91,27 @@ public boolean stop() {
}

@Override
public Object[] request(String destination, String data) {
if(data.equals("correct"))
return new Object[] {true, "Successful request"};

else if(data.equals("external")) {
// submit 10 packets of data to external destination
for(int i = 0; i < 10; i++) {
this.process("test", destination, "" + i);
public Object[] request(String destination, String data, String date) {
if(date == null) {
if(data.equals("correct"))
return new Object[] {true, "success"};

else if(data.equals("template-external-request")) {
for(int i = 0; i < 3; i++)
this.processRequest(data, date, destination, String.format("element, %d, mult7, %d", i, i * 7));
return new Object[] {true, "success"};
}
} else {
if(data.equals("correct"))
return new Object[] {true, "success"};

return new Object[] {true, "Successful request"};
else if(data.equals("template-external-request")) {
for(int i = 0; i < 3; i++)
this.processRequest(data, date, destination, String.format("element, %d, mult10, %d", i, i * 10));
return new Object[] {true, "success"};
}
}

return new Object[] {false, "Request handled irregularly"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import org.framework.interfaces.Hash;
import org.framework.interfaces.UUID;
import org.properties.Config;

public abstract class ExternalStreamConnection implements UUID, Hash {

Expand Down Expand Up @@ -31,8 +32,22 @@ public void init() {

}

public void process(String subscription, String destination, String data) {
manager.process(hash, subscription, destination, data);
public void processSubscription(String subscription, String destination, String data) {
manager.processSubscription(hash, subscription, destination, data);
}

public void processRequest(String request, String date, String destination, String data) {
if(date != null)
manager.processRequest(hash,
getUUID() + Config.getProperty("app", "general.collection.delim") + request + Config.getProperty("app", "general.collection.delim") + date,
destination,
data);

else
manager.processRequest(hash,
getUUID() + Config.getProperty("app", "general.collection.delim") + request,
destination,
data);
}

public final String getHash() {
Expand All @@ -52,7 +67,8 @@ public final String getHash() {
public abstract void defineRequestTypes();
public final void addRequestType(String type) { requestTypes.add(type); }
public final boolean containsRequestType(String type) { return requestTypes.contains(type); }
public abstract Object[] request(String destination, String request);
public Object[] request(String destination, String request) { return request(destination, request, null); }
public abstract Object[] request(String destination, String request, String date);

public abstract boolean start();
public abstract boolean stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.framework.router.Response;
import org.framework.router.ResponseFactory;
import org.framework.router.Router;
import org.properties.Config;

public final class ExternalStreamHandler extends Router {

Expand Down Expand Up @@ -63,7 +64,7 @@ public Response processINIT(Packet packet) {

// if successful authorize
if(authorized)
return ResponseFactory.response200(String.format("true, %s", hash));
return ResponseFactory.response200(String.format("%s", hash));

manager.removeStream(hash);
return ResponseFactory.response422(template);
Expand Down Expand Up @@ -152,36 +153,57 @@ public Response processSUBS(Packet packet) {

public Response processRQST(Packet packet) {
if(packet.getData().equals(""))
return ResponseFactory.response500("ExternalStreamHandler", "streamHash");
return ResponseFactory.response500("ExternalStreamHandler", "key, query");

if(packet.getSubData().equals(""))
return ResponseFactory.response500("ExternalStreamHandler", "destination");

// extract hash from data
String request = packet.getData();
int splitIndex = request.indexOf(',');
String hash = "";
if(splitIndex != -1) {
hash = request.substring(0, splitIndex).trim();
request = request.substring(splitIndex + 1).trim();
} else {
return ResponseFactory.response500("ExternalStreamHandler", "requestType");
}
String[] data = packet.getData().split(Config.getProperty("app", "general.internal.delim"));

// standard request length is 2:
// key,.,request
if(data.length != 2)
return ResponseFactory.response501("Request is not formatted properly.");

if(data[0].isEmpty() || data[1].isEmpty())
return ResponseFactory.response500("ExternalStreamHandler", "key, query");

String hash = data[0];
String[] request = data[1].split(Config.getProperty("app", "general.collection.delim"));
if(!manager.containsStream(hash))
return ResponseFactory.response421(hash);

if(!manager.isStreamReady(hash))
return ResponseFactory.response423(hash);

if(!manager.containsRequestType(hash, request))
return ResponseFactory.response428(hash, request);

Object[] response = manager.request(hash, packet.getSubData(), request);
if(!manager.containsRequestType(hash, request[1]))
return ResponseFactory.response428(hash, request[1]);

Object[] response;
if(request.length == 2)
response = manager.request(hash, packet.getSubData(), request[1]);
else
response = manager.request(hash, packet.getSubData(), request[1], request[2]);

if((Boolean)response[0])
return ResponseFactory.response200(String.format("%s", (String)response[1]));

return ResponseFactory.response429(hash, request, (String)response[1]);
return ResponseFactory.response429(hash, request[0], (String)response[1]);
}

public Response processTYPE(Packet packet) {
if(packet.getData().equals(""))
return ResponseFactory.response500("ExternalStreamHandler", "key");

String key = packet.getData();
if(!manager.containsStream(key))
return ResponseFactory.response421(key);

String uuid = manager.getStreamType(key);
if(uuid == null)
return ResponseFactory.response501("Stream was removed in different thread mid observation.");

return ResponseFactory.response200(uuid);

}
}
Loading

0 comments on commit 3372fa5

Please sign in to comment.