diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index 9ac97dc8..fede1005 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -23,16 +23,17 @@ public class Config { Properties stream_properties = new Properties(); stream_properties.put("general.consumer.types", "socket_consumer"); stream_properties.put("general.producer.types", "socket_producer"); - //stream_properties.put("rest.socket.address", "DataEngine"); - stream_properties.put("rest.socket.address", "localhost"); + stream_properties.put("rest.socket.address", "DataEngine"); + //stream_properties.put("rest.socket.address", "localhost"); stream_properties.put("rest.socket.port", "61100"); stream_properties.put("rest.socket.key", "rest-key-reserved"); - //stream_properties.put("output.socket.address", "RestApp"); - stream_properties.put("output.socket.address", "localhost"); + stream_properties.put("output.socket.address", "RestApp"); + // stream_properties.put("output.socket.address", "localhost"); stream_properties.put("output.socket.port", "61200"); - //stream_properties.put("local.stream.type", "mongo_db"); - stream_properties.put("local.stream.type", "null"); - stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017"); + stream_properties.put("local.stream.type", "mongo_db"); + //stream_properties.put("local.stream.type", "null"); + //stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017"); + stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017"); stream_properties.put("mongodb.database.state", "main-state-db"); stream_properties.put("mongodb.database.main", "main-db"); stream_properties.put("mongodb.auth.collection", "auth-collection"); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java index 037eb634..541d73f8 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java @@ -41,12 +41,12 @@ public Response processRQST(Packet packet) { // check to see if dated // if not - if((validate = packet.validate("startDate", "endDate")) != null) + if((validate = packet.validate("start_date", "end_date")) != null) response = manager.request(packet.getData("type"), packet.getData()); // if dated else - response = manager.request(packet.getData("type"), packet.getData(), packet.getData("startDate"), packet.getData("endDate")); + response = manager.request(packet.getData("type"), packet.getData(), packet.getData("start_date"), packet.getData("end_date")); // check to see if valid if(!((boolean)response[0])) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java index af48a451..793cdfaf 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java @@ -2,7 +2,6 @@ import java.util.HashMap; -import org.framework.router.Response; import org.stream.external.requests.ExternalRequestFramework; import org.stream.external.requests.ExternalRequestManager; @@ -11,8 +10,6 @@ public class ExternalStreamManager { private final ExternalStreamHandler handler; private final ExternalRequestManager manager; - private final HashMap collections = new HashMap(); - protected ExternalStreamManager(ExternalStreamHandler handler) { this.handler = handler; this.manager = new ExternalRequestManager(); @@ -75,7 +72,7 @@ protected Object[] request(String type, HashMap data, String sta // submit response String response; - if(startDate == null && endDate == null) + if(startDate == null || endDate == null) response = request.request(url_path, properties, headers); else response = request.request(url_path, properties, headers, startDate, endDate); @@ -90,7 +87,10 @@ public void processRequest(String collection, HashMap data) { if(data == null || data.isEmpty()) return; - System.out.println(collection + ": " + data.toString()); - // TODO: FINISH + StringBuilder sb = new StringBuilder(); + for(String key : data.keySet()) + sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ","); + sb.delete(sb.length() - 1, sb.length()); + handler.send("LSH", "PUSH", "collection", collection, "data", sb.toString()); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index 5e799a43..71d25973 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -63,16 +63,10 @@ public Response processSCAN(Packet packet) { //public Response process public Response processRQST(Packet packet) { String validate; - if((validate = packet.validate("uuid", "request", "query", "destination")) != null) + if((validate = packet.validate("type", "destination")) != null) return ResponseFactory.response500("LocalStreamHandler", validate); - String request; - String delim = Config.getProperty("app", "general.collection.delim"); - if(packet.containsKey("date")) { - request = packet.getData("uuid") + delim + packet.getData("query") + delim + packet.getData("date"); - } else { - request = packet.getData("uuid") + delim + packet.getData("query"); - } + String request = packet.getData("query"); //String[] query = packet.getData("query").split(Config.getProperty("app", "general.data.delim")); String[] query = new String[] {"get_all", request}; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java index e76ba263..fcf7bbb0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java @@ -22,34 +22,6 @@ public Response processEXSR(Packet packet) { return send("ESH", "EXSR", packet.getData()); } - public Response processEXST(Packet packet) { - return send("ESH", "EXST", packet.getData()); - } - - public Response processINIT(Packet packet) { - return send("ESH", "INIT", packet.getData()); - } - - public Response processIATH(Packet packet) { - return send("ESH", "IATH", packet.getData()); - } - - public Response processIATV(Packet packet) { - return send("ESH", "IATV", packet.getData()); - } - - public Response processEXEC(Packet packet) { - return send("ESH", "EXEC", packet.getData()); - } - - public Response processKILL(Packet packet) { - return send("ESH", "KILL", packet.getData()); - } - - public Response processSUBS(Packet packet) { - return send("ESH", "SUBS", packet.getData()); - } - public Response processEDAT(Packet packet) { return send("OUT", "EDAT", packet.getData()); } @@ -65,7 +37,7 @@ public Response processRQST(Packet packet) { // Not Dated: data=key, request, query, destination String validate; - if((validate = packet.validate("key", "request", "query", "destination")) != null) + if((validate = packet.validate("type", "destination")) != null) return ResponseFactory.response500("ExternalStreamHandler", validate); // check to see if request is dated @@ -80,17 +52,9 @@ public Response processRQST(Packet packet) { // extract packet data HashMap data = packet.getData(); - // retrieve stream type based on key - Response type_response = send("ESH", "TYPE", data); - if(type_response.code() != 200) - return type_response; - - // retrieve uuid of the data - String uuid = type_response.data(); - data.put("uuid", uuid); - // not dated if(!dated) { + data.put("query", String.format("get_all, %s", packet.getData("type"))); Response lsh_response = send("LSH", "RQST", data); // if data does not exist send request to external stream handler if(lsh_response.code() == 446) @@ -114,6 +78,7 @@ else if(dated) { // initial request data.put("date", date.format(formatter)); + data.put("query", String.format("%s-%s", data.get("type"), data.get("date"))); lsh_response = send("LSH", "RQST", data); if(lsh_response.code() == 200) continue; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java index 26495c31..eddf74eb 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java @@ -2,10 +2,11 @@ import static org.junit.Assert.assertEquals; +import java.util.HashMap; + import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.core.core.Core; -import org.framework.router.Response; import org.junit.Test; import org.properties.Config; @@ -16,143 +17,138 @@ public class TestSRC { LogManager.getRootLogger().setLevel(Level.OFF); Config.setProperty("stream", "general.consumer.types", "null"); Config.setProperty("stream", "general.producer.types", "null"); - Config.setProperty("stream", "local.stream.type", "local_template"); + Config.setProperty("stream", "mongodb.properties.uri", "mongodb://localhost:27017"); } - @Test - public void TestEXSR() { - Core core = new Core(); - - assertEquals(200, core.send("SRC", "EXSR", "source", "external_template").code()); - assertEquals("true", core.send("SRC", "EXSR", "source", "external_template").data()); - assertEquals("false", core.send("SRC", "EXSR", "source", "template").data()); - - assertEquals(500, core.send("SRC", "EXSR", "source", "").code()); - assertEquals(500, core.send("SRC", "EXSR", "dne", "").code()); - } - - @Test - public void TestEXST() { - Core core = new Core(); - - assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - - assertEquals("true", core.send("SRC", "EXST", "key", "key").data()); - assertEquals("false", core.send("SRC", "EXST", "key", "key1").data()); - - assertEquals(500, core.send("SRC", "EXST", "key", "").code()); - assertEquals(500, core.send("SRC", "EXST", "dne", "").code()); - } - - @Test - public void TestINIT() { - Core core = new Core(); - // successful authorization - Response valid = core.send("SRC", "INIT", "source", "external_template", "key", "key"); - assertEquals(200, valid.code()); - assertEquals("key", valid.data()); - assertEquals("true", core.send("SRC", "EXST", "key", "key").data()); - - assertEquals(220, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - - assertEquals(420, core.send("SRC", "INIT", "source", "does_not_exist").code()); - assertEquals(422, core.send("SRC", "INIT", "source", "external_template", "key", "wrong").code()); - assertEquals(422, core.send("SRC", "INIT", "source", "external_template").code()); - assertEquals(500, core.send("SRC", "INIT", "dne", "").code()); - } - - @Test - public void TestIATH() { - Core core = new Core(); - - assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - - assertEquals(200, core.send("SRC", "IATH", "key", "key").code()); - assertEquals("true", core.send("SRC", "IATH", "key", "key").data()); - - assertEquals(421, core.send("SRC", "IATH", "key", "does_not_exist").code()); - assertEquals(500, core.send("SRC", "IATH", "key", "").code()); - } - - @Test - public void TestIATV() { - Core core = new Core(); - - assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - assertEquals("false", core.send("SRC", "IATV", "key", "key").data()); - assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); - assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); - - assertEquals(421, core.send("SRC", "IATV", "key", "does_not_exist").code()); - assertEquals(500, core.send("SRC", "IATV", "key", "").code()); - } - - @Test - public void TestEXEC() { - Core core = new Core(); - - assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "not_ready", "override", "true").code()); - - assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); - assertEquals("true", core.send("SRC", "IATH", "key", "key").data()); - assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); - - assertEquals(421, core.send("SRC", "EXEC", "key", "does_not_exist").code()); - assertEquals(423, core.send("SRC", "EXEC", "key", "not_ready").code()); - assertEquals(424, core.send("SRC", "EXEC", "key", "key").code()); - } - - @Test - public void TestKILL() { - Core core = new Core(); - - assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); - - assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); - assertEquals("true", core.send("SRC", "KILL", "key", "key").data()); - assertEquals("false", core.send("SRC", "IATV", "key", "key").data()); - - assertEquals(421, core.send("SRC", "KILL", "key", "does_not_exist").code()); - assertEquals(425, core.send("SRC", "KILL", "key", "key").code()); - assertEquals(500, core.send("SRC", "KILL", "key", "").code()); - } - - @Test - public void TestSUBS() { - Core core = new Core(); - - assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); - - assertEquals("true", core.send("SRC", "SUBS", "key", "key", "subscription", "correct").data()); - - assertEquals(421, core.send("SRC", "SUBS", "key", "does_not_exist", "subscription", "correct").code()); - assertEquals(200, core.send("SRC", "KILL", "key", "key").code()); - assertEquals(425, core.send("SRC", "SUBS", "key", "key", "subscription", "correct").code()); - assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); - assertEquals(426, core.send("SRC", "SUBS", "key", "key", "subscription", "does_not_exist").code()); - assertEquals(427, core.send("SRC", "SUBS", "key", "key", "subscription", "irregular").code()); - } +// @Test +// public void TestEXSR() { +// Core core = new Core(); +// +// assertEquals(200, core.send("SRC", "EXSR", "source", "external_template").code()); +// assertEquals("true", core.send("SRC", "EXSR", "source", "external_template").data()); +// assertEquals("false", core.send("SRC", "EXSR", "source", "template").data()); +// +// assertEquals(500, core.send("SRC", "EXSR", "source", "").code()); +// assertEquals(500, core.send("SRC", "EXSR", "dne", "").code()); +// } +// +// @Test +// public void TestEXST() { +// Core core = new Core(); +// +// assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); +// +// assertEquals("true", core.send("SRC", "EXST", "key", "key").data()); +// assertEquals("false", core.send("SRC", "EXST", "key", "key1").data()); +// +// assertEquals(500, core.send("SRC", "EXST", "key", "").code()); +// assertEquals(500, core.send("SRC", "EXST", "dne", "").code()); +// } +// +// @Test +// public void TestINIT() { +// Core core = new Core(); +// // successful authorization +// Response valid = core.send("SRC", "INIT", "source", "external_template", "key", "key"); +// assertEquals(200, valid.code()); +// assertEquals("key", valid.data()); +// assertEquals("true", core.send("SRC", "EXST", "key", "key").data()); +// +// assertEquals(220, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); +// +// assertEquals(420, core.send("SRC", "INIT", "source", "does_not_exist").code()); +// assertEquals(422, core.send("SRC", "INIT", "source", "external_template", "key", "wrong").code()); +// assertEquals(422, core.send("SRC", "INIT", "source", "external_template").code()); +// assertEquals(500, core.send("SRC", "INIT", "dne", "").code()); +// } +// +// @Test +// public void TestIATH() { +// Core core = new Core(); +// +// assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); +// +// assertEquals(200, core.send("SRC", "IATH", "key", "key").code()); +// assertEquals("true", core.send("SRC", "IATH", "key", "key").data()); +// +// assertEquals(421, core.send("SRC", "IATH", "key", "does_not_exist").code()); +// assertEquals(500, core.send("SRC", "IATH", "key", "").code()); +// } +// +// @Test +// public void TestIATV() { +// Core core = new Core(); +// +// assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); +// assertEquals("false", core.send("SRC", "IATV", "key", "key").data()); +// assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); +// assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); +// +// assertEquals(421, core.send("SRC", "IATV", "key", "does_not_exist").code()); +// assertEquals(500, core.send("SRC", "IATV", "key", "").code()); +// } +// +// @Test +// public void TestEXEC() { +// Core core = new Core(); +// +// assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); +// assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "not_ready", "override", "true").code()); +// +// assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); +// assertEquals("true", core.send("SRC", "IATH", "key", "key").data()); +// assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); +// +// assertEquals(421, core.send("SRC", "EXEC", "key", "does_not_exist").code()); +// assertEquals(423, core.send("SRC", "EXEC", "key", "not_ready").code()); +// assertEquals(424, core.send("SRC", "EXEC", "key", "key").code()); +// } +// +// @Test +// public void TestKILL() { +// Core core = new Core(); +// +// assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); +// assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); +// +// assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); +// assertEquals("true", core.send("SRC", "KILL", "key", "key").data()); +// assertEquals("false", core.send("SRC", "IATV", "key", "key").data()); +// +// assertEquals(421, core.send("SRC", "KILL", "key", "does_not_exist").code()); +// assertEquals(425, core.send("SRC", "KILL", "key", "key").code()); +// assertEquals(500, core.send("SRC", "KILL", "key", "").code()); +// } +// +// @Test +// public void TestSUBS() { +// Core core = new Core(); +// +// assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); +// assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); +// +// assertEquals("true", core.send("SRC", "SUBS", "key", "key", "subscription", "correct").data()); +// +// assertEquals(421, core.send("SRC", "SUBS", "key", "does_not_exist", "subscription", "correct").code()); +// assertEquals(200, core.send("SRC", "KILL", "key", "key").code()); +// assertEquals(425, core.send("SRC", "SUBS", "key", "key", "subscription", "correct").code()); +// assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); +// assertEquals(426, core.send("SRC", "SUBS", "key", "key", "subscription", "does_not_exist").code()); +// assertEquals(427, core.send("SRC", "SUBS", "key", "key", "subscription", "irregular").code()); +// } @Test public void TestRQST() { Core core = new Core(); - assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - - assertEquals(200, core.send("SRC", "RQST", "key", "key", "request", "correct", "query", "valid", "destination", "null").code()); - assertEquals(200, core.send("SRC", "RQST", "key", "key", "request", "correct", "query", "valid", "destination", "null", - "start_date", "2020-01-01", "end_date", "2020-02-01").code()); + HashMap data = new HashMap(); + data.put("type", "amberdata-uniswap-pool"); + data.put("url_path", "poolAddress,0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc"); + data.put("headers", "x-api-key,UAK7ed69235426c360be22bfc2bde1809b6"); + data.put("start_date", "2022-09-01"); + data.put("end_date", "2022-09-03"); + data.put("destination", "null"); - assertEquals(421, core.send("SRC", "RQST", "key", "does_not_exist", "request", "correct", "query", "valid", "destination", "null").code()); - Config.setProperty("testing", "lsh.ready", "false"); - assertEquals(447, core.send("SRC", "RQST", "key", "key", "request", "correct", "query", "valid", "destination", "null").code()); - Config.setProperty("testing", "lsh.ready", "true"); - assertEquals(421, core.send("SRC", "RQST", "key", "dne", "request", "correct", "query", "valid", "destination", "null").code()); - assertEquals(447, core.send("SRC", "RQST", "key", "key", "request", "correct", "query", "irregular", "destination", "null").code()); - assertEquals(500, core.send("SRC", "RQST", "key", "key", "request", "correct", "query", "valid", "destination", "null", - "start_date", "2020-01-01").code()); + assertEquals(200, core.send("SRC", "RQST", data)); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/requests/TestExternalRequestManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/requests/TestExternalRequestManager.java index 8c0b50cd..0916c4c4 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/requests/TestExternalRequestManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/requests/TestExternalRequestManager.java @@ -4,12 +4,22 @@ import java.util.HashMap; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; import org.core.core.Core; import org.junit.BeforeClass; import org.junit.Test; +import org.properties.Config; public class TestExternalRequestManager { +static { + // disable loggers + LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("stream", "general.consumer.types", "null"); + Config.setProperty("stream", "general.producer.types", "null"); +} + public static Core core; @BeforeClass @@ -29,14 +39,14 @@ public void TestEXSR() { assertEquals("false", core.send("ESH", "EXSR", data).data()); } - @Test + //@Test public void TestRQST() { HashMap data = new HashMap(); data.put("type", "amberdata-uniswap-pool"); data.put("url_path", "poolAddress,0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc"); data.put("headers", "x-api-key,UAK7ed69235426c360be22bfc2bde1809b6"); - data.put("startDate", "2022-09-01"); - data.put("endDate", "2022-09-02"); + data.put("start_date", "2022-09-01"); + data.put("end_date", "2022-09-03"); assertEquals(200, core.send("ESH", "RQST", data).code()); }