From 68e88e16762a030ae4943da633b5d560f7f1a83b Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Sun, 2 Apr 2023 16:31:22 -0400 Subject: [PATCH] add in response handling and output for client processing --- DeFi-Data-Engine/DeFi Data Engine/.classpath | 12 +-- .../java/org/out/socket/SocketManager.java | 81 ++++++++++++------- .../registry/StreamRegistryController.java | 2 +- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/DeFi-Data-Engine/DeFi Data Engine/.classpath b/DeFi-Data-Engine/DeFi Data Engine/.classpath index b2e8bc05..2f837501 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/.classpath +++ b/DeFi-Data-Engine/DeFi Data Engine/.classpath @@ -25,11 +25,6 @@ - - - - - @@ -38,20 +33,21 @@ + - + + - - + diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index 062c7e84..cfca09cc 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -62,38 +62,57 @@ public void run() { DataInputStream in = inflow.get(key); DataOutputStream out = outflow.get(key); String str; - while(true) - try { - str = readLine(in); - - // parse input - String[] input = str.split(Config.getProperty("app", "general.transfer.delim")); - - // validate input - if(input.length <= 2) { - out.writeUTF(new JSONObject() - .put("response", "502") - .put("message", "Packet processed from REST API does not contain a TAG or SUB_TAG. Review REST API endpoint code.") - .toString()); + while(true) { + try { + str = readLine(in); + + // parse input + String[] input = str.split(Config.getProperty("app", "general.transfer.delim")); + + // validate input + if(input.length <= 2) { + out.writeUTF(new JSONObject() + .put("response", "502") + .put("message", "Packet processed from REST API does not contain a TAG or SUB_TAG. Review REST API endpoint code.") + .toString()); + } + + // extract non-essential data + String[] data = Arrays.copyOfRange(input, 2, input.length); + String tag = input[0]; + String sub_tag = input[1]; + + // retrieve destination + String destination = ""; + for(int i = 0; i < data.length; i++) { + if(data[i].equals("destination") && data.length - 1 != i) + destination = data[i + 1]; + } + + // if no destination found then continue + if(destination.equals("")) + continue; + + // execute valid response to engine + Response response = producer.send(tag, sub_tag, data); + + // send response signifier + producer.send("OUT", "EDAT", + "data", "<<>>", + "destination", destination); + // send response details + producer.send("OUT", "EDAT", + "data", new JSONObject() + .put("response", "200") + .put("code", response.code()) + .put("message", response.message()) + .put("data", response.data()) + .toString(), + "destination", destination); + + } catch(Exception e) { + break; } - - // extract non-essential data - String[] data = Arrays.copyOfRange(input, 2, input.length); - String tag = input[0]; - String sub_tag = input[1]; - - // execute valid response to engine - Response response = producer.send(tag, sub_tag, data); - out.writeUTF(new JSONObject() - .put("response", "200") - .put("code", response.code()) - .put("message", response.message()) - .put("data", response.data()) - .toString()); - out.flush(); - - } catch(Exception e) { - break; } Logger.log(String.format("Terminating thread for Socket with key <%s>", key)); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java index 0f74e860..275bedcf 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java @@ -141,7 +141,7 @@ else if(dated) { } // send end response - return send("SRC", "EDAT", "data", "<<>>", "destination", packet.getData("destination")); + return ResponseFactory.response200(); } catch(Exception e) { return ResponseFactory.response503(Config.getProperty("app", "general.data.dateformat"), packet.getData("start_date"), packet.getData("end_date"));