diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java index f503c2e1..f839548e 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java @@ -223,11 +223,11 @@ public final Response receive(Packet packet) { * All {@link Method} objects must contain a single parameter, a {@link Packet} object, * and return a {@link Response} object. * - * @param subtag Subtag of the process to handle the incoming {@link Packet} object. + * @param sub_tag Subtag of the process to handle the incoming {@link Packet} object. * @param method {@link Method} to pass the {@link Packet} object to. */ - public final void addProcess(String subtag, Method method) { - processes.put(subtag, method); + public final void addProcess(String sub_tag, Method method) { + processes.put(sub_tag, method); } /** diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java index 239bdd4a..2b23eab4 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java @@ -21,6 +21,7 @@ public final synchronized boolean send(Packet packet) { try { out.write(packet.getData("data").getBytes()); out.write(10); + out.flush(); } catch (JSONException | IOException e) { return false; } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java index 65ca05de..fc834178 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java @@ -111,7 +111,7 @@ public Object[] producerListen() { return new Object[] {true, ""}; } - protected final Response send(String tag, String sub_tag, String... data) { + public final Response send(String tag, String sub_tag, String... data) { return handler.send(tag, sub_tag, data); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java index d2fd265f..e29be73a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java @@ -1,5 +1,6 @@ package org.out.producers; +import org.framework.router.Response; import org.out.destinations.SocketDestination; import org.out.handler.OutputManager; import org.out.handler.OutputProducer; @@ -9,6 +10,7 @@ public class SocketProducer extends OutputProducer { private Thread listener; + public final SocketProducer producer = this; public SocketProducer(OutputManager manager) { super(manager); @@ -24,7 +26,7 @@ protected boolean init() { listener = new Thread() { public void run() { while(true) { - String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port"))); + String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")), producer); if(key == null) { System.err.println("SocketProducer: Could not create connection to socket port."); System.exit(1); @@ -55,4 +57,8 @@ protected boolean kill() { return true; } + + public Response send(String tag, String sub_tag, String... data) { + return manager.send(tag, sub_tag, data); + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index 7148a737..0d923738 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -5,11 +5,16 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.UUID; import org.core.logger.Logger; +import org.framework.router.Response; +import org.json.JSONObject; +import org.out.producers.SocketProducer; +import org.properties.Config; public class SocketManager { @@ -38,8 +43,69 @@ public synchronized static boolean exists(String key) { return connections.containsKey(key); } + public synchronized static void createThread(String key, SocketProducer producer) { + Thread thread = new Thread() { + public void run() { + // perform logger verifications + Logger.log(String.format("Starting thread for Socket with key <%s>", key)); + if(!inflow.containsKey(key)) { + Logger.terminate(String.format("Key <%s> not found within inflow thread configuration, manual review recommended.", key)); + return; + } + + if(!outflow.containsKey(key)) { + Logger.terminate(String.format("Key <%s> not found within outflow thread configuration, manual review recommended.", key)); + return; + } + + // retrieve inflow stream and listen + DataInputStream in = inflow.get(key); + DataOutputStream out = outflow.get(key); + String str; + while(true) + try { + str = readLine(in); + + // parse input + String[] input = str.split(Config.getProperty("app", "general.transfer.delim")); + + // validate input + if(input.length <= 2) { + out.writeUTF(new JSONObject() + .put("response", "502") + .put("message", "Packet processed from REST API does not contain a TAG or SUB_TAG. Review REST API endpoint code.") + .toString()); + } + + // extract non-essential data + String[] data = Arrays.copyOfRange(input, 2, input.length); + String tag = input[0]; + String sub_tag = input[1]; + + // execute valid response to engine + Response response = producer.send(tag, sub_tag, data); + out.writeUTF(new JSONObject() + .put("response", "200") + .put("code", response.code()) + .put("message", response.message()) + .put("data", response.data()) + .toString()); + out.flush(); + System.out.println("<<>>"); + + } catch(Exception e) { + break; + } + + Logger.log(String.format("Terminating thread for Socket with key <%s>", key)); + } + }; + + thread.start(); + } + // used for generic channel accepting - public synchronized static String accept(int port) { + public synchronized static String accept(int port, SocketProducer producer) { if(!servers.containsKey(port)) if(!createServer(port)) return null; @@ -68,6 +134,9 @@ public synchronized static String accept(int port) { if(!synced(key)) throw new Exception("Connection inflow and outflow not synchronized"); + // start internal thread for socket information parsing + createThread(key, producer); + return key; } catch(Exception e) { e.printStackTrace(); @@ -132,4 +201,13 @@ public static DataInputStream read(String key) { private static boolean synced(String key) { return connections.containsKey(key) && inflow.containsKey(key) && outflow.containsKey(key); } + + private static final String readLine(DataInputStream in) throws IOException { + StringBuilder out = new StringBuilder(); + char c = 0; + while((c = (char)in.read()) != 10) + out.append(c); + + return out.toString(); + } } diff --git a/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx b/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx index 801e6381..a5b44e77 100644 Binary files a/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx and b/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx differ diff --git a/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java b/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java index f03aabcc..e382e6d6 100644 --- a/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java +++ b/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java @@ -150,7 +150,6 @@ private final String request(String tag, String sub_tag, String... data) { formatted_data.append(delim); } - // static request out.writeUTF(String.format("%s%s%s%s%s", tag, delim, sub_tag, delim, formatted_data)); return in.readUTF(); } catch (IOException e) { diff --git a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java index 1b3a2b35..6d7c931f 100644 --- a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java +++ b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java @@ -2,6 +2,7 @@ import java.io.BufferedReader; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; @@ -15,28 +16,15 @@ public class LocalTest { - private static final String host = "defi-de.idea.rpi.edu"; + private static final String host = "localhost"; public static void main(String[] args) throws UnknownHostException, IOException { final Socket socket = SocketFactory.getDefault().createSocket(host, 61200); - final DataInputStream in = new DataInputStream(socket.getInputStream()); + final BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + final DataOutputStream out = new DataOutputStream(socket.getOutputStream()); String destination = readLine(in); - Thread thread = new Thread() { - public void run() { - try { - while(true) { - System.out.println(readLine(in)); - } - } catch(Exception e) { - e.printStackTrace(); - System.exit(1); - } - } - }; - thread.start(); - String init = request(String.format("http://%s:8080/defi/v1/rest/initialize?" + "source=amber_data&" + "auth_data=key,UAK7ed69235426c360be22bfc2bde1809b6", @@ -45,34 +33,49 @@ public void run() { JSONObject json_init = new JSONObject(init); String key = json_init.getString("data"); - String rqst_protocol_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?" - + "destination=%s&" - + "key=%s&" - + "request=request,aave-protocol-dated&" - + "query=aave-protocol-dated&" - + "start_date=%s&" - + "end_date=%s", - host, + out.writeBytes(String.format( + "SRC&&&RQST&&&" + + "destination&&&%s&&&" + + "key&&&%s&&&" + + "start_date&&&2022-08-01&&&" + + "end_date&&&2022-08-02&&&" + + "query&&&aave-protocol-dated&&&" + + "request&&&aave-protocol-dated\n", destination, - key, - "2022-08-01", - "2022-08-02"); + key)); - String rqst_asset_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?" - + "destination=%s&" - + "key=%s&" - + "request=request,aave-protocol-dated&" - + "query=aave-protocol-dated&" - + "start_date=%s&" - + "end_date=%s", - host, - destination, - key, - "2022-08-01", - "2022-08-02"); + while(true) { + System.out.print(readLine(in)); + } - String rqst = request(rqst_asset_dated); - System.out.println(rqst); +// String rqst_protocol_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?" +// + "destination=%s&" +// + "key=%s&" +// + "request=request,aave-protocol-dated&" +// + "query=aave-protocol-dated&" +// + "start_date=%s&" +// + "end_date=%s", +// host, +// destination, +// key, +// "2022-08-01", +// "2022-08-02"); +// +// String rqst_asset_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?" +// + "destination=%s&" +// + "key=%s&" +// + "request=request,aave-protocol-dated&" +// + "query=aave-protocol-dated&" +// + "start_date=%s&" +// + "end_date=%s", +// host, +// destination, +// key, +// "2022-08-01", +// "2022-08-02"); +// +// String rqst = request(rqst_asset_dated); +// System.out.println(rqst); } public static String request(String str) throws IOException { @@ -99,7 +102,7 @@ public static String request(String str) throws IOException { return ""; } - public static final String readLine(DataInputStream in) throws IOException { + public static final String readLine(BufferedReader in) throws IOException { StringBuilder out = new StringBuilder(); char c = 0; while((c = (char)in.read()) != 10)