diff --git a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx index bdbad7c0..7220a67c 100644 Binary files a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx and b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx differ diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java index eb550b57..36debac0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java @@ -20,7 +20,7 @@ public Core() { this.connect(out, crl, eng, str); - Response response = this.send("ENG", "STRT", ""); + Response response = this.send("ENG", "STRT"); if(response.code() != 200) Logger.terminate(response); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java index 34196b7b..42decc2f 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java @@ -12,16 +12,17 @@ public Engine() { super("engine", "ENG"); } + // source: source of the local stream to initialize public Response processSTRT(Packet packet) { // start output processes: - Response out_response = send("OUT", "STRT", ""); + Response out_response = send("OUT", "STRT"); if(out_response.code() != 200) return out_response; // start local stream handler processes: String lsh_type = Config.getProperty("stream", "local.stream.type"); if(!lsh_type.equals("null")) { - Response lsh_response = send("LSH", "INIT", lsh_type); + Response lsh_response = send("LSH", "INIT", "local_source", lsh_type); if(lsh_response.code() != 200) return lsh_response; } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java index 718da906..68b0bdfd 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java @@ -35,14 +35,13 @@ public static final void terminate(Response response) { } private static final String packetFormat(Packet packet) { - return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%4s] [%s] [%s]", + return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%4s] [%s]", time(), Thread.currentThread().getName(), packet.getSender(), packet.getTag(), packet.getSubTag(), - packet.getData(), - packet.getSubData()); + packet.getData()); } private static final String responseFormat(Response response) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/interfaces/Hash.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/interfaces/Hash.java index 375f8c5b..4be60859 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/interfaces/Hash.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/interfaces/Hash.java @@ -1,5 +1,7 @@ package org.framework.interfaces; +import java.util.HashMap; + /** * Interface used for requiring components to have a unique hash based on the passed data. * The hash does not require any standard formatting so long as it is unique. @@ -13,8 +15,8 @@ public interface Hash { * Unique hash based on the passed data for identification. Algorithm is recommended to be * a salted SHA-512. * - * @param data {@link String} which holds all data, primarily that used for authorization. + * @param data {@link HashMap} which holds all data, primarily that used for authorization. * @return {@link String} that contains the newly created hash. */ - public String getHash(String data); + public String getHash(HashMap data); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java index be8bafab..6ed0eff8 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java @@ -1,5 +1,7 @@ package org.framework.router; +import java.util.HashMap; + import org.core.logger.Logger; import org.properties.Config; @@ -19,8 +21,7 @@ public class Packet { private final String sender; private final String tag; private final String sub_tag; - private final String data; - private final String sub_data; + private final HashMap data; /** * Initializes a new {@link Packet} object. @@ -29,14 +30,12 @@ public class Packet { * @param tag Tag of the destination the {@link Packet} will be sent to. * @param sub_tag Sub tag describing the action performed at the destination. * @param data Data transmitted through the {@link Packet} for processing at the destination. - * @param sub_data Supporting data used for processing and transmitting from {@code data}. This parameter is optional. */ - private Packet(Router router, String tag, String sub_tag, String data, String sub_data) { + private Packet(Router router, String tag, String sub_tag, HashMap data) { this.sender = router.getTag(); this.tag = tag; this.sub_tag = sub_tag; this.data = data; - this.sub_data = sub_data; } /** @@ -66,22 +65,41 @@ public final String getSubTag() { return sub_tag; } + public final boolean containsKey(String key) { + return data.containsKey(key); + } + /** * Data transmitted through the {@link Packet} for processing at the destination. * * @return String containing all data sent. */ - public final String getData() { + public final HashMap getData() { return data; } /** - * Supporting data used for processing and transmitting from {@code data}. + * Retrieves data point stored within {@link Packet}. * - * @return String containing all sub_data. + * @param key Key that the data point is stored under. + * @return {@link String} containing data stored. */ - public final String getSubData() { - return sub_data; + public final String getData(String key) { + if(data.containsKey(key)) + return data.get(key); + + return null; + } + + public final String validate(String... keys) { + if(keys == null) + return null; + + for(String key : keys) + if(!data.containsKey(key) || data.get(key).equals("")) + return key; + + return null; } /** @@ -93,8 +111,11 @@ public final String getSubData() { * @param data Data transmitted through the {@link Packet} for processing at the destination. * @return New {@link Packet} object. */ - public static Packet packet(Router router, String tag, String sub_tag, String data) { - Packet packet = new Packet(router, tag, sub_tag, data, ""); + public static Packet packet(Router router, String tag, String sub_tag, HashMap data) { + if(data == null) + return null; + + Packet packet = new Packet(router, tag, sub_tag, data); if(log) Logger.log(packet); return packet; @@ -110,8 +131,15 @@ public static Packet packet(Router router, String tag, String sub_tag, String da * @param sub_data Supporting data used for processing and transmitting from {@code data}. * @return New {@link Packet} object. */ - public static Packet packet(Router router, String tag, String sub_tag, String data, String sub_data) { - Packet packet = new Packet(router, tag, sub_tag, data, sub_data); + public static Packet packet(Router router, String tag, String sub_tag, String... data) { + if(data.length % 2 != 0) + return null; + + HashMap map = new HashMap(); + for(int i = 0; i < data.length; i+=2) + map.put(data[i], data[i + 1]); + + Packet packet = new Packet(router, tag, sub_tag, map); if(log) Logger.log(packet); return packet; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java index c2d24e95..050e0e2c 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java @@ -45,6 +45,12 @@ public static Response response405(String router, String subtag) { + "is written properly.", router, subtag)); } + public static Response response407(String router, String tag, String sub_tag, String... data) { + return Response.create(407, String.format("Malformed packet when sending data from <%s> to <%s> using protocol " + + "<%s>. Data does not contain an even amount of pairs. Data <%s>.", + router, tag, sub_tag, Arrays.toString(data))); + } + public static Response response410(String router, String subtag) { return Response.create(410, String.format("Router <%s> process <%s> is formatted incorrectly. Check to make sure the process' " + "method contains the proper format of .", router, subtag)); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java index e440021c..f503c2e1 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java @@ -171,8 +171,15 @@ protected final Collection connectedTags() { * @param data Data transmitted through the {@link Packet} for processing at the destination. * @return Integer representing the return code of the sent {@link Packet}. */ - public final Response send(String tag, String sub_tag, String data) { - return send(tag, sub_tag, data, ""); + public final Response send(String tag, String sub_tag, HashMap data) { + if(manager == null) + return ResponseFactory.response400(this.getTag()); + + Packet packet = Packet.packet(this, tag, sub_tag, data); + if(packet == null) + return ResponseFactory.response407(this.tag, tag, sub_tag, data.toString()); + + return manager.send(packet); } /** @@ -184,12 +191,15 @@ public final Response send(String tag, String sub_tag, String data) { * @param sub_data Supporting data used for processing and transmitting from {@code data}. This parameter is optional. * @return Integer representing the return code of the sent {@link Packet}. */ - public final Response send(String tag, String sub_tag, String data, String sub_data) { + public final Response send(String tag, String sub_tag, String... data) { if(manager == null) return ResponseFactory.response400(this.getTag()); // create packet and push to receive method - Packet packet = Packet.packet(this, tag, sub_tag, data, sub_data); + Packet packet = Packet.packet(this, tag, sub_tag, data); + if(packet == null) + return ResponseFactory.response407(this.tag, tag, sub_tag, data); + return manager.send(packet); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java index daab980b..239bdd4a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java @@ -19,7 +19,7 @@ public SocketDestination(String key, DataOutputStream out) { public final synchronized boolean send(Packet packet) { try { - out.write(packet.getData().getBytes()); + out.write(packet.getData("data").getBytes()); out.write(10); } catch (JSONException | IOException e) { return false; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java index 9f818188..26d99db9 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java @@ -36,14 +36,18 @@ public Response processSTRT(Packet packet) { } public Response processEDAT(Packet packet) { - if(packet.getSubData().equals("null")) + String destination = packet.getData("destination"); + if(destination == null) + return ResponseFactory.response500("OutputHandler", "destination"); + + if(destination.equals("null")) return ResponseFactory.response200(); - if(!manager.containsDestination(packet.getSubData())) - return ResponseFactory.response471(packet.getSubData()); + if(!manager.containsDestination(destination)) + return ResponseFactory.response471(destination); - if(!manager.send(packet.getSubData(), packet)) - return ResponseFactory.response472(packet.getSubData()); + if(!manager.send(destination, packet)) + return ResponseFactory.response472(destination); return ResponseFactory.response200(); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java index d274dca0..65ca05de 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java @@ -111,14 +111,10 @@ public Object[] producerListen() { return new Object[] {true, ""}; } - protected final Response send(String tag, String sub_tag, String data) { + protected final Response send(String tag, String sub_tag, String... data) { return handler.send(tag, sub_tag, data); } - protected final Response send(String tag, String sub_tag, String data, String sub_data) { - return handler.send(tag, sub_tag, data, sub_data); - } - public final synchronized void add(OutputDestination destination) { destinations.put(destination.getKey(), destination); //destination.send(Packet.packet(handler, "", "", "Connected: " + destination.getKey())); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java index e248a87c..cdd57bdb 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java @@ -3,10 +3,10 @@ import java.math.BigInteger; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.HashMap; import org.framework.router.ResponseFactory; import org.json.JSONObject; -import org.stream.external.connected.amberdata.AmberDataRequestHandler; import org.stream.external.handler.ExternalStreamConnection; import org.stream.external.handler.ExternalStreamManager; @@ -17,7 +17,7 @@ public class AmberDataConnection extends ExternalStreamConnection { private boolean authorized = false; - public AmberDataConnection(ExternalStreamManager manager, String data) { + public AmberDataConnection(ExternalStreamManager manager, HashMap data) { super(manager, data); } @@ -28,16 +28,6 @@ public String getUUID() { public void init() { } - - @Override - public void defineSubscriptionTypes() { - - } - - @Override - public void defineRequestTypes() { - addRequestType("lending-latest"); - } public String getHash(String data) { try { @@ -65,7 +55,7 @@ public boolean authorize() { .url("https://web3api.io/api/v2/market/defi/lending/exchanges/aave/latest") .get() .addHeader("Accept", "application/json") - .addHeader("x-api-key", data) + .addHeader("x-api-key", data.get("key")) .build(); try { @@ -114,14 +104,33 @@ public boolean start() { public boolean stop() { return false; } - + + @Override + public String getHash(HashMap data) { + // TODO Auto-generated method stub + return null; + } + @Override - public Object[] request(String destination, String data) { - return AmberDataRequestHandler.request(this.data, data); + public boolean containsSubscriptionType(String type) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean containsRequestType(String type) { + // TODO Auto-generated method stub + return false; + } + + @Override + public Object[] request(HashMap request) { + // TODO Auto-generated method stub + return null; } @Override - public Object[] request(String destination, String request, String date) { + public Object[] request(HashMap request, String date) { // TODO Auto-generated method stub return null; } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java index 74e0afa8..0fae45f0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java @@ -1,5 +1,7 @@ package org.stream.external.connected.connections; +import java.util.HashMap; + import org.stream.external.handler.ExternalStreamConnection; import org.stream.external.handler.ExternalStreamManager; @@ -9,11 +11,11 @@ public class TemplateExternalConnection extends ExternalStreamConnection { private boolean authorized = false; private boolean override = false; - public TemplateExternalConnection(ExternalStreamManager manager, String data) { - super(manager, data.split(",")[0]); + public TemplateExternalConnection(ExternalStreamManager manager, HashMap data) { + super(manager, data); - if(data.contains(",")) - override = Boolean.parseBoolean(data.split(",")[1].trim()); + if(data.containsKey("override")) + override = true; } public String getUUID() { @@ -26,10 +28,8 @@ public String getHash(String data) { @Override public boolean authorize() { - if(override) - authorized = true; - else - authorized = this.data.equals("key"); + if(this.data.containsKey("key")) + authorized = data.get("key").equals("key") || data.get("key").equals("not_ready"); return authorized; } @@ -37,21 +37,6 @@ public boolean authorize() { public boolean isAuthorized() { return authorized; } - - @Override - public void defineSubscriptionTypes() { - addSubscriptionType("correct"); - addSubscriptionType("irregular"); - addSubscriptionType("external"); - } - - @Override - public void defineRequestTypes() { - addRequestType("correct"); - addRequestType("irregular"); - addRequestType("external"); - addRequestType("template-external-request"); - } @Override public Object[] subscribe(String data) { @@ -84,29 +69,32 @@ public boolean stop() { active = false; return true; } - + + @Override + public String getHash(HashMap data) { + return data.get("key"); + } + + @Override + public boolean containsSubscriptionType(String type) { + return type.equals("correct") || type.equals("irregular"); + } + + @Override + public boolean containsRequestType(String type) { + // TODO Auto-generated method stub + return false; + } + + @Override + public Object[] request(HashMap request) { + // TODO Auto-generated method stub + return null; + } + @Override - public Object[] request(String destination, String data, String date) { - if(date == null) { - if(data.equals("correct")) - return new Object[] {true, "success"}; - - else if(data.equals("template-external-request")) { - for(int i = 0; i < 3; i++) - this.processRequest(data, date, destination, String.format("element, %d, mult7, %d", i, i * 7)); - return new Object[] {true, "success"}; - } - } else { - if(data.equals("correct")) - return new Object[] {true, "success"}; - - else if(data.equals("template-external-request")) { - for(int i = 0; i < 3; i++) - this.processRequest(data, date, destination, String.format("element, %d, mult10, %d", i, i * 10)); - return new Object[] {true, "success"}; - } - } - - return new Object[] {false, "Request handled irregularly"}; + public Object[] request(HashMap request, String date) { + // TODO Auto-generated method stub + return null; } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamConnection.java index 8fba47b4..34c596ea 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamConnection.java @@ -1,5 +1,6 @@ package org.stream.external.handler; +import java.util.HashMap; import java.util.TreeSet; import org.framework.interfaces.Hash; @@ -10,11 +11,11 @@ public abstract class ExternalStreamConnection implements UUID, Hash { private final String hash; private final ExternalStreamManager manager; - protected final String data; + protected final HashMap data; protected final TreeSet subscriptionTypes; protected final TreeSet requestTypes; - public ExternalStreamConnection(ExternalStreamManager manager, String data) { + public ExternalStreamConnection(ExternalStreamManager manager, HashMap data) { this.hash = getHash(data); this.manager = manager; this.data = data; @@ -23,8 +24,6 @@ public ExternalStreamConnection(ExternalStreamManager manager, String data) { if(data != null) { init(); - defineSubscriptionTypes(); - defineRequestTypes(); } } @@ -32,21 +31,19 @@ public void init() { } - public void processSubscription(String subscription, String destination, String data) { - manager.processSubscription(hash, subscription, destination, data); + public void processSubscription(String subscription, String data) { + manager.processSubscription(hash, subscription, data); } - public void processRequest(String request, String date, String destination, String data) { + public void processRequest(String request, String date, String data) { if(date != null) manager.processRequest(hash, - getUUID() + Config.getProperty("app", "general.collection.delim") + request + Config.getProperty("app", "general.collection.delim") + date, - destination, + getUUID() + Config.getProperty("app", "general.collection.delim") + request + Config.getProperty("app", "general.collection.delim") + date, data); else manager.processRequest(hash, - getUUID() + Config.getProperty("app", "general.collection.delim") + request, - destination, + getUUID() + Config.getProperty("app", "general.collection.delim") + request, data); } @@ -59,16 +56,12 @@ public final String getHash() { public abstract boolean isReady(); public abstract boolean isActive(); - public abstract void defineSubscriptionTypes(); - public final void addSubscriptionType(String type) { subscriptionTypes.add(type); } - public final boolean containsSubscriptionType(String type) { return subscriptionTypes.contains(type); } + public abstract boolean containsSubscriptionType(String type); public abstract Object[] subscribe(String data); - public abstract void defineRequestTypes(); - public final void addRequestType(String type) { requestTypes.add(type); } - public final boolean containsRequestType(String type) { return requestTypes.contains(type); } - public Object[] request(String destination, String request) { return request(destination, request, null); } - public abstract Object[] request(String destination, String request, String date); + public abstract boolean containsRequestType(String type); + public abstract Object[] request(HashMap request); + public abstract Object[] request(HashMap request, String date); public abstract boolean start(); public abstract boolean stop(); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java index 74d4114d..2be7a9af 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java @@ -4,7 +4,6 @@ import org.framework.router.Response; import org.framework.router.ResponseFactory; import org.framework.router.Router; -import org.properties.Config; public final class ExternalStreamHandler extends Router { @@ -16,51 +15,46 @@ public ExternalStreamHandler() { manager = new ExternalStreamManager(this); } + // source: source of data public Response processEXSR(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "source"); + String validate; + if((validate = packet.validate("source")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - return ResponseFactory.response200(String.format("%s", manager.containsTemplate(packet.getData().toString()))); + return ResponseFactory.response200(String.format("%s", manager.containsTemplate(packet.getData("source")))); } + // key: key of the given stream public Response processEXST(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "streamHash"); + String validate; + if((validate = packet.validate("key")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - return ResponseFactory.response200(String.format("%s", manager.containsStream(packet.getData().toString()))); + return ResponseFactory.response200(String.format("%s", manager.containsStream(packet.getData("key")))); } @SuppressWarnings("deprecation") + // source: source of the given stream + // auth_data[]: all keys associated with data public Response processINIT(Packet packet) { - // extract template from data - String data = packet.getData(); - String delim = Config.getProperty("app", "general.internal.delim"); - int delim_len = delim.length(); - int splitIndex = data.indexOf(delim); - String template = ""; - if(splitIndex != -1) { - template = data.substring(0, splitIndex).trim(); - data = data.substring(splitIndex + delim_len).trim(); - } else { - template = data; - data = ""; - } + String validate; + if((validate = packet.validate("source")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); + + String source = packet.getData("source"); // validate data - if(template.isEmpty()) - return ResponseFactory.response500("ExternalStreamHandler", "source"); - - String tempHash = manager.getHash(template, data); + String tempHash = manager.getHash(source, packet.getData()); if(manager.containsStream(tempHash)) return ResponseFactory.response220(tempHash); // attempt to add stream - Object[] output = manager.addStream(template, data); + Object[] output = manager.addStream(source, packet.getData()); boolean success = (Boolean) output[0]; String hash = (String) output[1]; if(!success) - return ResponseFactory.response420(template); + return ResponseFactory.response420(source); boolean authorized = manager.authorizeStream(hash); @@ -69,135 +63,122 @@ public Response processINIT(Packet packet) { return ResponseFactory.response200(String.format("%s", hash)); manager.removeStream(hash); - return ResponseFactory.response422(template); + return ResponseFactory.response422(source); } + // key: stream key public Response processIATH(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "streamHash"); + String validate; + if((validate = packet.validate("key")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - if(!manager.containsStream(packet.getData())) - return ResponseFactory.response421(packet.getData()); + if(!manager.containsStream(packet.getData("key"))) + return ResponseFactory.response421(packet.getData("key")); - return ResponseFactory.response200(String.format("%s", manager.isStreamAuthorized(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.isStreamAuthorized(packet.getData("key")))); } public Response processIATV(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "streamHash"); + String validate; + if((validate = packet.validate("key")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - if(!manager.containsStream(packet.getData())) - return ResponseFactory.response421(packet.getData()); + String key = packet.getData("key"); + if(!manager.containsStream(key)) + return ResponseFactory.response421(key); - return ResponseFactory.response200(String.format("%s", manager.isStreamActive(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.isStreamActive(key))); } public Response processEXEC(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "streamHash"); + String validate; + if((validate = packet.validate("key")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - if(!manager.containsStream(packet.getData())) - return ResponseFactory.response421(packet.getData()); + String key = packet.getData("key"); + if(!manager.containsStream(key)) + return ResponseFactory.response421(key); - if(!manager.isStreamReady(packet.getData())) - return ResponseFactory.response423(packet.getData()); + if(!manager.isStreamReady(key)) + return ResponseFactory.response423(key); - if(manager.isStreamActive(packet.getData())) - return ResponseFactory.response424(packet.getData()); + if(manager.isStreamActive(key)) + return ResponseFactory.response424(key); - return ResponseFactory.response200(String.format("%s", manager.executeStream(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.executeStream(key))); } public Response processKILL(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "streamHash"); + String validate; + if((validate = packet.validate("key")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - if(!manager.containsStream(packet.getData())) - return ResponseFactory.response421(packet.getData()); + String key = packet.getData("key"); + if(!manager.containsStream(key)) + return ResponseFactory.response421(key); - if(!manager.isStreamActive(packet.getData())) - return ResponseFactory.response425(packet.getData()); + if(!manager.isStreamActive(key)) + return ResponseFactory.response425(key); - return ResponseFactory.response200(String.format("%s", manager.killStream(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.killStream(key))); } public Response processSUBS(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "streamHash"); + String validate; + if((validate = packet.validate("key", "subscription")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - // extract hash from data - String type = packet.getData(); - int splitIndex = type.indexOf(','); - String hash = ""; - if(splitIndex != -1) { - hash = type.substring(0, splitIndex).trim(); - type = type.substring(splitIndex + 1).trim(); - } else { - return ResponseFactory.response500("ExternalStreamHandler", "subscriptionType"); - } + String key = packet.getData("key"); + String subscription = packet.getData("subscription"); - if(!manager.containsStream(hash)) - return ResponseFactory.response421(hash); + if(!manager.containsStream(key)) + return ResponseFactory.response421(key); - if(!manager.isStreamActive(hash)) - return ResponseFactory.response425(hash); + if(!manager.isStreamActive(key)) + return ResponseFactory.response425(key); - if(!manager.containsSubscriptionType(hash, type)) - return ResponseFactory.response426(hash, type); + if(!manager.containsSubscriptionType(key, subscription)) + return ResponseFactory.response426(key, subscription); - Object[] subscription = manager.subscribe(hash, type); + Object[] response = manager.subscribe(key, subscription); - if((Boolean)subscription[0]) + if((Boolean)response[0]) return ResponseFactory.response200(String.format("%s", "true")); - return ResponseFactory.response427(hash, type, (String)subscription[1]); + return ResponseFactory.response427(key, response[0].toString(), response[1].toString()); } public Response processRQST(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "key, query"); - - if(packet.getSubData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "destination"); + String validate; + if((validate = packet.validate("key", "request", "query", "destination")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - String[] data = packet.getData().split(Config.getProperty("app", "general.internal.delim")); + String key = packet.getData("key"); + String request = packet.getData("request"); - // standard request length is 2: - // key,.,request - if(data.length != 2) - return ResponseFactory.response501("Request is not formatted properly."); - - if(data[0].isEmpty() || data[1].isEmpty()) - return ResponseFactory.response500("ExternalStreamHandler", "key, query"); - - String hash = data[0]; - String[] request = data[1].split(Config.getProperty("app", "general.collection.delim")); - if(!manager.containsStream(hash)) - return ResponseFactory.response421(hash); + if(!manager.containsStream(key)) + return ResponseFactory.response421(key); - if(!manager.isStreamReady(hash)) - return ResponseFactory.response423(hash); + if(!manager.isStreamReady(key)) + return ResponseFactory.response423(key); - if(!manager.containsRequestType(hash, request[1])) - return ResponseFactory.response428(hash, request[1]); + if(!manager.containsRequestType(key, request)) + return ResponseFactory.response428(key, request); - Object[] response; - if(request.length == 2) - response = manager.request(hash, packet.getSubData(), request[1]); - else - response = manager.request(hash, packet.getSubData(), request[1], request[2]); + Object[] response = manager.request(key, packet.getData()); if((Boolean)response[0]) return ResponseFactory.response200(String.format("%s", (String)response[1])); - return ResponseFactory.response429(hash, request[0], (String)response[1]); + return ResponseFactory.response429(key, request, response[1].toString()); } public Response processTYPE(Packet packet) { - if(packet.getData().equals("")) - return ResponseFactory.response500("ExternalStreamHandler", "key"); + String validate; + if((validate = packet.validate("key")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - String key = packet.getData(); + String key = packet.getData("key"); if(!manager.containsStream(key)) return ResponseFactory.response421(key); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java index 78d45c4b..681a0a4d 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java @@ -67,7 +67,7 @@ private void reflect() throws InstantiationException, IllegalAccessException, Il Reflections reflection = new Reflections("org.stream.external.connected.connections"); Set> types = reflection.getSubTypesOf(ExternalStreamConnection.class); for(Class c : types) - templates.put(c.getDeclaredConstructor(ExternalStreamManager.class, String.class).newInstance(this, "").getUUID(), c); + templates.put(c.getDeclaredConstructor(ExternalStreamManager.class, HashMap.class).newInstance(this, new HashMap()).getUUID(), c); } /** @@ -81,12 +81,12 @@ private void reflect() throws InstantiationException, IllegalAccessException, Il * stream will be unable to authorize. * @return String containing the hashed {@code data}. */ - protected String getHash(String template, String data) { + protected String getHash(String template, HashMap data) { if(!templates.containsKey(template)) return null; try { - return templates.get(template).getDeclaredConstructor(ExternalStreamManager.class, String.class).newInstance(this, "").getHash(data); + return templates.get(template).getDeclaredConstructor(ExternalStreamManager.class, HashMap.class).newInstance(this, new HashMap()).getHash(data); } catch (Exception e) { e.printStackTrace(); } @@ -191,13 +191,13 @@ protected boolean isStreamActive(String hash) { * the action was successful. The second object will return a {@link String} which contains the generated hash of the new stream. * If the initialization is unsuccessful, the second object will be {@code null}. */ - protected Object[] addStream(String type, String data) { + protected Object[] addStream(String type, HashMap data) { if(!templates.containsKey(type)) return new Object[] {false, null}; ExternalStreamConnection stream = null; try { - stream = templates.get(type).getDeclaredConstructor(ExternalStreamManager.class, String.class).newInstance(this, data); + stream = templates.get(type).getDeclaredConstructor(ExternalStreamManager.class, HashMap.class).newInstance(this, data); streams.put(stream.getHash(), stream); } catch (Exception e) { e.printStackTrace(); @@ -312,7 +312,7 @@ protected boolean containsRequestType(String hash, String type) { * @param request Request data used for processing the single request. * @return Returns a string object containing all data returned by the request. */ - protected Object[] request(String hash, String destination, String request) { + protected Object[] request(String hash, HashMap request) { if(!streams.containsKey(hash)) return new Object[] {false, null}; @@ -320,30 +320,7 @@ protected Object[] request(String hash, String destination, String request) { if(!stream.isAuthorized() || !stream.isReady()) return new Object[] {false, null}; - return stream.request(destination, request); - } - - /** - * Sends a data request from the stream with the given hash. This request is in the form of a single - * (typically REST API) request, which will then return a series of data presented. - *
- * If a stream with the given hash does not exist, this function returns false. - * - * @param hash Hash of the stream returned by the {@link ExternalStreamConnection#getHash(String)} function. - * @param destination Destination of the request to be processed by the {@link ExternalStreamManager#process(String, String, String)} function. - * @param request Request data used for processing the single request. - * @param date Date of the {@code request} parameter to be accessed. - * @return Returns a string object containing all data returned by the request. - */ - protected Object[] request(String hash, String destination, String request, String date) { - if(!streams.containsKey(hash)) - return new Object[] {false, null}; - - ExternalStreamConnection stream = streams.get(hash); - if(!stream.isAuthorized() || !stream.isReady()) - return new Object[] {false, null}; - - return stream.request(destination, request, date); + return stream.request(request); } /** @@ -398,11 +375,11 @@ protected boolean killStream(String hash) { * @param data Data sent by the given subscription. */ private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.getProperty("app", "general.data.dateformat")); - protected void processSubscription(String hash, String subscription, String destination, String data) { + protected void processSubscription(String hash, String subscription, String data) { // define subscribed date String date = LocalDate.now().format(formatter); String collection = subscription + Config.getProperty("app", "general.collection.delim") + date; - Response lsh_response = handler.send("LSH", "PUSH", String.format("%s%s%s", data, Config.getProperty("app", "general.internal.delim"), collection)); + Response lsh_response = handler.send("LSH", "PUSH", "data", data, "collection", collection); if(lsh_response.code() != 200) Logger.warn(lsh_response); } @@ -415,8 +392,8 @@ protected void processSubscription(String hash, String subscription, String dest * @param request Request which the data was received by. * @param data Data sent by the given subscription. */ - protected void processRequest(String hash, String request, String destination, String data) { - Response lsh_response = handler.send("LSH", "PUSH", String.format("%s%s%s", data, Config.getProperty("app", "general.internal.delim"), request)); + protected void processRequest(String hash, String request, String data) { + Response lsh_response = handler.send("LSH", "PUSH", "request", request, "data", data); if(lsh_response.code() != 200) Logger.warn(lsh_response); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index 958b8de0..18983fcb 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -18,10 +18,11 @@ public LocalStreamHandler() { } public Response processINIT(Packet packet) { - if(packet.getData().isEmpty()) - return ResponseFactory.response500("LocalStreamHandler", "source"); + String validate; + if((validate = packet.validate("source")) != null) + return ResponseFactory.response500("LocalStreamHandler", validate); - String source = packet.getData(); + String source = packet.getData("source"); if(!manager.containsTemplate(source)) return ResponseFactory.response440(source); @@ -42,16 +43,17 @@ public Response processINIT(Packet packet) { } public Response processSCAN(Packet packet) { - if(packet.getData().isEmpty()) - return ResponseFactory.response500("LocalStreamHandler", "query"); + String validate; + if((validate = packet.validate("query")) != null) + return ResponseFactory.response500("LocalStreamHandler", validate); if(!manager.isReady()) return ResponseFactory.response441(manager.streamType()); - String[] query = packet.getData().split(Config.getProperty("stream", "mongodb.query.delim")); + String[] query = packet.getData("query").split(Config.getProperty("stream", "mongodb.query.delim")); if(!manager.validate(query)) - return ResponseFactory.response445(manager.streamType(), packet.getData()); + return ResponseFactory.response445(manager.streamType(), packet.getData("query")); if(manager.scan(query)) return ResponseFactory.response200("true"); @@ -61,53 +63,57 @@ public Response processSCAN(Packet packet) { //public Response process public Response processRQST(Packet packet) { - if(packet.getData().isEmpty()) - return ResponseFactory.response500("LocalStreamHandler", "request, query"); + String validate; + if((validate = packet.validate("uuid", "request", "query", "destination")) != null) + return ResponseFactory.response500("LocalStreamHandler", validate); - if(packet.getSubData().isEmpty()) - return ResponseFactory.response500("LocalStreamHandler", "destination"); - - String[] data = packet.getData().split(Config.getProperty("app", "general.internal.delim")); - String[] query; + String[] query = packet.getData("query").split(Config.getProperty("app", "general.data.delim")); // standard query - if(data.length == 1) { - query = data[0].split(Config.getProperty("app", "general.data.delim")); - } - - // date query - else if(data.length == 2) { - query = data[1].split(Config.getProperty("app", "general.data.delim")); - int collection_index = manager.getParameterTranslation(query[0], "collection"); - if(collection_index == -1) - return ResponseFactory.response501("Collection index is invalid."); - - // insert dated request into collection index - query[collection_index] = data[0].trim(); - +// if(data.length == 1) { +// query = data[0].split(Config.getProperty("app", "general.data.delim")); +// } +// +// // date query +// else if(data.length == 2) { +// query = data[1].split(Config.getProperty("app", "general.data.delim")); +// int collection_index = manager.getParameterTranslation(query[0], "collection"); +// if(collection_index == -1) +// return ResponseFactory.response501("Collection index is invalid."); +// +// // insert dated request into collection index +// query[collection_index] = data[0].trim(); +// +// } + + String request; + String delim = Config.getProperty("app", "general.collection.delim"); + if(packet.containsKey("date")) { + request = packet.getData("uuid" + delim + packet.getData("request") + delim + packet.getData("date")); + } else { + request = packet.getData("uuid" + delim + packet.getData("request")); } - // invalid internal language - else - return ResponseFactory.response502(); - if(query == null) return ResponseFactory.response501("Query array was null when attempting to process."); + if(request == null || request.isEmpty()) + return ResponseFactory.response501("Request was null when attempting to process."); + if(!manager.validate(query)) - return ResponseFactory.response445(manager.streamType(), packet.getData()); + return ResponseFactory.response445(manager.streamType(), packet.getData("query")); System.out.println(Arrays.toString(query)); if(!manager.scan(query)) - return ResponseFactory.response446(manager.streamType(), packet.getData()); + return ResponseFactory.response446(manager.streamType(), packet.getData("query")); Set output = manager.get(query); if(output == null) - return ResponseFactory.response447(manager.streamType(), packet.getData()); + return ResponseFactory.response447(manager.streamType(), packet.getData("query")); Response response; for(String line : output) { - response = send("SRC", "EDAT", String.format("%s", line), packet.getSubData()); + response = send("SRC", "EDAT", "data", line, "destination", packet.getData("destination")); if(response.code() != 200) return response; } @@ -116,29 +122,30 @@ else if(data.length == 2) { } public Response processSTAT(Packet packet) { - if(packet.getData().isEmpty()) - return ResponseFactory.response500("LocalStreamHandler", "query"); + String validate; + if((validate = packet.validate("query")) != null) + return ResponseFactory.response500("LocalStreamHandler", validate); if(!manager.isReady()) return ResponseFactory.response441(manager.streamType()); - String[] query = packet.getData().split(Config.getProperty("stream", "mongodb.query.delim")); + String[] query = packet.getData("query").split(Config.getProperty("stream", "mongodb.query.delim")); if(!manager.validate(query)) - return ResponseFactory.response445(manager.streamType(), packet.getData()); + return ResponseFactory.response445(manager.streamType(), packet.getData("query")); if(!manager.scan(query)) - return ResponseFactory.response446(manager.streamType(), packet.getData()); + return ResponseFactory.response446(manager.streamType(), packet.getData("query")); DataState state = manager.state(query); if(state == DataState.INVALID) - return ResponseFactory.response448(manager.streamType(), packet.getData()); + return ResponseFactory.response448(manager.streamType(), packet.getData("query")); return ResponseFactory.response200(state.toString()); } - public Response processMODI(Packet packet) { - return ResponseFactory.response501(); +// public Response processMODI(Packet packet) { +// return ResponseFactory.response501(); // if(packet.getData().isEmpty()) // return ResponseFactory.response500("LocalStreamHandler", "query"); // @@ -151,23 +158,19 @@ public Response processMODI(Packet packet) { // if(!manager.scan(packet.getData())) // return ResponseFactory.response446(manager.streamType(), packet.getData()); // - } +// } public Response processPUSH(Packet packet) { // data format: data, location... - if(packet.getData().isEmpty()) - return ResponseFactory.response500("LocalStreamHandler", "data, collection"); - - String[] data = packet.getData().split(Config.getProperty("app", "general.internal.delim")); - if(data.length != 2) - return ResponseFactory.response500("LocalStreamHandler", "collection"); - String collection = data[1]; + String validate; + if((validate = packet.validate("data", "collection")) != null) + return ResponseFactory.response500("LocalStreamHandler", validate); if(!manager.isReady()) return ResponseFactory.response441(manager.streamType()); - if(!manager.push(data[0], collection)) - return ResponseFactory.response449(manager.streamType(), data[0], collection); + if(!manager.push(packet.getData("data"), packet.getData("collection"))) + return ResponseFactory.response449(manager.streamType(), packet.getData("data"), packet.getData("collection")); return ResponseFactory.response200(); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java index 45a00550..7e5d83e2 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java @@ -2,6 +2,7 @@ import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.util.HashMap; import java.util.List; import java.util.stream.Collectors; @@ -18,136 +19,133 @@ public StreamRegistryController() { } public Response processEXSR(Packet packet) { - return send("ESH", "EXSR", packet.getData(), packet.getSubData()); + return send("ESH", "EXSR", packet.getData()); } public Response processEXST(Packet packet) { - return send("ESH", "EXST", packet.getData(), packet.getSubData()); + return send("ESH", "EXST", packet.getData()); } public Response processINIT(Packet packet) { - return send("ESH", "INIT", packet.getData(), packet.getSubData()); + return send("ESH", "INIT", packet.getData()); } public Response processIATH(Packet packet) { - return send("ESH", "IATH", packet.getData(), packet.getSubData()); + return send("ESH", "IATH", packet.getData()); } public Response processIATV(Packet packet) { - return send("ESH", "IATV", packet.getData(), packet.getSubData()); + return send("ESH", "IATV", packet.getData()); } public Response processEXEC(Packet packet) { - return send("ESH", "EXEC", packet.getData(), packet.getSubData()); + return send("ESH", "EXEC", packet.getData()); } public Response processKILL(Packet packet) { - return send("ESH", "KILL", packet.getData(), packet.getSubData()); + return send("ESH", "KILL", packet.getData()); } public Response processSUBS(Packet packet) { - return send("ESH", "SUBS", packet.getData(), packet.getSubData()); + return send("ESH", "SUBS", packet.getData()); } public Response processEDAT(Packet packet) { - return send("OUT", "EDAT", packet.getData(), packet.getSubData()); + return send("OUT", "EDAT", packet.getData()); } public Response processSCAN(Packet packet) { - return send("LSH", "SCAN", packet.getData(), packet.getSubData()); + return send("LSH", "SCAN", packet.getData()); } private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.getProperty("app", "general.data.dateformat")); public Response processRQST(Packet packet) { // Define passed properties. Types: - // Dated: data=key,.,start_date,.,end_date,.,request,.,query - // subdata=destination - // Not Dated: data=key,.,request,.,query - // subdata=destination + // Dated: data=key, start_date, end_date, request, query, destination + // Not Dated: data=key, request, query, destination - if(packet.getData().isEmpty()) - return ResponseFactory.response500("StreamRegistryController", "key, start_date, end_date, request, query"); + String validate; + if((validate = packet.validate("key", "request", "query", "destination")) != null) + return ResponseFactory.response500("ExternalStreamHandler", validate); - if(packet.getSubData().isEmpty()) - return ResponseFactory.response500("StreamRegistryController", "destination"); - - String[] data = packet.getData().split(Config.getProperty("app", "general.internal.delim")); + // check to see if request is dated + boolean dated = false; + if(packet.containsKey("start_date") || packet.containsKey("end_date")) { + if((validate = packet.validate("start_date", "end_date")) != null) + return ResponseFactory.response500("StreamRegistryController", validate); + + dated = true; + } - for(String d : data) - if(d.isEmpty()) - return ResponseFactory.response500("StreamRegistryController", "key, start_date, end_date, request, query"); + // extract packet data + HashMap data = packet.getData(); - // retrieve stream key - Response type_response = send("ESH", "TYPE", data[0]); + // retrieve stream type based on key + Response type_response = send("ESH", "TYPE", data); if(type_response.code() != 200) return type_response; + // retrieve uuid of the data String uuid = type_response.data(); + data.put("uuid", uuid); // not dated - if(data.length == 3) { - Response lsh_response = send("LSH", "RQST", data[2]); + if(!dated) { + Response lsh_response = send("LSH", "RQST", data); + // if data does not exist send request to external stream handler if(lsh_response.code() == 446) - return send("ESH", "RQST", format(data[0], data[1]), packet.getSubData()); - return lsh_response; + send("ESH", "RQST", data); + return send("LSH", "RQST", data); } // dated - else if(data.length == 5) { + else if(dated) { try { - LocalDate start = LocalDate.parse(data[1], formatter); - LocalDate end = LocalDate.parse(data[2], formatter); + LocalDate start = LocalDate.parse(packet.getData("start_date"), formatter); + LocalDate end = LocalDate.parse(packet.getData("end_date"), formatter); List dates = start.datesUntil(end).collect(Collectors.toList()); // invalid date processing if(dates.isEmpty()) throw new Exception(); - String request; Response lsh_response, esh_response; for(LocalDate date : dates) { // perform requests - request = uuid + Config.getProperty("app", "general.collection.delim") + data[3] + Config.getProperty("app", "general.collection.delim") + date.format(formatter); - lsh_response = send("LSH", "RQST", format(request, data[4]), packet.getSubData()); + + // initial request + data.put("date", date.format(formatter)); + lsh_response = send("LSH", "RQST", data); if(lsh_response.code() == 200) continue; + // if data does not exist if(lsh_response.code() == 446) { - esh_response = send("ESH", "RQST", format(data[0], request), packet.getSubData()); + esh_response = send("ESH", "RQST", data); if(esh_response.code() != 200) return esh_response; - } else { + } + + // invalid request + else { return lsh_response; } - - lsh_response = send("LSH", "RQST", format(request, data[4]), packet.getSubData()); + // request again + lsh_response = send("LSH", "RQST", data); if(lsh_response.code() != 200) return lsh_response; } } catch(Exception e) { - return ResponseFactory.response503(Config.getProperty("app", "general.data.dateformat"), data[1], data[2]); + return ResponseFactory.response503(Config.getProperty("app", "general.data.dateformat"), packet.getData("start_date"), packet.getData("end_date")); } } // invalid query else { - return ResponseFactory.response500("StreamRegistryController", "key, start_date, end_date, request, query"); + return ResponseFactory.response501("Boolean set to non binary value in cache."); } return ResponseFactory.response200(); } - - private static String format(String... objects) { - StringBuilder out = new StringBuilder(); - String delim = Config.getProperty("app", "general.internal.delim"); - for(int i = 0; i < objects.length; i++) { - out.append(objects[i].trim()); - if(i != objects.length - 1) - out.append(delim); - } - - return out.toString(); - - } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java index 957d7c71..5bae1cab 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java @@ -14,133 +14,136 @@ public class TestSRC { static { // disable loggers LogManager.getRootLogger().setLevel(Level.OFF); - Config.setProperty("output", "consumer.types", "null"); - Config.setProperty("output", "producer.types", "null"); - Config.setProperty("output", "local.stream.type", "null"); + Config.setProperty("stream", "general.consumer.types", "null"); + Config.setProperty("stream", "general.producer.types", "null"); + Config.setProperty("stream", "local.stream.type", "null"); } @Test public void TestEXSR() { Core core = new Core(); - assertEquals(200, core.send("SRC", "EXSR", "external_template").code()); - assertEquals("true", core.send("SRC", "EXSR", "external_template").data()); - assertEquals("false", core.send("SRC", "EXSR", "template").data()); + assertEquals(200, core.send("SRC", "EXSR", "source", "external_template").code()); + assertEquals("true", core.send("SRC", "EXSR", "source", "external_template").data()); + assertEquals("false", core.send("SRC", "EXSR", "source", "template").data()); - assertEquals(500, core.send("SRC", "EXSR", "").code()); + assertEquals(500, core.send("SRC", "EXSR", "source", "").code()); + assertEquals(500, core.send("SRC", "EXSR", "dne", "").code()); } @Test public void TestEXST() { Core core = new Core(); - assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); + assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - assertEquals("true", core.send("SRC", "EXST", "key").data()); - assertEquals("false", core.send("SRC", "EXST", "key1").data()); + assertEquals("true", core.send("SRC", "EXST", "key", "key").data()); + assertEquals("false", core.send("SRC", "EXST", "key", "key1").data()); - assertEquals(500, core.send("SRC", "EXST", "").code()); + assertEquals(500, core.send("SRC", "EXST", "key", "").code()); + assertEquals(500, core.send("SRC", "EXST", "dne", "").code()); } @Test public void TestINIT() { Core core = new Core(); // successful authorization - Response valid = core.send("SRC", "INIT", "external_template, key"); + Response valid = core.send("SRC", "INIT", "source", "external_template", "key", "key"); assertEquals(200, valid.code()); - assertEquals("true, key", valid.data()); - assertEquals("true", core.send("SRC", "EXST", "key").data()); + assertEquals("key", valid.data()); + assertEquals("true", core.send("SRC", "EXST", "key", "key").data()); - assertEquals(220, core.send("SRC", "INIT", "external_template, key").code()); + assertEquals(220, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - assertEquals(420, core.send("SRC", "INIT", "does_not_exist").code()); - assertEquals(422, core.send("SRC", "INIT", "external_template, wrong").code()); - assertEquals(422, core.send("SRC", "INIT", "external_template").code()); - assertEquals(500, core.send("SRC", "INIT", "").code()); + assertEquals(420, core.send("SRC", "INIT", "source", "does_not_exist").code()); + assertEquals(422, core.send("SRC", "INIT", "source", "external_template", "key", "wrong").code()); + assertEquals(422, core.send("SRC", "INIT", "source", "external_template").code()); + assertEquals(500, core.send("SRC", "INIT", "dne", "").code()); } @Test public void TestIATH() { Core core = new Core(); - assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); + assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); - assertEquals(200, core.send("SRC", "IATH", "key").code()); - assertEquals("true", core.send("SRC", "IATH", "key").data()); + assertEquals(200, core.send("SRC", "IATH", "key", "key").code()); + assertEquals("true", core.send("SRC", "IATH", "key", "key").data()); - assertEquals(421, core.send("SRC", "IATH", "does_not_exist").code()); - assertEquals(500, core.send("SRC", "IATH", "").code()); + assertEquals(421, core.send("SRC", "IATH", "key", "does_not_exist").code()); + assertEquals(500, core.send("SRC", "IATH", "key", "").code()); } @Test public void TestIATV() { Core core = new Core(); - assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); - assertEquals("false", core.send("SRC", "IATV", "key").data()); - assertEquals(200, core.send("SRC", "EXEC", "key").code()); - assertEquals("true", core.send("SRC", "IATV", "key").data()); + assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); + assertEquals("false", core.send("SRC", "IATV", "key", "key").data()); + assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); + assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); - assertEquals(421, core.send("SRC", "IATV", "does_not_exist").code()); - assertEquals(500, core.send("SRC", "IATV", "").code()); + assertEquals(421, core.send("SRC", "IATV", "key", "does_not_exist").code()); + assertEquals(500, core.send("SRC", "IATV", "key", "").code()); } @Test public void TestEXEC() { Core core = new Core(); - assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); - assertEquals(200, core.send("SRC", "INIT", "external_template, not_ready, true").code()); + assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); + assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "not_ready", "override", "true").code()); - assertEquals(200, core.send("SRC", "EXEC", "key").code()); - assertEquals("true", core.send("SRC", "IATH", "key").data()); - assertEquals("true", core.send("SRC", "IATV", "key").data()); + assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); + assertEquals("true", core.send("SRC", "IATH", "key", "key").data()); + assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); - assertEquals(421, core.send("SRC", "EXEC", "does_not_exist").code()); - assertEquals(423, core.send("SRC", "EXEC", "not_ready").code()); - assertEquals(424, core.send("SRC", "EXEC", "key").code()); + assertEquals(421, core.send("SRC", "EXEC", "key", "does_not_exist").code()); + assertEquals(423, core.send("SRC", "EXEC", "key", "not_ready").code()); + assertEquals(424, core.send("SRC", "EXEC", "key", "key").code()); } @Test public void TestKILL() { Core core = new Core(); - assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); - assertEquals(200, core.send("SRC", "EXEC", "key").code()); + assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); + assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); - assertEquals("true", core.send("SRC", "IATV", "key").data()); - assertEquals("true", core.send("SRC", "KILL", "key").data()); - assertEquals("false", core.send("SRC", "IATV", "key").data()); + assertEquals("true", core.send("SRC", "IATV", "key", "key").data()); + assertEquals("true", core.send("SRC", "KILL", "key", "key").data()); + assertEquals("false", core.send("SRC", "IATV", "key", "key").data()); - assertEquals(421, core.send("SRC", "KILL", "does_not_exist").code()); - assertEquals(425, core.send("SRC", "KILL", "key").code()); - assertEquals(500, core.send("SRC", "KILL", "").code()); + assertEquals(421, core.send("SRC", "KILL", "key", "does_not_exist").code()); + assertEquals(425, core.send("SRC", "KILL", "key", "key").code()); + assertEquals(500, core.send("SRC", "KILL", "key", "").code()); } @Test public void TestSUBS() { Core core = new Core(); - assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); - assertEquals(200, core.send("SRC", "EXEC", "key").code()); + assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); + assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); - assertEquals("true", core.send("SRC", "SUBS", "key, correct").data()); + assertEquals("true", core.send("SRC", "SUBS", "key", "key", "subscription", "correct").data()); - assertEquals(421, core.send("SRC", "SUBS", "does_not_exist, correct").code()); - assertEquals(200, core.send("SRC", "KILL", "key").code()); - assertEquals(425, core.send("SRC", "SUBS", "key, correct").code()); - assertEquals(200, core.send("SRC", "EXEC", "key").code()); - assertEquals(426, core.send("SRC", "SUBS", "key, does_not_exist").code()); - assertEquals(427, core.send("SRC", "SUBS", "key, irregular").code()); + assertEquals(421, core.send("SRC", "SUBS", "key", "does_not_exist", "subscription", "correct").code()); + assertEquals(200, core.send("SRC", "KILL", "key", "key").code()); + assertEquals(425, core.send("SRC", "SUBS", "key", "key", "subscription", "correct").code()); + assertEquals(200, core.send("SRC", "EXEC", "key", "key").code()); + assertEquals(426, core.send("SRC", "SUBS", "key", "key", "subscription", "does_not_exist").code()); + assertEquals(427, core.send("SRC", "SUBS", "key", "key", "subscription", "irregular").code()); } @Test public void TestRQST() { Core core = new Core(); - assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); - assertEquals(200, core.send("SRC", "INIT", "external_template, not_ready, true").code()); + assertEquals(200, core.send("SRC", "INIT", "source", "external_template", "key", "key").code()); + assertEquals(200, core.send("SRC", "INIT", "external_template", "key", "not_ready").code()); + assertEquals(200, core.send("SRC", "RQST", "key, correct", "destination").code()); assertEquals(200, core.send("SRC", "RQST", "key, correct", "destination").code()); assertEquals(421, core.send("SRC", "RQST", "does_not_exist, correct", "destination").code()); diff --git a/DeFi-Data-Engine/Rest Application/pom.xml b/DeFi-Data-Engine/Rest Application/pom.xml index 9644e635..4937af40 100644 --- a/DeFi-Data-Engine/Rest Application/pom.xml +++ b/DeFi-Data-Engine/Rest Application/pom.xml @@ -11,14 +11,13 @@ org.out.connections rest-connection 4.3.3 - war + jar restconnection Rest Connection for DeFi Data Engine - 18 - 1.18 - 1.18 - 2.6.0 + 17 + 17 + 17 0.8.3.Final @@ -53,8 +52,6 @@ - - @@ -63,5 +60,4 @@ - diff --git a/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java index 6d51c933..90708126 100644 --- a/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java @@ -1,7 +1,5 @@ package org.properties; -import java.io.File; -import java.io.FileInputStream; import java.util.HashMap; import java.util.Properties; @@ -10,23 +8,40 @@ public class Config { private static final HashMap properties; static { +// properties = new HashMap(); +// +// String[] files = new File("config").list(); +// +// for(String file : files) { +// if(file.lastIndexOf(".properties") == file.length() - 11) { +// String name = file.substring(0, file.length() - 11); +// properties.put(name, new Properties()); +// +// try (FileInputStream in = new FileInputStream("config/" + file)) { +// properties.get(name).load(in); +// } catch(Exception e) { +// e.printStackTrace(); +// System.exit(1); +// } +// } +// } + properties = new HashMap(); - String[] files = new File("src/main/resources/config").list(); + Properties app_properties = new Properties(); + app_properties.put("", ""); + app_properties.put("general.internal.delim", ":::"); + app_properties.put("general.data.delim", ","); + app_properties.put("general.collection.delim", "="); + app_properties.put("general.transfer.delim", "&&&"); + app_properties.put("general.data.dateformat", "yyyy-MM-dd"); + app_properties.put("spring.server.port", "8080"); + app_properties.put("rest.socket.address", "localhost"); + app_properties.put("rest.socket.port", "61100"); + app_properties.put("rest.socket.key", "rest-key-reserved"); + properties.put("app", app_properties); - for(String file : files) { - if(file.lastIndexOf(".properties") == file.length() - 11) { - String name = file.substring(0, file.length() - 11); - properties.put(name, new Properties()); - - try (FileInputStream in = new FileInputStream("src/main/resources/config/" + file)) { - properties.get(name).load(in); - } catch(Exception e) { - e.printStackTrace(); - System.exit(1); - } - } - } + //properties.put("testing", new Properties()); } public static final Properties getProperties(String name) { diff --git a/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/RestApplication.java b/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/RestApplication.java index c37eb161..19e0d62a 100644 --- a/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/RestApplication.java +++ b/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/RestApplication.java @@ -4,7 +4,7 @@ import java.net.UnknownHostException; import java.util.Collections; -import org.properties.Config; + import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -14,7 +14,7 @@ public class RestApplication { public static void main(String[] args) throws InterruptedException, UnknownHostException, IOException { SpringApplication app = new SpringApplication(RestApplication.class); app.setDefaultProperties(Collections.singletonMap( - "server.port", Config.getProperty("app", "spring.server.port"))); + "server.port", "8080")); app.run(args); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/Testing Environment/pom.xml b/DeFi-Data-Engine/Testing Environment/pom.xml index bdff32dc..e38f29b7 100644 --- a/DeFi-Data-Engine/Testing Environment/pom.xml +++ b/DeFi-Data-Engine/Testing Environment/pom.xml @@ -40,6 +40,11 @@ spring-kafka 2.9.0 + + org.json + json + 20220320 + diff --git a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/SocketConnectionTest.java b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/SocketConnectionTest.java index 1b139929..4d447d4c 100644 --- a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/SocketConnectionTest.java +++ b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/SocketConnectionTest.java @@ -8,8 +8,11 @@ import java.net.Socket; import java.net.URL; import java.net.UnknownHostException; + import javax.net.SocketFactory; +import org.json.JSONObject; + public class SocketConnectionTest { public static void main(String[] args) throws UnknownHostException, IOException { @@ -33,21 +36,26 @@ public void run() { thread.start(); //TODO FIX INTEGRATION WITH REST API - System.out.println("SOURCE_EXISTS RESPONSE: " + request("http://localhost:8080/defi/v1/rest/source_exists?source=external_template")); - System.out.println("INIT RESPONSE: " + request("http://localhost:8080/defi/v1/rest/initialize?source=external_template&auth_data=key")); - System.out.println("STREAM_EXISTS RESPONSE: " + request("http://localhost:8080/defi/v1/rest/stream_exists?key=key")); - System.out.println("IS_AUTHORIZED RESPONSE: " + request("http://localhost:8080/defi/v1/rest/is_authorized?key=key")); +// System.out.println("SOURCE_EXISTS RESPONSE: " + request("http://localhost:8080/defi/v1/rest/source_exists?source=polygon")); +// System.out.println("INIT RESPONSE: " + request("http://localhost:8080/defi/v1/rest/initialize?source=polygon&auth_data=key")); +// System.out.println("STREAM_EXISTS RESPONSE: " + request("http://localhost:8080/defi/v1/rest/stream_exists?key=key")); +// System.out.println("IS_AUTHORIZED RESPONSE: " + request("http://localhost:8080/defi/v1/rest/is_authorized?key=key")); + + String rqst = request("http://localhost:8080/defi/v1/rest/initialize?source=polygon&auth_data=vtTMMRtEywJ_owkHdqoRUmv9vf2hWkrV"); + JSONObject init = new JSONObject(rqst); + System.out.println(rqst); + String hash = init.getString("data"); - System.out.println("RQST RESPONSE: " + request( + System.out.println("RESPONSE: " + request( String.format("http://localhost:8080/defi/v1/rest/request_dated?destination=%s" - + "&key=key" - + "&request=template-external-request" - + "&query=get_all,template-external-request" - + "&start_date=10-09-2022" - + "&end_date=14-09-2022", key))); + + "&key=%s" + + "&request=bar-AAPL-15m" + + "&query=get_all,bar-AAPL-15m" + + "&start_date=2022-09-20" + + "&end_date=2022-09-23", key, hash))); } - public static String request(String str, String... params) throws IOException { + public static String request(String str) throws IOException { URL obj = new URL(str); HttpURLConnection con = (HttpURLConnection) obj.openConnection(); con.setRequestMethod("GET");