diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java index f503c2e1..f839548e 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java @@ -223,11 +223,11 @@ public final Response receive(Packet packet) { * All {@link Method} objects must contain a single parameter, a {@link Packet} object, * and return a {@link Response} object. * - * @param subtag Subtag of the process to handle the incoming {@link Packet} object. + * @param sub_tag Subtag of the process to handle the incoming {@link Packet} object. * @param method {@link Method} to pass the {@link Packet} object to. */ - public final void addProcess(String subtag, Method method) { - processes.put(subtag, method); + public final void addProcess(String sub_tag, Method method) { + processes.put(sub_tag, method); } /** diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java index 239bdd4a..2b23eab4 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java @@ -21,6 +21,7 @@ public final synchronized boolean send(Packet packet) { try { out.write(packet.getData("data").getBytes()); out.write(10); + out.flush(); } catch (JSONException | IOException e) { return false; } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java index 65ca05de..fc834178 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java @@ -111,7 +111,7 @@ public Object[] producerListen() { return new Object[] {true, ""}; } - protected final Response send(String tag, String sub_tag, String... data) { + public final Response send(String tag, String sub_tag, String... data) { return handler.send(tag, sub_tag, data); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java index d2fd265f..e29be73a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java @@ -1,5 +1,6 @@ package org.out.producers; +import org.framework.router.Response; import org.out.destinations.SocketDestination; import org.out.handler.OutputManager; import org.out.handler.OutputProducer; @@ -9,6 +10,7 @@ public class SocketProducer extends OutputProducer { private Thread listener; + public final SocketProducer producer = this; public SocketProducer(OutputManager manager) { super(manager); @@ -24,7 +26,7 @@ protected boolean init() { listener = new Thread() { public void run() { while(true) { - String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port"))); + String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")), producer); if(key == null) { System.err.println("SocketProducer: Could not create connection to socket port."); System.exit(1); @@ -55,4 +57,8 @@ protected boolean kill() { return true; } + + public Response send(String tag, String sub_tag, String... data) { + return manager.send(tag, sub_tag, data); + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index 7148a737..0d923738 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -5,11 +5,16 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.UUID; import org.core.logger.Logger; +import org.framework.router.Response; +import org.json.JSONObject; +import org.out.producers.SocketProducer; +import org.properties.Config; public class SocketManager { @@ -38,8 +43,69 @@ public synchronized static boolean exists(String key) { return connections.containsKey(key); } + public synchronized static void createThread(String key, SocketProducer producer) { + Thread thread = new Thread() { + public void run() { + // perform logger verifications + Logger.log(String.format("Starting thread for Socket with key <%s>", key)); + if(!inflow.containsKey(key)) { + Logger.terminate(String.format("Key <%s> not found within inflow thread configuration, manual review recommended.", key)); + return; + } + + if(!outflow.containsKey(key)) { + Logger.terminate(String.format("Key <%s> not found within outflow thread configuration, manual review recommended.", key)); + return; + } + + // retrieve inflow stream and listen + DataInputStream in = inflow.get(key); + DataOutputStream out = outflow.get(key); + String str; + while(true) + try { + str = readLine(in); + + // parse input + String[] input = str.split(Config.getProperty("app", "general.transfer.delim")); + + // validate input + if(input.length <= 2) { + out.writeUTF(new JSONObject() + .put("response", "502") + .put("message", "Packet processed from REST API does not contain a TAG or SUB_TAG. Review REST API endpoint code.") + .toString()); + } + + // extract non-essential data + String[] data = Arrays.copyOfRange(input, 2, input.length); + String tag = input[0]; + String sub_tag = input[1]; + + // execute valid response to engine + Response response = producer.send(tag, sub_tag, data); + out.writeUTF(new JSONObject() + .put("response", "200") + .put("code", response.code()) + .put("message", response.message()) + .put("data", response.data()) + .toString()); + out.flush(); + System.out.println("<<>>"); + + } catch(Exception e) { + break; + } + + Logger.log(String.format("Terminating thread for Socket with key <%s>", key)); + } + }; + + thread.start(); + } + // used for generic channel accepting - public synchronized static String accept(int port) { + public synchronized static String accept(int port, SocketProducer producer) { if(!servers.containsKey(port)) if(!createServer(port)) return null; @@ -68,6 +134,9 @@ public synchronized static String accept(int port) { if(!synced(key)) throw new Exception("Connection inflow and outflow not synchronized"); + // start internal thread for socket information parsing + createThread(key, producer); + return key; } catch(Exception e) { e.printStackTrace(); @@ -132,4 +201,13 @@ public static DataInputStream read(String key) { private static boolean synced(String key) { return connections.containsKey(key) && inflow.containsKey(key) && outflow.containsKey(key); } + + private static final String readLine(DataInputStream in) throws IOException { + StringBuilder out = new StringBuilder(); + char c = 0; + while((c = (char)in.read()) != 10) + out.append(c); + + return out.toString(); + } } diff --git a/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx b/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx index 801e6381..a5b44e77 100644 Binary files a/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx and b/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx differ diff --git a/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java b/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java index f03aabcc..e382e6d6 100644 --- a/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java +++ b/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java @@ -150,7 +150,6 @@ private final String request(String tag, String sub_tag, String... data) { formatted_data.append(delim); } - // static request out.writeUTF(String.format("%s%s%s%s%s", tag, delim, sub_tag, delim, formatted_data)); return in.readUTF(); } catch (IOException e) {