From 7dd6627cb83d05c1f9ddaf9ce4bccc0c96fe9a73 Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Mon, 20 Mar 2023 22:46:19 -0400 Subject: [PATCH 1/2] update LSH for compatibility --- .../src/main/java/org/properties/Config.java | 15 +++---- .../handler/ExternalStreamHandler.java | 4 +- .../handler/ExternalStreamManager.java | 12 +++--- .../local/handler/LocalStreamHandler.java | 10 +---- .../registry/StreamRegistryController.java | 41 ++----------------- .../requests/TestExternalRequestManager.java | 16 ++++++-- 6 files changed, 34 insertions(+), 64 deletions(-) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index 9ac97dc8..fede1005 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -23,16 +23,17 @@ public class Config { Properties stream_properties = new Properties(); stream_properties.put("general.consumer.types", "socket_consumer"); stream_properties.put("general.producer.types", "socket_producer"); - //stream_properties.put("rest.socket.address", "DataEngine"); - stream_properties.put("rest.socket.address", "localhost"); + stream_properties.put("rest.socket.address", "DataEngine"); + //stream_properties.put("rest.socket.address", "localhost"); stream_properties.put("rest.socket.port", "61100"); stream_properties.put("rest.socket.key", "rest-key-reserved"); - //stream_properties.put("output.socket.address", "RestApp"); - stream_properties.put("output.socket.address", "localhost"); + stream_properties.put("output.socket.address", "RestApp"); + // stream_properties.put("output.socket.address", "localhost"); stream_properties.put("output.socket.port", "61200"); - //stream_properties.put("local.stream.type", "mongo_db"); - stream_properties.put("local.stream.type", "null"); - stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017"); + stream_properties.put("local.stream.type", "mongo_db"); + //stream_properties.put("local.stream.type", "null"); + //stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017"); + stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017"); stream_properties.put("mongodb.database.state", "main-state-db"); stream_properties.put("mongodb.database.main", "main-db"); stream_properties.put("mongodb.auth.collection", "auth-collection"); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java index 037eb634..541d73f8 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java @@ -41,12 +41,12 @@ public Response processRQST(Packet packet) { // check to see if dated // if not - if((validate = packet.validate("startDate", "endDate")) != null) + if((validate = packet.validate("start_date", "end_date")) != null) response = manager.request(packet.getData("type"), packet.getData()); // if dated else - response = manager.request(packet.getData("type"), packet.getData(), packet.getData("startDate"), packet.getData("endDate")); + response = manager.request(packet.getData("type"), packet.getData(), packet.getData("start_date"), packet.getData("end_date")); // check to see if valid if(!((boolean)response[0])) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java index af48a451..793cdfaf 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java @@ -2,7 +2,6 @@ import java.util.HashMap; -import org.framework.router.Response; import org.stream.external.requests.ExternalRequestFramework; import org.stream.external.requests.ExternalRequestManager; @@ -11,8 +10,6 @@ public class ExternalStreamManager { private final ExternalStreamHandler handler; private final ExternalRequestManager manager; - private final HashMap collections = new HashMap(); - protected ExternalStreamManager(ExternalStreamHandler handler) { this.handler = handler; this.manager = new ExternalRequestManager(); @@ -75,7 +72,7 @@ protected Object[] request(String type, HashMap data, String sta // submit response String response; - if(startDate == null && endDate == null) + if(startDate == null || endDate == null) response = request.request(url_path, properties, headers); else response = request.request(url_path, properties, headers, startDate, endDate); @@ -90,7 +87,10 @@ public void processRequest(String collection, HashMap data) { if(data == null || data.isEmpty()) return; - System.out.println(collection + ": " + data.toString()); - // TODO: FINISH + StringBuilder sb = new StringBuilder(); + for(String key : data.keySet()) + sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ","); + sb.delete(sb.length() - 1, sb.length()); + handler.send("LSH", "PUSH", "collection", collection, "data", sb.toString()); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index 5e799a43..71d25973 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -63,16 +63,10 @@ public Response processSCAN(Packet packet) { //public Response process public Response processRQST(Packet packet) { String validate; - if((validate = packet.validate("uuid", "request", "query", "destination")) != null) + if((validate = packet.validate("type", "destination")) != null) return ResponseFactory.response500("LocalStreamHandler", validate); - String request; - String delim = Config.getProperty("app", "general.collection.delim"); - if(packet.containsKey("date")) { - request = packet.getData("uuid") + delim + packet.getData("query") + delim + packet.getData("date"); - } else { - request = packet.getData("uuid") + delim + packet.getData("query"); - } + String request = packet.getData("query"); //String[] query = packet.getData("query").split(Config.getProperty("app", "general.data.delim")); String[] query = new String[] {"get_all", request}; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java index e76ba263..fcf7bbb0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java @@ -22,34 +22,6 @@ public Response processEXSR(Packet packet) { return send("ESH", "EXSR", packet.getData()); } - public Response processEXST(Packet packet) { - return send("ESH", "EXST", packet.getData()); - } - - public Response processINIT(Packet packet) { - return send("ESH", "INIT", packet.getData()); - } - - public Response processIATH(Packet packet) { - return send("ESH", "IATH", packet.getData()); - } - - public Response processIATV(Packet packet) { - return send("ESH", "IATV", packet.getData()); - } - - public Response processEXEC(Packet packet) { - return send("ESH", "EXEC", packet.getData()); - } - - public Response processKILL(Packet packet) { - return send("ESH", "KILL", packet.getData()); - } - - public Response processSUBS(Packet packet) { - return send("ESH", "SUBS", packet.getData()); - } - public Response processEDAT(Packet packet) { return send("OUT", "EDAT", packet.getData()); } @@ -65,7 +37,7 @@ public Response processRQST(Packet packet) { // Not Dated: data=key, request, query, destination String validate; - if((validate = packet.validate("key", "request", "query", "destination")) != null) + if((validate = packet.validate("type", "destination")) != null) return ResponseFactory.response500("ExternalStreamHandler", validate); // check to see if request is dated @@ -80,17 +52,9 @@ public Response processRQST(Packet packet) { // extract packet data HashMap data = packet.getData(); - // retrieve stream type based on key - Response type_response = send("ESH", "TYPE", data); - if(type_response.code() != 200) - return type_response; - - // retrieve uuid of the data - String uuid = type_response.data(); - data.put("uuid", uuid); - // not dated if(!dated) { + data.put("query", String.format("get_all, %s", packet.getData("type"))); Response lsh_response = send("LSH", "RQST", data); // if data does not exist send request to external stream handler if(lsh_response.code() == 446) @@ -114,6 +78,7 @@ else if(dated) { // initial request data.put("date", date.format(formatter)); + data.put("query", String.format("%s-%s", data.get("type"), data.get("date"))); lsh_response = send("LSH", "RQST", data); if(lsh_response.code() == 200) continue; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/requests/TestExternalRequestManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/requests/TestExternalRequestManager.java index 8c0b50cd..0916c4c4 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/requests/TestExternalRequestManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/requests/TestExternalRequestManager.java @@ -4,12 +4,22 @@ import java.util.HashMap; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; import org.core.core.Core; import org.junit.BeforeClass; import org.junit.Test; +import org.properties.Config; public class TestExternalRequestManager { +static { + // disable loggers + LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("stream", "general.consumer.types", "null"); + Config.setProperty("stream", "general.producer.types", "null"); +} + public static Core core; @BeforeClass @@ -29,14 +39,14 @@ public void TestEXSR() { assertEquals("false", core.send("ESH", "EXSR", data).data()); } - @Test + //@Test public void TestRQST() { HashMap data = new HashMap(); data.put("type", "amberdata-uniswap-pool"); data.put("url_path", "poolAddress,0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc"); data.put("headers", "x-api-key,UAK7ed69235426c360be22bfc2bde1809b6"); - data.put("startDate", "2022-09-01"); - data.put("endDate", "2022-09-02"); + data.put("start_date", "2022-09-01"); + data.put("end_date", "2022-09-03"); assertEquals(200, core.send("ESH", "RQST", data).code()); } From bde001e9e1c1e1efba02bdf7f721a591948abcfe Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Tue, 21 Mar 2023 00:07:58 -0400 Subject: [PATCH 2/2] add properties and url_path to LSH collection name --- .../handler/ExternalStreamHandler.java | 180 ------------------ .../handler/ExternalStreamManager.java | 10 +- .../requests/ExternalRequestFramework.java | 32 +++- .../requests/ExternalRequestManager.java | 3 +- .../local/handler/LocalStreamHandler.java | 4 - .../registry/StreamRegistryController.java | 47 ++++- 6 files changed, 79 insertions(+), 197 deletions(-) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java index 541d73f8..f70fa63f 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java @@ -56,184 +56,4 @@ public Response processRQST(Packet packet) { // return valid return ResponseFactory.response200(); } - -// // source: source of data -// public Response processEXSR(Packet packet) { -// String validate; -// if((validate = packet.validate("source")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// return ResponseFactory.response200(String.format("%s", manager.containsTemplate(packet.getData("source")))); -// } -// -// // key: key of the given stream -// public Response processEXST(Packet packet) { -// String validate; -// if((validate = packet.validate("key")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// return ResponseFactory.response200(String.format("%s", manager.containsStream(packet.getData("key")))); -// } -// -// // source: source of the given stream -// // auth_data[]: all keys associated with data -// public Response processINIT(Packet packet) { -// String validate; -// if((validate = packet.validate("source")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// String source = packet.getData("source"); -// -// // validate data -// String tempHash = manager.getHash(source, packet.getData()); -// if(tempHash == null) -// return ResponseFactory.response430(packet.getData()); -// if(manager.containsStream(tempHash)) -// return ResponseFactory.response220(tempHash); -// -// // attempt to add stream -// Object[] output = manager.addStream(source, packet.getData()); -// boolean success = (Boolean) output[0]; -// String hash = (String) output[1]; -// -// if(!success) -// return ResponseFactory.response420(source); -// -// boolean authorized = manager.authorizeStream(hash); -// -// // if successful authorize -// if(authorized) -// return ResponseFactory.response200(String.format("%s", hash)); -// -// manager.removeStream(hash); -// return ResponseFactory.response422(source); -// } -// -// // key: stream key -// public Response processIATH(Packet packet) { -// String validate; -// if((validate = packet.validate("key")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// if(!manager.containsStream(packet.getData("key"))) -// return ResponseFactory.response421(packet.getData("key")); -// -// return ResponseFactory.response200(String.format("%s", manager.isStreamAuthorized(packet.getData("key")))); -// } -// -// public Response processIATV(Packet packet) { -// String validate; -// if((validate = packet.validate("key")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// String key = packet.getData("key"); -// if(!manager.containsStream(key)) -// return ResponseFactory.response421(key); -// -// return ResponseFactory.response200(String.format("%s", manager.isStreamActive(key))); -// } -// -// public Response processEXEC(Packet packet) { -// String validate; -// if((validate = packet.validate("key")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// String key = packet.getData("key"); -// if(!manager.containsStream(key)) -// return ResponseFactory.response421(key); -// -// if(!manager.isStreamReady(key)) -// return ResponseFactory.response423(key); -// -// if(manager.isStreamActive(key)) -// return ResponseFactory.response424(key); -// -// return ResponseFactory.response200(String.format("%s", manager.executeStream(key))); -// } -// -// public Response processKILL(Packet packet) { -// String validate; -// if((validate = packet.validate("key")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// String key = packet.getData("key"); -// if(!manager.containsStream(key)) -// return ResponseFactory.response421(key); -// -// if(!manager.isStreamActive(key)) -// return ResponseFactory.response425(key); -// -// return ResponseFactory.response200(String.format("%s", manager.killStream(key))); -// } -// -// public Response processSUBS(Packet packet) { -// String validate; -// if((validate = packet.validate("key", "subscription")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// String key = packet.getData("key"); -// String subscription = packet.getData("subscription"); -// -// if(!manager.containsStream(key)) -// return ResponseFactory.response421(key); -// -// if(!manager.isStreamActive(key)) -// return ResponseFactory.response425(key); -// -// if(!manager.containsSubscriptionType(key, subscription)) -// return ResponseFactory.response426(key, subscription); -// -// Object[] response = manager.subscribe(key, subscription); -// -// if((Boolean)response[0]) -// return ResponseFactory.response200(String.format("%s", "true")); -// -// return ResponseFactory.response427(key, response[0].toString(), response[1].toString()); -// } -// -// public Response processRQST(Packet packet) { -// String validate; -// if((validate = packet.validate("key", "request", "query", "destination")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// String key = packet.getData("key"); -// String request = packet.getData("request"); -// -// if(!manager.containsStream(key)) -// return ResponseFactory.response421(key); -// -// if(!manager.isStreamReady(key)) -// return ResponseFactory.response423(key); -// -// if(!manager.containsRequestType(key, request)) -// return ResponseFactory.response428(key, request); -// -// Object[] response = manager.request(key, packet.getData()); -// -// if(response == null) -// return ResponseFactory.response501("Response from Manager.request should never be null. " -// + "Improper implementation of ExternalStreamConnection."); -// -// if((Boolean)response[0]) -// return ResponseFactory.response200(String.format("%s", (String)response[1])); -// -// return ResponseFactory.response429(key, request, response[1].toString()); -// } -// -// public Response processTYPE(Packet packet) { -// String validate; -// if((validate = packet.validate("key")) != null) -// return ResponseFactory.response500("ExternalStreamHandler", validate); -// -// String key = packet.getData("key"); -// if(!manager.containsStream(key)) -// return ResponseFactory.response421(key); -// -// String uuid = manager.getStreamType(key); -// if(uuid == null) -// return ResponseFactory.response501("Stream was removed in different thread mid observation."); -// -// return ResponseFactory.response200(uuid); -// -// } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java index 793cdfaf..e9cc21d3 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java @@ -1,6 +1,7 @@ package org.stream.external.handler; import java.util.HashMap; +import java.util.TreeMap; import org.stream.external.requests.ExternalRequestFramework; import org.stream.external.requests.ExternalRequestManager; @@ -47,13 +48,16 @@ protected Object[] request(String type, HashMap data, String sta // retrieve properties HashMap properties = new HashMap(); + TreeMap user_properties = new TreeMap(); if(data.containsKey("properties")) { String[] raw_properties = data.get("properties").split(","); if(raw_properties.length % 2 != 0) return new Object[] {false, String.format("Properties must be in pairs.")}; - for(int i = 0; i < raw_properties.length; i+=2) + for(int i = 0; i < raw_properties.length; i+=2) { properties.put(raw_properties[i], raw_properties[i + 1]); + user_properties.put(raw_properties[i], raw_properties[i + 1]); + } } // retrieve headers @@ -73,9 +77,9 @@ protected Object[] request(String type, HashMap data, String sta // submit response String response; if(startDate == null || endDate == null) - response = request.request(url_path, properties, headers); + response = request.request(url_path, properties, headers, user_properties); else - response = request.request(url_path, properties, headers, startDate, endDate); + response = request.request(url_path, properties, headers, startDate, endDate, user_properties); if(response != null) return new Object[] {false, response}; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestFramework.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestFramework.java index f5cfb1b2..e9fe300b 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestFramework.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestFramework.java @@ -4,8 +4,10 @@ import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.TreeMap; import java.util.stream.Stream; import org.json.JSONArray; @@ -65,7 +67,6 @@ public ExternalRequestFramework(ExternalStreamManager manager, String name, Stri HashMap tags, String[] recursive_location, String recursive_replacement, String[] path, boolean is_dated, String date_location, String date_start_var, String date_end_var, String date_format) { this.manager = manager; - this.collection = name; this.name = name; this.url = url; this.url_path = url_path; @@ -178,7 +179,6 @@ protected final Request getRequest(String url, HashMap propertie return builder.build(); } - public final synchronized String request(String[] url_path, HashMap properties, HashMap headers) { // validate that url_path is correctly formatted if(url_path.length % 2 != 0) { @@ -208,8 +208,21 @@ public final synchronized String request(String[] url_path, HashMap properties, HashMap headers, TreeMap user_properties) { + // update collection to be dated + StringBuilder sb = new StringBuilder(); + sb.append(name); + if(!user_properties.isEmpty()) + sb.append("-").append(user_properties.toString()); + if(url_path.length != 0) + sb.append("-").append(Arrays.toString(url_path)); + this.collection = sb.toString(); + collection = sb.toString(); + return request(url_path, properties, headers); + } + public final synchronized String request(String[] url_path, HashMap properties, HashMap headers, - String startDate, String endDate) { + String startDate, String endDate, TreeMap user_properties) { // validate that the request can be dated if(!is_dated) { @@ -278,9 +291,18 @@ public final synchronized String request(String[] url_path, HashMap ordered_properties = new TreeMap(); + String[] properties = data.get("properties").split(","); + if(properties.length % 2 != 0) + return ResponseFactory.response407("SRC", "RQST", "", data.toString()); + for(int i = 0; i < properties.length; i+=2) + ordered_properties.put(properties[i], properties[i + 1]); + sb.append("-").append(ordered_properties.toString()); + } + + if(packet.containsKey("url_path")) { + String[] url_path = data.get("url_path").split(","); + sb.append("-").append(Arrays.toString(url_path)); + } + data.put("query", String.format("%s", sb)); + Response lsh_response = send("LSH", "RQST", data); // if data does not exist send request to external stream handler if(lsh_response.code() == 446) @@ -74,11 +94,30 @@ else if(dated) { Response lsh_response, esh_response; for(LocalDate date : dates) { - // perform requests - // initial request data.put("date", date.format(formatter)); - data.put("query", String.format("%s-%s", data.get("type"), data.get("date"))); + + // format collection name + StringBuilder sb = new StringBuilder(); + sb.append(data.get("type")); + sb.append("-").append(data.get("date")); + if(packet.containsKey("properties")) { + TreeMap ordered_properties = new TreeMap(); + String[] properties = data.get("properties").split(","); + if(properties.length % 2 != 0) + return ResponseFactory.response407("SRC", "RQST", "", data.toString()); + for(int i = 0; i < properties.length; i+=2) + ordered_properties.put(properties[i], properties[i + 1]); + sb.append("-").append(ordered_properties.toString()); + } + + if(packet.containsKey("url_path")) { + String[] url_path = data.get("url_path").split(","); + sb.append("-").append(Arrays.toString(url_path)); + } + data.put("query", sb.toString()); + + // initiate request lsh_response = send("LSH", "RQST", data); if(lsh_response.code() == 200) continue;