Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history


into main

Conflicts:
	DeFi-Data-Engine/DeFi Data
Engine/src/main/java/org/out/handler/OutputHandler.java
	DeFi-Data-Engine/Rest
Application/src/main/java/org/rest/application/Endpoint.java
  • Loading branch information
conmf committed Sep 12, 2022
2 parents cc1a9cb + cb1214a commit b857ae4
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 52 deletions.
Binary file modified Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx
Binary file not shown.
4 changes: 4 additions & 0 deletions DeFi-Data-Engine/DeFi Data Engine/config/app.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# === GENERAL PROPERTIES ===

# delimiter used for internal processing
general.internal.delim=,.,
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class Packet {
private final String tag;
private final String sub_tag;
private final String data;
private final String sub_data;

/**
* Initializes a new {@link Packet} object.
Expand All @@ -23,12 +24,14 @@ public class Packet {
* @param tag Tag of the destination the {@link Packet} will be sent to.
* @param sub_tag Sub tag describing the action performed at the destination.
* @param data Data transmitted through the {@link Packet} for processing at the destination.
* @param sub_data Supporting data used for processing and transmitting from {@code data}. This parameter is optional.
*/
private Packet(Router router, String tag, String sub_tag, String data) {
private Packet(Router router, String tag, String sub_tag, String data, String sub_data) {
this.sender = router.getTag();
this.tag = tag;
this.sub_tag = sub_tag;
this.data = data;
this.sub_data = sub_data;
}

/**
Expand Down Expand Up @@ -67,6 +70,15 @@ public final String getData() {
return data;
}

/**
* Supporting data used for processing and transmitting from {@code data}.
*
* @return String containing all sub_data.
*/
public final String getSubData() {
return sub_data;
}

/**
* Factory method used to create a {@link Packet} object.
*
Expand All @@ -77,6 +89,20 @@ public final String getData() {
* @return New {@link Packet} object.
*/
public static Packet packet(Router router, String tag, String sub_tag, String data) {
return new Packet(router, tag, sub_tag, data);
return new Packet(router, tag, sub_tag, data, "");
}

/**
* Factory method used to create a {@link Packet} object.
*
* @param router {@link Router} object the {@link Packet} was sent from.
* @param tag Tag of the destination the {@link Packet} will be sent to.
* @param sub_tag Sub tag describing the action performed at the destination.
* @param data Data transmitted through the {@link Packet} for processing at the destination.
* @param sub_data Supporting data used for processing and transmitting from {@code data}.
* @return New {@link Packet} object.
*/
public static Packet packet(Router router, String tag, String sub_tag, String data, String sub_data) {
return new Packet(router, tag, sub_tag, data, sub_data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,16 @@ public static Response response460(String consumer) {
return Response.create(460, String.format("Output consumer <%s> failed to listen to consumption channel.", consumer));
}

public static Response response470(String consumer) {
return Response.create(470, String.format("Output producer <%s> failed to listen to production channel.", consumer));
public static Response response470(String producer) {
return Response.create(470, String.format("Output producer <%s> failed to listen to production channel.", producer));
}

public static Response response471(String key) {
return Response.create(471, String.format("Output manager does not contain destination with key <%s>", key));
}

public static Response response472(String key) {
return Response.create(472, String.format("Output producer <%s> failed to send data to external connection <%s>.", key));
}

public static Response response500(String loc, String parameter) {
Expand All @@ -99,4 +107,8 @@ public static Response response500(String loc, String parameter) {
public static Response response501() {
return Response.create(501, String.format("Fatal error occurred. This response should not be displayed."));
}

public static Response response502() {
return Response.create(502, String.format("Internal language failure. This error is commonly causes by a static protocol being treated as a live protocol or vice versa."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,24 @@ protected final Collection<String> connectedTags() {
* @return Integer representing the return code of the sent {@link Packet}.
*/
public final Response send(String tag, String sub_tag, String data) {
return send(tag, sub_tag, data, "");
}

/**
* Function used to send a {@link Packet} object to the desired destination.
*
* @param tag Tag Tag of the destination's {@link Router} object.
* @param sub_tag Sub tag describing the action performed at the destination.
* @param data Data transmitted through the {@link Packet} for processing at the destination.
* @param sub_data Supporting data used for processing and transmitting from {@code data}. This parameter is optional.
* @return Integer representing the return code of the sent {@link Packet}.
*/
public final Response send(String tag, String sub_tag, String data, String sub_data) {
if(manager == null)
return ResponseFactory.response400(this.getTag());

// create packet and push to receive method
Packet packet = Packet.packet(this, tag, sub_tag, data);
Packet packet = Packet.packet(this, tag, sub_tag, data, sub_data);
return manager.send(packet);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,31 @@ public void run() {

// listen for data packets from rest socket
while(true) {
String[] data = ((String)in.readUTF()).split(",.,");
String[] protocol = ProtocolDirectory.getProtocol(data[1]);
String[] data = ((String)in.readUTF()).split(Config.getProperty("app", "general.internal.delim"));
int protocol_index = 0, data_index = 0;
if(data.length == 2) {
protocol_index = 0;
data_index = 1;
}

else if(data.length == 3) {
protocol_index = 1;
data_index = 2;
}

else {
System.err.println("Invalid data format for SocketConsumer. Terminating.");
System.exit(1);
}

String[] protocol = ProtocolDirectory.getProtocol(data[protocol_index]);
if(protocol == null) {
out.writeUTF(new JSONObject()
.put("response", "403")
.put("message", "Protocol does not exist. Please reference documentation.")
.toString());
} else {
Response response = send(protocol[0], protocol[1], data[2]);
Response response = send(protocol[0], protocol[1], data[data_index], data[0]);
out.writeUTF(new JSONObject()
.put("response", "200")
.put("code", response.code())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ protected final Response send(String tag, String sub_tag, String data) {
return manager.send(tag, sub_tag, data);
}

protected final Response send(String tag, String sub_tag, String data, String sub_data) {
return manager.send(tag, sub_tag, data, sub_data);
}

protected abstract boolean init();
protected abstract boolean listen();
protected abstract boolean kill();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,18 @@ public Response processSTRT(Packet packet) {
}

public Response processEDAT(Packet packet) {
<<<<<<< HEAD

//manager.send();
return ResponseFactory.response0();
=======
if(!manager.containsDestination(packet.getSubData()))
return ResponseFactory.response471(packet.getSubData());

if(!manager.send(packet.getSubData(), packet))
return ResponseFactory.response472(packet.getSubData());

return ResponseFactory.response200("");
>>>>>>> branch 'main' of https://github.rpi.edu/DataINCITE/IDEA-DeFi-CRAFT
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ protected final Response send(String tag, String sub_tag, String data) {
return handler.send(tag, sub_tag, data);
}

protected final Response send(String tag, String sub_tag, String data, String sub_data) {
return handler.send(tag, sub_tag, data, sub_data);
}

public final synchronized void add(OutputDestination destination) {
destinations.put(destination.getKey(), destination);
destination.send(Packet.packet(handler, "", "", "Connected: " + destination.getKey()));
Expand All @@ -124,6 +128,10 @@ public final synchronized void remove(String key) {
destinations.remove(key);
}

public final boolean containsDestination(String key) {
return destinations.containsKey(key);
}

public final boolean send(String key, Packet packet) {
if(!destinations.containsKey(key))
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public boolean stop() {
}

@Override
public Object[] request(String data) {
public Object[] request(String destination, String data) {
return AmberDataRequestHandler.request(this.data, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ public boolean isAuthorized() {
public void defineSubscriptionTypes() {
addSubscriptionType("correct");
addSubscriptionType("irregular");
addSubscriptionType("external");
}

@Override
public void defineRequestTypes() {
addRequestType("correct");
addRequestType("irregular");
addRequestType("external");
}

@Override
Expand Down Expand Up @@ -83,10 +85,19 @@ public boolean stop() {
}

@Override
public Object[] request(String data) {
public Object[] request(String destination, String data) {
if(data.equals("correct"))
return new Object[] {true, "Successful request"};
else
return new Object[] {false, "Request handled irregularly"};

else if(data.equals("external")) {
// submit 10 packets of data to external destination
for(int i = 0; i < 10; i++) {
this.process("test", destination, "" + i);
}

return new Object[] {true, "Successful request"};
}

return new Object[] {false, "Request handled irregularly"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public void init() {

}

public void process(String subscription, String data) {
manager.process(hash, subscription, data);
public void process(String subscription, String destination, String data) {
manager.process(hash, subscription, destination, data);
}

public final String getHash() {
Expand All @@ -52,7 +52,7 @@ public final String getHash() {
public abstract void defineRequestTypes();
public final void addRequestType(String type) { requestTypes.add(type); }
public final boolean containsRequestType(String type) { return requestTypes.contains(type); }
public abstract Object[] request(String request);
public abstract Object[] request(String destination, String request);

public abstract boolean start();
public abstract boolean stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ public Response processRQST(Packet packet) {
if(packet.getData().equals(""))
return ResponseFactory.response500("ExternalStreamHandler", "streamHash");

if(packet.getSubData().equals(""))
return ResponseFactory.response500("ExternalStreamHandler", "destination");

// extract hash from data
String request = packet.getData();
int splitIndex = request.indexOf(',');
Expand All @@ -174,11 +177,11 @@ public Response processRQST(Packet packet) {
if(!manager.containsRequestType(hash, request))
return ResponseFactory.response428(hash, request);

Object[] response = manager.request(hash, request);
Object[] response = manager.request(hash, packet.getSubData(), request);

if((Boolean)response[0])
return Response.create(200, "", String.format("%s", (String)response[1]));

return ResponseFactory.response429(hash, request, (String)response[1]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,18 +290,19 @@ protected boolean containsRequestType(String hash, String type) {
* If a stream with the given hash does not exist, this function returns false.
*
* @param hash Hash of the stream returned by the {@link ExternalStreamConnection#getHash(String)} function.
* @param destination Destination of the request to be processed by the {@link ExternalStreamManager#process(String, String, String)} function.
* @param request Request data used for processing the single request.
* @return Returns a string object containing all data returned by the request.
*/
protected Object[] request(String hash, String request) {
protected Object[] request(String hash, String destination, String request) {
if(!streams.containsKey(hash))
return new Object[] {false, null};

ExternalStreamConnection stream = streams.get(hash);
if(!stream.isAuthorized() || !stream.isReady())
return new Object[] {false, null};

return stream.request(request);
return stream.request(destination, request);
}

/**
Expand Down Expand Up @@ -355,7 +356,7 @@ protected boolean killStream(String hash) {
* @param subscription Subscription which the data was received by.
* @param data Data sent by the given subscription.
*/
protected void process(String hash, String subscription, String data) {
handler.send("SRC", "EDAT", String.format("%s, %s, %s", hash, subscription, data));
protected void process(String hash, String subscription, String destination, String data) {
handler.send("OUT", "EDAT", String.format("%s, %s, %s", hash, subscription, data), destination);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.framework.router.Packet;
import org.framework.router.Response;
import org.framework.router.ResponseFactory;
import org.framework.router.Router;

public class StreamRegistryController extends Router {
Expand All @@ -12,42 +11,42 @@ public StreamRegistryController() {
}

public Response processEXSR(Packet packet) {
return send("ESH", "EXSR", packet.getData());
return send("ESH", "EXSR", packet.getData(), packet.getSubData());
}

public Response processEXST(Packet packet) {
return send("ESH", "EXST", packet.getData());
return send("ESH", "EXST", packet.getData(), packet.getSubData());
}

public Response processINIT(Packet packet) {
return send("ESH", "INIT", packet.getData());
return send("ESH", "INIT", packet.getData(), packet.getSubData());
}

public Response processIATH(Packet packet) {
return send("ESH", "IATH", packet.getData());
return send("ESH", "IATH", packet.getData(), packet.getSubData());
}

public Response processIATV(Packet packet) {
return send("ESH", "IATV", packet.getData());
return send("ESH", "IATV", packet.getData(), packet.getSubData());
}

public Response processEXEC(Packet packet) {
return send("ESH", "EXEC", packet.getData());
return send("ESH", "EXEC", packet.getData(), packet.getSubData());
}

public Response processKILL(Packet packet) {
return send("ESH", "KILL", packet.getData());
return send("ESH", "KILL", packet.getData(), packet.getSubData());
}

public Response processSUBS(Packet packet) {
return send("ESH", "SUBS", packet.getData());
return send("ESH", "SUBS", packet.getData(), packet.getSubData());
}

public Response processEDAT(Packet packet) {
return ResponseFactory.response0();
return send("OUT", "EDAT", packet.getData(), packet.getSubData());
}

public Response processRQST(Packet packet) {
return send("ESH", "RQST", packet.getData());
return send("ESH", "RQST", packet.getData(), packet.getSubData());
}
}
Loading

0 comments on commit b857ae4

Please sign in to comment.