diff --git a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx index 32e084ea..86dc1b83 100644 Binary files a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx and b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx differ diff --git a/DeFi-Data-Engine/DeFi Data Engine/.classpath b/DeFi-Data-Engine/DeFi Data Engine/.classpath index b98fb499..4d455a8c 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/.classpath +++ b/DeFi-Data-Engine/DeFi Data Engine/.classpath @@ -25,12 +25,7 @@ - - - - - - + diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/app.properties b/DeFi-Data-Engine/DeFi Data Engine/config/app.properties index 09935a87..88474e56 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/config/app.properties +++ b/DeFi-Data-Engine/DeFi Data Engine/config/app.properties @@ -1,4 +1,10 @@ # === GENERAL PROPERTIES === # delimiter used for internal processing -general.internal.delim=,., \ No newline at end of file +general.internal.delim=,., + +# enable all packet logging +general.logging.packets=true + +# enable all response logging +general.logging.responses=true \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/output.properties b/DeFi-Data-Engine/DeFi Data Engine/config/output.properties index 9aa49d3b..8c2b5ca9 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/config/output.properties +++ b/DeFi-Data-Engine/DeFi Data Engine/config/output.properties @@ -1,7 +1,7 @@ # === GENERAL PROPERTIES === -consumer.types=socket_consumer -producer.types=socket_producer +consumer.types=null +producer.types=null # === REST SOCKET PROPERTIES === @@ -21,3 +21,8 @@ output.socket.address=localhost # Output socket port output.socket.port=61200 + +# === LOCAL STREAM PROPERTIES === + +# local stream type used for database configuration +local.stream.type=mongo_db \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/pom.xml b/DeFi-Data-Engine/DeFi Data Engine/pom.xml index bed7d8a0..3df3471f 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/pom.xml +++ b/DeFi-Data-Engine/DeFi Data Engine/pom.xml @@ -41,21 +41,6 @@ json 20220320 - - org.apache.kafka - kafka-clients - 3.2.1 - - - org.apache.kafka - kafka-clients - 3.2.1 - - - org.apache.kafka - kafka-streams - 3.2.1 - @@ -70,8 +55,6 @@ - - diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java index 67b11a1c..eb550b57 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java @@ -2,6 +2,7 @@ import org.core.engine.Engine; import org.core.logger.Logger; +import org.framework.router.Response; import org.framework.router.Router; import org.out.controller.Controller; import org.out.handler.OutputHandler; @@ -14,12 +15,13 @@ public Core() { OutputHandler out = new OutputHandler(); Controller crl = new Controller(); - Logger log = new Logger(); Engine eng = new Engine(); StreamManager str = new StreamManager(); - this.connect(out, crl, log, eng, str); + this.connect(out, crl, eng, str); - this.send("ENG", "STRT", ""); + Response response = this.send("ENG", "STRT", ""); + if(response.code() != 200) + Logger.terminate(response); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java index 668ab259..33c006a8 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java @@ -2,7 +2,9 @@ import org.framework.router.Packet; import org.framework.router.Response; +import org.framework.router.ResponseFactory; import org.framework.router.Router; +import org.properties.Config; public class Engine extends Router { @@ -11,8 +13,16 @@ public Engine() { } public Response processSTRT(Packet packet) { - // start output processes - // TODO: check all engine processes outside of OUT prior to returning response. - return send("OUT", "STRT", ""); + // start output processes: + Response out_response = send("OUT", "STRT", ""); + if(out_response.code() != 200) + return out_response; + + // start local stream handler processes: + Response lsh_response = send("LSH", "INIT", Config.getProperty("output", "local.stream.type")); + if(lsh_response.code() != 200) + return lsh_response; + + return ResponseFactory.response200(); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java index ce5099c2..528488b9 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java @@ -1,10 +1,45 @@ package org.core.logger; -import org.framework.router.Router; +import java.time.Instant; -public class Logger extends Router { +import org.framework.router.Packet; +import org.framework.router.Response; - public Logger() { - super("logger", "LOG"); +public class Logger { + + public static final void log(Packet packet) { + System.out.println(packetFormat(packet)); + } + + public static final void log(Response response) { + System.out.println(responseFormat(response)); + } + + public static final void terminate(Packet packet) { + System.err.println(packetFormat(packet)); + System.exit(1); + } + + public static final void terminate(Response response) { + System.err.println(responseFormat(response)); + System.exit(1); + } + + private static final String packetFormat(Packet packet) { + return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%s] [%s]", + Instant.now().toString(), + Thread.currentThread().getName(), + packet.getSender(), + packet.getTag(), + packet.getData(), + packet.getSubData()); + } + + private static final String responseFormat(Response response) { + return String.format("[%s] [%-10s] RESPONSE - [%3d] [%s]", + Instant.now().toString(), + Thread.currentThread().getName(), + response.code(), + response.data()); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java index 59bde8d1..be8bafab 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java @@ -1,5 +1,8 @@ package org.framework.router; +import org.core.logger.Logger; +import org.properties.Config; + /** * The {@link Packet} class represents a standardized data transfer object * used throughout the engine. It contains a series of values which help to @@ -10,6 +13,8 @@ * */ public class Packet { + + private final static boolean log = Config.getProperty("app", "general.logging.packets").equals("true"); private final String sender; private final String tag; @@ -89,7 +94,10 @@ public final String getSubData() { * @return New {@link Packet} object. */ public static Packet packet(Router router, String tag, String sub_tag, String data) { - return new Packet(router, tag, sub_tag, data, ""); + Packet packet = new Packet(router, tag, sub_tag, data, ""); + if(log) + Logger.log(packet); + return packet; } /** @@ -103,6 +111,9 @@ public static Packet packet(Router router, String tag, String sub_tag, String da * @return New {@link Packet} object. */ public static Packet packet(Router router, String tag, String sub_tag, String data, String sub_data) { - return new Packet(router, tag, sub_tag, data, sub_data); + Packet packet = new Packet(router, tag, sub_tag, data, sub_data); + if(log) + Logger.log(packet); + return packet; } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java index 6165a6aa..ffa3ecfd 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java @@ -1,5 +1,8 @@ package org.framework.router; +import org.core.logger.Logger; +import org.properties.Config; + /** * The {@link Response} class is used to relay information from a * given {@link Packet} sent through a {@link Router}. {@link Response} @@ -18,6 +21,8 @@ * */ public final class Response { + + private final static boolean log = Config.getProperty("app", "general.logging.responses").equals("true"); private final int code; private final String message; @@ -91,7 +96,10 @@ public String data() { * @return New {@link Response} object formatted based on the passed parameters. */ public static Response create(int code, String message) { - return new Response(code, message); + Response response = new Response(code, message); + if(log) + Logger.log(response); + return response; } /** @@ -104,6 +112,9 @@ public static Response create(int code, String message) { * @return New {@link Response} object formatted based on the passed parameters. */ public static Response create(int code, String message, String data) { - return new Response(code, message, data); + Response response = new Response(code, message, data); + if(log) + Logger.log(response); + return response; } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java index 79bd17bc..39228d78 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java @@ -16,8 +16,12 @@ public static Response response0() { return Response.create(0, ""); } + public static Response response200() { + return Response.create(200, "Successful Response."); + } + public static Response response200(String data) { - return Response.create(200, "Successful Response.", data); + return Response.create(200, "", data); } public static Response response220(String hash) { @@ -84,6 +88,30 @@ public static Response response429(String hash, String request, String response) return Response.create(429, String.format("Stream with hash <%s> returned an irregular response when attempting to subscribe to <%s>. Response returned is: <%s>", hash, request, response)); } + public static Response response440(String source) { + return Response.create(440, String.format("Requested data source <%s> does not exist in cache.", source)); + } + + public static Response response441(String hash) { + return Response.create(441, String.format("Requested data stream with given hash <%s> does not exist in cache.", hash)); + } + + public static Response response442(String source) { + return Response.create(442, String.format("Failed to add local data source <%s>.", source)); + } + + public static Response response443(String source) { + return Response.create(443, String.format("Local data stream with given source <%s> already exists.", source)); + } + + public static Response response444(String source) { + return Response.create(444, String.format("Failure to authorize the local data source <%s> with the given properties.", source)); + } + + public static Response response445(String source, String query) { + return Response.create(445, String.format("Local data stream with given source <%s> could not validate passed query <%s>", query)); + } + public static Response response460(String consumer) { return Response.create(460, String.format("Output consumer <%s> failed to listen to consumption channel.", consumer)); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java index 3afe2b11..5d7cea71 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java @@ -26,7 +26,7 @@ public Response processSTRT(Packet packet) { if(!(boolean)producer_response[0]) return ResponseFactory.response470(producer_response[1].toString()); - return ResponseFactory.response200(""); + return ResponseFactory.response200(); } catch(Exception e) { e.printStackTrace(); System.exit(1); @@ -42,6 +42,6 @@ public Response processEDAT(Packet packet) { if(!manager.send(packet.getSubData(), packet)) return ResponseFactory.response472(packet.getSubData()); - return ResponseFactory.response200(""); + return ResponseFactory.response200(); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/ProtocolDirectory.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/ProtocolDirectory.java index fee81cba..c5558db0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/ProtocolDirectory.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/ProtocolDirectory.java @@ -18,6 +18,7 @@ public class ProtocolDirectory { protocols.put("kill", new String[]{"SRC", "KILL"}); protocols.put("subscribe", new String[]{"SRC", "SUBS"}); protocols.put("request", new String[]{"SRC", "RQST"}); + protocols.put("scan", new String[] {"SRC", "SCAN"}); } public static String[] getProtocol(String protocol) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index b192dd87..e6c798e5 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -5,7 +5,6 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.HashSet; import java.util.UUID; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index a72ccec2..c6f2c1d4 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -39,6 +39,11 @@ public static final String getProperty(String name, String property) { return properties.get(name).getProperty(property); } + public static final void setProperty(String name, String property, String value) { + validate(name, property); + properties.get(name).setProperty(property, value); + } + public static final void validate(String name, String... keys) { if(!properties.containsKey(name)) { System.err.println(String.format("Property file <%s> does not exist. Program terminating.", name)); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java index 3190d837..84772b9a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java @@ -24,7 +24,7 @@ public AmberDataConnection(ExternalStreamManager manager, String data) { public String getUUID() { return "amber_data"; } - + public void init() { } @@ -42,7 +42,7 @@ public void defineRequestTypes() { public String getHash(String data) { try { MessageDigest md = MessageDigest.getInstance("SHA-512"); - byte[] bytes = md.digest(("salt" + data).getBytes()); + byte[] bytes = md.digest(("salt" + System.currentTimeMillis() + data).getBytes()); BigInteger signum = new BigInteger(1, bytes); String hashed = signum.toString(16); while(hashed.length() < 32) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java similarity index 92% rename from DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateConnection.java rename to DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java index ce1f0fe2..1b989d42 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java @@ -3,13 +3,13 @@ import org.stream.external.handler.ExternalStreamConnection; import org.stream.external.handler.ExternalStreamManager; -public class TemplateConnection extends ExternalStreamConnection { +public class TemplateExternalConnection extends ExternalStreamConnection { private boolean active = false; private boolean authorized = false; private boolean override = false; - public TemplateConnection(ExternalStreamManager manager, String data) { + public TemplateExternalConnection(ExternalStreamManager manager, String data) { super(manager, data.split(",")[0]); if(data.contains(",")) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java index af654361..f5de1bbb 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java @@ -19,14 +19,14 @@ public Response processEXSR(Packet packet) { if(packet.getData().equals("")) return ResponseFactory.response500("ExternalStreamHandler", "source"); - return Response.create(200, "", String.format("%s", manager.containsTemplate(packet.getData().toString()))); + return ResponseFactory.response200(String.format("%s", manager.containsTemplate(packet.getData().toString()))); } public Response processEXST(Packet packet) { if(packet.getData().equals("")) return ResponseFactory.response500("ExternalStreamHandler", "streamHash"); - return Response.create(200, "", String.format("%s", manager.containsStream(packet.getData().toString()))); + return ResponseFactory.response200(String.format("%s", manager.containsStream(packet.getData().toString()))); } @SuppressWarnings("deprecation") @@ -63,7 +63,7 @@ public Response processINIT(Packet packet) { // if successful authorize if(authorized) - return Response.create(200, "", String.format("true, %s", hash)); + return ResponseFactory.response200(String.format("true, %s", hash)); manager.removeStream(hash); return ResponseFactory.response422(template); @@ -76,7 +76,7 @@ public Response processIATH(Packet packet) { if(!manager.containsStream(packet.getData())) return ResponseFactory.response421(packet.getData()); - return Response.create(200, "", String.format("%s", manager.isStreamAuthorized(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.isStreamAuthorized(packet.getData()))); } public Response processIATV(Packet packet) { @@ -86,7 +86,7 @@ public Response processIATV(Packet packet) { if(!manager.containsStream(packet.getData())) return ResponseFactory.response421(packet.getData()); - return Response.create(200, "", String.format("%s", manager.isStreamActive(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.isStreamActive(packet.getData()))); } public Response processEXEC(Packet packet) { @@ -102,7 +102,7 @@ public Response processEXEC(Packet packet) { if(manager.isStreamActive(packet.getData())) return ResponseFactory.response424(packet.getData()); - return Response.create(200, "", String.format("%s", manager.executeStream(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.executeStream(packet.getData()))); } public Response processKILL(Packet packet) { @@ -115,7 +115,7 @@ public Response processKILL(Packet packet) { if(!manager.isStreamActive(packet.getData())) return ResponseFactory.response425(packet.getData()); - return Response.create(200, "", String.format("%s", manager.killStream(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.killStream(packet.getData()))); } public Response processSUBS(Packet packet) { @@ -145,7 +145,7 @@ public Response processSUBS(Packet packet) { Object[] subscription = manager.subscribe(hash, type); if((Boolean)subscription[0]) - return Response.create(200, "", String.format("%s", "true")); + return ResponseFactory.response200(String.format("%s", "true")); return ResponseFactory.response427(hash, type, (String)subscription[1]); } @@ -180,7 +180,7 @@ public Response processRQST(Packet packet) { Object[] response = manager.request(hash, packet.getSubData(), request); if((Boolean)response[0]) - return Response.create(200, "", String.format("%s", (String)response[1])); + return ResponseFactory.response200(String.format("%s", (String)response[1])); return ResponseFactory.response429(hash, request, (String)response[1]); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java new file mode 100644 index 00000000..57b73488 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java @@ -0,0 +1,5 @@ +package org.stream.local.connected.connections; + +public class TemplateLocalConnection { + +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/DataState.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/DataState.java new file mode 100644 index 00000000..da3b46fa --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/DataState.java @@ -0,0 +1,10 @@ +package org.stream.local.handler; + +public enum DataState { + DOES_NOT_EXIST, + PARTIAL, + EXISTS, + MODIFIED, + CORRUPTED, + INVALID; +} \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java new file mode 100644 index 00000000..20251152 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java @@ -0,0 +1,22 @@ +package org.stream.local.handler; + +import org.framework.interfaces.Hash; +import org.framework.interfaces.UUID; + +public abstract class LocalStreamConnection implements UUID { + + private final LocalStreamManager manager; + + public LocalStreamConnection(LocalStreamManager manager) { + this.manager = manager; + } + + public abstract boolean authorize(); + public abstract boolean isAuthorized(); + public abstract boolean isReady(); + public abstract boolean validate(String... query); + public abstract boolean contains(String... query); + public abstract DataState state(String... query); + public abstract String get(String... query); + public abstract boolean modify(String data, String... query); +} \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index bb88996d..01a34967 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -1,10 +1,53 @@ package org.stream.local.handler; +import org.framework.router.Packet; +import org.framework.router.Response; +import org.framework.router.ResponseFactory; import org.framework.router.Router; public class LocalStreamHandler extends Router { + private final LocalStreamManager manager; + public LocalStreamHandler() { super("local_stream_handler", "LSH"); + this.manager = new LocalStreamManager(this); } + + public Response processINIT(Packet packet) { + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "source"); + + String source = packet.getData(); + + if(!manager.containsTemplate(source)) + return ResponseFactory.response440(source); + + if(manager.isStreamDefined()) + return ResponseFactory.response443(source); + + if(!manager.setStream(source)) + return ResponseFactory.response442(source); + + if(!manager.authorize() || !manager.isAuthorized()) + return ResponseFactory.response444(source); + + return ResponseFactory.response200(); + } + + public Response processSCAN(Packet packet) { + // data: query + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "query"); + + if(!manager.validate(packet.getData())) + return ResponseFactory.response445(manager.streamType(), packet.getData()); + + if(manager.scan(packet.getData())) + return ResponseFactory.response200("true"); + + return ResponseFactory.response200("false"); + } + + //public Response process } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java new file mode 100644 index 00000000..fe427cee --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java @@ -0,0 +1,117 @@ +package org.stream.local.handler; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Set; + +import org.reflections.Reflections; +import org.stream.external.handler.ExternalStreamConnection; +import org.stream.external.handler.ExternalStreamManager; + +public class LocalStreamManager { + + private final LocalStreamHandler handler; + private final HashMap> templates; + private LocalStreamConnection stream; + + protected LocalStreamManager(LocalStreamHandler handler) { + this.handler = handler; + + templates = new HashMap>(); + + try { + reflect(); + } catch (Exception e) { + e.printStackTrace(); + System.exit(1); + } + } + + private void reflect() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { + Reflections reflection = new Reflections("org.stream.local.connected.connections"); + Set> types = reflection.getSubTypesOf(LocalStreamConnection.class); + for(Class c : types) + templates.put(c.getDeclaredConstructor(LocalStreamManager.class, String.class).newInstance(this, "").getUUID(), c); + } + + protected boolean containsTemplate(String type) { + return templates.containsKey(type); + } + + protected boolean isStreamDefined() { + return stream != null; + } + + protected boolean setStream(String type) { + if(!templates.containsKey(type)) + return false; + + try { + this.stream = templates.get(type).getDeclaredConstructor(LocalStreamManager.class).newInstance(this); + return true; + } catch(Exception e) { + e.printStackTrace(); + System.exit(1); + } + + return false; + } + + protected String streamType() { + if(stream == null) + return null; + + return stream.getUUID(); + } + + protected boolean authorize() { + return stream.authorize(); + } + + protected boolean isAuthorized() { + return stream.isAuthorized(); + } + + protected boolean isReady() { + return stream.isReady(); + } + + protected boolean validate(String... query) { + return stream.validate(query); + } + + protected boolean scan(String... query) { + if(!stream.validate(query)) + return false; + + return stream.contains(query); + } + + protected boolean contains(String... query) { + if(!validate(query)) + return false; + + return stream.contains(query); + } + + public DataState state(String... query) { + if(!validate(query)) + return DataState.INVALID; + + return stream.state(query); + } + + protected String get(String... query) { + if(!validate(query)) + return null; + + return stream.get(query); + } + + protected boolean modify(String data, String... query) { + if(!validate(query)) + return false; + + return stream.modify(data, query); + } +} \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java new file mode 100644 index 00000000..17ca01d2 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java @@ -0,0 +1,5 @@ +package test.protocols; + +public class TestLSH { + +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java index eaf18792..5eca334b 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java @@ -113,7 +113,7 @@ public static void testSpeed2() { } public static void main(String[] args) { - //testSpeed1(); - testSpeed2(); + testSpeed1(); + //testSpeed2(); } } diff --git a/DeFi-Data-Engine/Rest Application/.classpath b/DeFi-Data-Engine/Rest Application/.classpath index 4f20f1ad..d118e897 100644 --- a/DeFi-Data-Engine/Rest Application/.classpath +++ b/DeFi-Data-Engine/Rest Application/.classpath @@ -24,7 +24,7 @@ - + diff --git a/DeFi-Data-Engine/Rest Application/pom.xml b/DeFi-Data-Engine/Rest Application/pom.xml index 9975fbd2..9644e635 100644 --- a/DeFi-Data-Engine/Rest Application/pom.xml +++ b/DeFi-Data-Engine/Rest Application/pom.xml @@ -47,51 +47,9 @@ json 20220320 - - - - io.vertx - vertx-core - ${project.version} - - - io.vertx - vertx-web - ${project.version} - - - io.vertx - vertx-kafka-client - ${project.version} - - - - - io.debezium - debezium-core - ${debezium.version} - - - io.debezium - debezium-core - ${debezium.version} - test-jar - - - org.apache.kafka - kafka-clients - 3.2.1 - - - org.springframework.integration - spring-integration-ip + org.springframework.integration + spring-integration-ip diff --git a/DeFi-Data-Engine/Testing Environment/.classpath b/DeFi-Data-Engine/Testing Environment/.classpath index a3622af7..cddbf79a 100644 --- a/DeFi-Data-Engine/Testing Environment/.classpath +++ b/DeFi-Data-Engine/Testing Environment/.classpath @@ -6,7 +6,7 @@ - +