From 835d9b522cd7150829e518971bd5f6256511e13e Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Mon, 20 Mar 2023 22:46:19 -0400 Subject: [PATCH] update LSH for compatibility --- .../src/main/java/org/properties/Config.java | 15 +++---- .../handler/ExternalStreamHandler.java | 4 +- .../handler/ExternalStreamManager.java | 12 +++--- .../local/handler/LocalStreamHandler.java | 10 +---- .../registry/StreamRegistryController.java | 41 ++----------------- 5 files changed, 21 insertions(+), 61 deletions(-) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index 9ac97dc8..fede1005 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -23,16 +23,17 @@ public class Config { Properties stream_properties = new Properties(); stream_properties.put("general.consumer.types", "socket_consumer"); stream_properties.put("general.producer.types", "socket_producer"); - //stream_properties.put("rest.socket.address", "DataEngine"); - stream_properties.put("rest.socket.address", "localhost"); + stream_properties.put("rest.socket.address", "DataEngine"); + //stream_properties.put("rest.socket.address", "localhost"); stream_properties.put("rest.socket.port", "61100"); stream_properties.put("rest.socket.key", "rest-key-reserved"); - //stream_properties.put("output.socket.address", "RestApp"); - stream_properties.put("output.socket.address", "localhost"); + stream_properties.put("output.socket.address", "RestApp"); + // stream_properties.put("output.socket.address", "localhost"); stream_properties.put("output.socket.port", "61200"); - //stream_properties.put("local.stream.type", "mongo_db"); - stream_properties.put("local.stream.type", "null"); - stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017"); + stream_properties.put("local.stream.type", "mongo_db"); + //stream_properties.put("local.stream.type", "null"); + //stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017"); + stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017"); stream_properties.put("mongodb.database.state", "main-state-db"); stream_properties.put("mongodb.database.main", "main-db"); stream_properties.put("mongodb.auth.collection", "auth-collection"); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java index 037eb634..541d73f8 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java @@ -41,12 +41,12 @@ public Response processRQST(Packet packet) { // check to see if dated // if not - if((validate = packet.validate("startDate", "endDate")) != null) + if((validate = packet.validate("start_date", "end_date")) != null) response = manager.request(packet.getData("type"), packet.getData()); // if dated else - response = manager.request(packet.getData("type"), packet.getData(), packet.getData("startDate"), packet.getData("endDate")); + response = manager.request(packet.getData("type"), packet.getData(), packet.getData("start_date"), packet.getData("end_date")); // check to see if valid if(!((boolean)response[0])) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java index af48a451..793cdfaf 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java @@ -2,7 +2,6 @@ import java.util.HashMap; -import org.framework.router.Response; import org.stream.external.requests.ExternalRequestFramework; import org.stream.external.requests.ExternalRequestManager; @@ -11,8 +10,6 @@ public class ExternalStreamManager { private final ExternalStreamHandler handler; private final ExternalRequestManager manager; - private final HashMap collections = new HashMap(); - protected ExternalStreamManager(ExternalStreamHandler handler) { this.handler = handler; this.manager = new ExternalRequestManager(); @@ -75,7 +72,7 @@ protected Object[] request(String type, HashMap data, String sta // submit response String response; - if(startDate == null && endDate == null) + if(startDate == null || endDate == null) response = request.request(url_path, properties, headers); else response = request.request(url_path, properties, headers, startDate, endDate); @@ -90,7 +87,10 @@ public void processRequest(String collection, HashMap data) { if(data == null || data.isEmpty()) return; - System.out.println(collection + ": " + data.toString()); - // TODO: FINISH + StringBuilder sb = new StringBuilder(); + for(String key : data.keySet()) + sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ","); + sb.delete(sb.length() - 1, sb.length()); + handler.send("LSH", "PUSH", "collection", collection, "data", sb.toString()); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index 5e799a43..71d25973 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -63,16 +63,10 @@ public Response processSCAN(Packet packet) { //public Response process public Response processRQST(Packet packet) { String validate; - if((validate = packet.validate("uuid", "request", "query", "destination")) != null) + if((validate = packet.validate("type", "destination")) != null) return ResponseFactory.response500("LocalStreamHandler", validate); - String request; - String delim = Config.getProperty("app", "general.collection.delim"); - if(packet.containsKey("date")) { - request = packet.getData("uuid") + delim + packet.getData("query") + delim + packet.getData("date"); - } else { - request = packet.getData("uuid") + delim + packet.getData("query"); - } + String request = packet.getData("query"); //String[] query = packet.getData("query").split(Config.getProperty("app", "general.data.delim")); String[] query = new String[] {"get_all", request}; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java index e76ba263..fcf7bbb0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java @@ -22,34 +22,6 @@ public Response processEXSR(Packet packet) { return send("ESH", "EXSR", packet.getData()); } - public Response processEXST(Packet packet) { - return send("ESH", "EXST", packet.getData()); - } - - public Response processINIT(Packet packet) { - return send("ESH", "INIT", packet.getData()); - } - - public Response processIATH(Packet packet) { - return send("ESH", "IATH", packet.getData()); - } - - public Response processIATV(Packet packet) { - return send("ESH", "IATV", packet.getData()); - } - - public Response processEXEC(Packet packet) { - return send("ESH", "EXEC", packet.getData()); - } - - public Response processKILL(Packet packet) { - return send("ESH", "KILL", packet.getData()); - } - - public Response processSUBS(Packet packet) { - return send("ESH", "SUBS", packet.getData()); - } - public Response processEDAT(Packet packet) { return send("OUT", "EDAT", packet.getData()); } @@ -65,7 +37,7 @@ public Response processRQST(Packet packet) { // Not Dated: data=key, request, query, destination String validate; - if((validate = packet.validate("key", "request", "query", "destination")) != null) + if((validate = packet.validate("type", "destination")) != null) return ResponseFactory.response500("ExternalStreamHandler", validate); // check to see if request is dated @@ -80,17 +52,9 @@ public Response processRQST(Packet packet) { // extract packet data HashMap data = packet.getData(); - // retrieve stream type based on key - Response type_response = send("ESH", "TYPE", data); - if(type_response.code() != 200) - return type_response; - - // retrieve uuid of the data - String uuid = type_response.data(); - data.put("uuid", uuid); - // not dated if(!dated) { + data.put("query", String.format("get_all, %s", packet.getData("type"))); Response lsh_response = send("LSH", "RQST", data); // if data does not exist send request to external stream handler if(lsh_response.code() == 446) @@ -114,6 +78,7 @@ else if(dated) { // initial request data.put("date", date.format(formatter)); + data.put("query", String.format("%s-%s", data.get("type"), data.get("date"))); lsh_response = send("LSH", "RQST", data); if(lsh_response.code() == 200) continue;