diff --git a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx index c9026799..f6cb70fd 100644 Binary files a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx and b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx differ diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java index 68b0bdfd..4b849f56 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java @@ -8,6 +8,10 @@ public class Logger { + public static final void log(String message) { + System.out.println(messageFormat("INFO", message)); + } + public static final void log(Packet packet) { System.out.println(packetFormat(packet)); } @@ -16,6 +20,10 @@ public static final void log(Response response) { System.out.println(responseFormat(response)); } + public static final void warn(String message) { + System.out.println(messageFormat("WARN", message)); + } + public static final void warn(Packet packet) { System.err.println(packetFormat(packet)); } @@ -24,6 +32,10 @@ public static final void warn(Response response) { System.err.println(responseFormat(response)); } + public static final void terminate(String message) { + System.err.println(messageFormat("ERROR", message)); + } + public static final void terminate(Packet packet) { System.err.println(packetFormat(packet)); System.exit(1); @@ -34,6 +46,14 @@ public static final void terminate(Response response) { System.exit(1); } + private static final String messageFormat(String type, String message) { + return String.format("[%s] [%-10s] %-9s- [%s]", + time(), + Thread.currentThread().getName(), + type, + message); + } + private static final String packetFormat(Packet packet) { return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%4s] [%s]", time(), diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java index 050e0e2c..0bbc2991 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java @@ -1,6 +1,7 @@ package org.framework.router; import java.util.Arrays; +import java.util.HashMap; public class ResponseFactory { @@ -27,7 +28,7 @@ public static Response response200(String data) { } public static Response response220(String hash) { - return Response.create(220, String.format("Stream with generated hash <%s> already exists. Using existing stream for connections.", hash), String.format("true, %s", hash)); + return Response.create(220, String.format("Stream with generated hash <%s> already exists. Using existing stream for connections.", hash), String.format("%s", hash)); } public static Response response400(String router) { @@ -96,6 +97,10 @@ public static Response response429(String hash, String request, String response) return Response.create(429, String.format("Stream with hash <%s> returned an irregular response when attempting to subscribe to <%s>. Response returned is: <%s>", hash, request, response)); } + public static Response response430(HashMap data) { + return Response.create(430, String.format("Stream hash could not be generated with the given properties: <%s>", data)); + } + public static Response response440(String source) { return Response.create(440, String.format("Requested data source <%s> does not exist in cache.", source)); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java index 59ea4084..4bd7b97e 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java @@ -59,7 +59,6 @@ public void run() { // extract non-essential data String[] data = Arrays.copyOfRange(input, 2, input.length); - String tag = input[0]; String sub_tag = input[1]; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index e32aa95f..7148a737 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -9,6 +9,8 @@ import java.util.HashSet; import java.util.UUID; +import org.core.logger.Logger; + public class SocketManager { private static final HashMap servers = new HashMap(); @@ -53,6 +55,8 @@ public synchronized static String accept(int port) { out.write(key.getBytes()); out.write(10); + Logger.log("Successfully connected to external socket. Key <" + key + ">"); + if(connections.containsKey(key)) connections.get(key).close(); @@ -100,6 +104,8 @@ public synchronized static boolean accept(int port, String required_key) { if(!synced(key)) throw new Exception("Connection inflow and outflow not synchronized."); + + Logger.log("Successfully connected to reserved socket. Key <" + key + ">"); return true; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index c3d199fa..46bd8dc7 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -44,7 +44,7 @@ public class Config { stream_properties.put("rest.socket.address", "localhost"); stream_properties.put("rest.socket.port", "61100"); stream_properties.put("rest.socket.key", "rest-key-reserved"); - stream_properties.put("output.socket.address", "localhost"); + stream_properties.put("output.socket.address", "defi-de.idea.rpi.edu"); stream_properties.put("output.socket.port", "61200"); stream_properties.put("local.stream.type", "mongo_db"); stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017"); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestHandler.java index 87116925..b53b038b 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestHandler.java @@ -1,10 +1,17 @@ package org.stream.external.connected.amberdata; +import java.io.IOException; import java.lang.reflect.Method; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Calendar; import java.util.HashMap; +import org.core.logger.Logger; import org.framework.router.ResponseFactory; +import org.json.JSONArray; import org.json.JSONObject; +import org.properties.Config; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -23,31 +30,23 @@ public class AmberDataRequestHandler { try { // add all methods to the handler - requests.put("lending-latest", classobj.getMethod("requestLendingLatest", String.class, String.class)); - + requests.put("lending-latest", classobj.getMethod("requestLendingLatest", AmberDataRequestPacket.class)); + requests.put("aave-protocol-dated", classobj.getMethod("requestAaveProtocolDated", AmberDataRequestPacket.class)); } catch (Exception e) { e.printStackTrace(); ResponseFactory.responseNotHandled("Irregular method call for AmberDataRequestHandler."); } } - public static Object[] request(String key, String request) { - String data = request; - int splitIndex = data.indexOf(','); - String type = ""; - if(splitIndex != -1) { - type = data.substring(0, splitIndex).trim(); - data = data.substring(splitIndex + 1).trim(); - } else { - type = data.trim(); - data = ""; - } - - if(!requests.containsKey(type)) - return new Object[] {false, null}; + public static Object[] request(AmberDataRequestPacket packet) { + if(!requests.containsKey(packet.getData("request"))) + return new Object[] {false, "Request does not exist."}; + + if(packet.getKey() == null) + return new Object[] {false, " cannot be null."}; try { - return (Object[])requests.get(type).invoke(null, key, data); + return (Object[])requests.get(packet.getData("request")).invoke(null, packet); } catch (Exception e) { e.printStackTrace(); ResponseFactory.responseNotHandled("Irregular method call for AmberDataRequestHandler."); @@ -55,12 +54,12 @@ public static Object[] request(String key, String request) { } } - public static Object[] requestLendingLatest(String key, String param) { + public static Object[] requestLendingLatest(AmberDataRequestPacket packet) { Request request = new Request.Builder() .url("https://web3api.io/api/v2/market/defi/lending/exchanges/aave/latest") .get() .addHeader("Accept", "application/json") - .addHeader("x-api-key", key) + .addHeader("x-api-key", packet.getData("key")) .build(); try { @@ -79,4 +78,194 @@ public static Object[] requestLendingLatest(String key, String param) { return new Object[] {false, null}; } } + + private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.getProperty("app", "general.data.dateformat")); + public static Object[] requestAaveProtocolDated(AmberDataRequestPacket packet) { + LocalDate next = LocalDate.parse(packet.getData("date"), formatter); + next = next.plusDays(1); + String tmr = next.format(formatter); + + String url = String.format("https://web3api.io/api/v2/defi/lending/aavev2/protocol?startDate=%s&endDate=%s", + packet.getData("date") + "T01:00:00", + tmr + "T01:00:00"); + + OkHttpClient client = new OkHttpClient(); + + Request request = new Request.Builder() + .url(url) + .get() + .addHeader("accept", "application/json") + .addHeader("x-api-key", "UAK7ed69235426c360be22bfc2bde1809b6") + .build(); + + okhttp3.Response response; + try { + response = client.newCall(request).execute(); + JSONObject json = new JSONObject(response.body().string()); + + if(json.toString().equals("") || !json.has("description")) + return new Object[] {false, "JSON Object returned empty or invalid contents."}; + + if(!response.isSuccessful() || json.getInt("status") != 200 + || !json.getString("description").equals("Successful request")) + return new Object[]{false, json.getString("description")}; + + if(!json.has("payload") || !json.getJSONObject("payload").has("data")) + return new Object[] {false, "Malformed Aave packet"}; + + JSONArray arr = json.getJSONObject("payload").getJSONArray("data"); + for(int i = 0; i < arr.length(); i++) { + JSONObject obj = arr.getJSONObject(i); + String action = obj.getString("action"); + switch(action) { + + case "UseReserveAsCollateral": + packet.getConnection().processRequest( + packet.getData("request"), + packet.getData("date"), + format("action", obj.getString("action"), + "timestamp", obj.get("timestamp").toString(), + "blockNumber", obj.getLong("blockNumber"), + "transactionHash", obj.getString("transactionHash"), + "logIndex", obj.getInt("logIndex"), + "assetId", obj.getString("assetId"), + "assetSymbol", obj.getString("assetSymbol"), + "marketId", obj.getString("marketId"), + "market", obj.getString("market"), + "reserveAsCollateralEnabled", obj.getBoolean("reserveAsCollateralEnabled"), + "user", obj.getString("user"))); + break; + + case "Deposit": + packet.getConnection().processRequest( + packet.getData("request"), + packet.getData("date"), + format("action", obj.getString("action"), + "timestamp", obj.get("timestamp").toString(), + "blockNumber", obj.getLong("blockNumber"), + "transactionHash", obj.getString("transactionHash"), + "logIndex", obj.getInt("logIndex"), + "assetId", obj.getString("assetId"), + "assetSymbol", obj.getString("assetSymbol"), + "marketId", obj.getString("marketId"), + "market", obj.getString("market"), + "user", obj.getString("user"), + "onBehalfOf", obj.getString("onBehalfOf"))); + break; + + case "Withdraw": + packet.getConnection().processRequest( + packet.getData("request"), + packet.getData("date"), + format("action", obj.getString("action"), + "timestamp", obj.get("timestamp").toString(), + "blockNumber", obj.getLong("blockNumber"), + "transactionHash", obj.getString("transactionHash"), + "logIndex", obj.getInt("logIndex"), + "assetId", obj.getString("assetId"), + "assetSymbol", obj.getString("assetSymbol"), + "marketId", obj.getString("marketId"), + "market", obj.getString("market"), + "amount", obj.get("amount").toString(), + "user", obj.getString("user"), + "to", obj.getString("to"))); + break; + + case "LiquidationCall": + packet.getConnection().processRequest( + packet.getData("request"), + packet.getData("date"), + format("action", obj.getString("action"), + "timestamp", obj.get("timestamp").toString(), + "blockNumber", obj.getLong("blockNumber"), + "transactionHash", obj.getString("transactionHash"), + "logIndex", obj.getInt("logIndex"), + "collateralAssetId", obj.getString("collateralAssetId"), + "collateralAssetSymbol", obj.getString("collateralAssetSymbol"), + "principalAssetId", obj.getString("principalAssetId"), + "principalAssetSymbol", obj.getString("principalAssetSymbol"), + "principalAmount", obj.get("principalAmount").toString(), + "marketId", obj.getString("marketId"), + "market", obj.getString("market"), + "liquidator", obj.getString("liquidator"))); + break; + + case "Repay": + packet.getConnection().processRequest( + packet.getData("request"), + packet.getData("date"), + format("action", obj.getString("action"), + "timestamp", obj.get("timestamp").toString(), + "blockNumber", obj.getLong("blockNumber"), + "transactionHash", obj.getString("transactionHash"), + "logIndex", obj.getInt("logIndex"), + "assetId", obj.getString("assetId"), + "assetSymbol", obj.getString("assetSymbol"), + "marketId", obj.getString("marketId"), + "market", obj.getString("market"), + "amount", obj.get("amount").toString(), + "user", obj.getString("user"), + "repayer", obj.getString("repayer"))); + break; + + case "Borrow": + packet.getConnection().processRequest( + packet.getData("request"), + packet.getData("date"), + format("action", obj.getString("action"), + "timestamp", obj.get("timestamp").toString(), + "blockNumber", obj.getLong("blockNumber"), + "transactionHash", obj.getString("transactionHash"), + "logIndex", obj.getInt("logIndex"), + "assetId", obj.getString("assetId"), + "assetSymbol", obj.getString("assetSymbol"), + "marketId", obj.getString("marketId"), + "market", obj.getString("market"), + "amount", obj.get("amount").toString(), + "borrowRate", obj.get("borrowRate").toString(), + "debt", obj.get("debt"), + "user", obj.getString("user"), + "onBehalfOf", obj.getString("onBehalfOf"))); + break; + + case "FlashLoan": + packet.getConnection().processRequest( + packet.getData("request"), + packet.getData("date"), + format("action", obj.getString("action"), + "timestamp", obj.get("timestamp").toString(), + "blockNumber", obj.getLong("blockNumber"), + "transactionHash", obj.getString("transactionHash"), + "logIndex", obj.getInt("logIndex"), + "assetId", obj.getString("assetId"), + "assetSymbol", obj.getString("assetSymbol"), + "marketId", obj.getString("marketId"), + "market", obj.getString("market"), + "amount", obj.get("amount").toString(), + "borrowRate", obj.get("totalFee").toString(), + "target", obj.get("target"), + "initiator", obj.getString("initiator"))); + + default: + Logger.warn("Unrecognized transaction type in : " + action); + } + } + + return new Object[] {true, ""}; + } catch (IOException e) { + e.printStackTrace(); + ResponseFactory.responseNotHandled("Unhandled exception cost."); + return new Object[] {false, null}; + } + } + + private static String format(Object... data) { + StringBuilder out = new StringBuilder(); + for(int i = 0; i < data.length; i++) { + out.append(data[i].toString()); + if(i != data.length - 1) + out.append(","); + } + return out.toString(); + } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestPacket.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestPacket.java new file mode 100644 index 00000000..d6af5459 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestPacket.java @@ -0,0 +1,37 @@ +package org.stream.external.connected.amberdata; + +import java.util.HashMap; + +import org.stream.external.connected.connections.AmberDataConnection; + +public class AmberDataRequestPacket { + + private final String key; + private final AmberDataConnection connection; + private final HashMap request; + + private AmberDataRequestPacket(String key, AmberDataConnection connection, HashMap request) { + this.key = key; + this.connection = connection; + this.request = request; + } + + public final String getKey() { + return key; + } + + public final AmberDataConnection getConnection() { + return connection; + } + + public final String getData(String key) { + if(!request.containsKey(key)) + return null; + + return request.get(key); + } + + public static AmberDataRequestPacket create(String key, AmberDataConnection connection, HashMap request) { + return new AmberDataRequestPacket(key, connection, request); + } +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java index cdd57bdb..99bd8cdb 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java @@ -4,9 +4,12 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.HashMap; +import java.util.HashSet; import org.framework.router.ResponseFactory; import org.json.JSONObject; +import org.stream.external.connected.amberdata.AmberDataRequestHandler; +import org.stream.external.connected.amberdata.AmberDataRequestPacket; import org.stream.external.handler.ExternalStreamConnection; import org.stream.external.handler.ExternalStreamManager; @@ -15,6 +18,13 @@ public class AmberDataConnection extends ExternalStreamConnection { + private static final HashSet requestTypes; + +static { + requestTypes = new HashSet<>(); + requestTypes.add("aave-protocol-dated"); +} + private boolean authorized = false; public AmberDataConnection(ExternalStreamManager manager, HashMap data) { @@ -29,7 +39,11 @@ public void init() { } - public String getHash(String data) { + public String getHash(HashMap data) { + if(!data.containsKey("key")) { + return null; + } + try { MessageDigest md = MessageDigest.getInstance("SHA-512"); byte[] bytes = md.digest(("salt" + data).getBytes()); @@ -105,12 +119,6 @@ public boolean stop() { return false; } - @Override - public String getHash(HashMap data) { - // TODO Auto-generated method stub - return null; - } - @Override public boolean containsSubscriptionType(String type) { // TODO Auto-generated method stub @@ -119,19 +127,11 @@ public boolean containsSubscriptionType(String type) { @Override public boolean containsRequestType(String type) { - // TODO Auto-generated method stub - return false; + return requestTypes.contains(type); } - + @Override public Object[] request(HashMap request) { - // TODO Auto-generated method stub - return null; - } - - @Override - public Object[] request(HashMap request, String date) { - // TODO Auto-generated method stub - return null; + return AmberDataRequestHandler.request(AmberDataRequestPacket.create(data.get("key"), this, request)); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java index b42e47c4..5396f5de 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java @@ -95,10 +95,4 @@ public Object[] request(HashMap request) { return new Object[] {false, ""}; } - - @Override - public Object[] request(HashMap request, String date) { - // TODO Auto-generated method stub - return null; - } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamConnection.java index c934d673..5f9af17f 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamConnection.java @@ -27,9 +27,7 @@ public ExternalStreamConnection(ExternalStreamManager manager, HashMap request); - public abstract Object[] request(HashMap request, String date); public abstract boolean start(); public abstract boolean stop(); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java index 2be7a9af..ba6ca306 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java @@ -33,7 +33,6 @@ public Response processEXST(Packet packet) { return ResponseFactory.response200(String.format("%s", manager.containsStream(packet.getData("key")))); } - @SuppressWarnings("deprecation") // source: source of the given stream // auth_data[]: all keys associated with data public Response processINIT(Packet packet) { @@ -45,6 +44,8 @@ public Response processINIT(Packet packet) { // validate data String tempHash = manager.getHash(source, packet.getData()); + if(tempHash == null) + return ResponseFactory.response430(packet.getData()); if(manager.containsStream(tempHash)) return ResponseFactory.response220(tempHash); @@ -166,7 +167,11 @@ public Response processRQST(Packet packet) { return ResponseFactory.response428(key, request); Object[] response = manager.request(key, packet.getData()); - + + if(response == null) + return ResponseFactory.response501("Response from Manager.request should never be null. " + + "Improper implementation of ExternalStreamConnection."); + if((Boolean)response[0]) return ResponseFactory.response200(String.format("%s", (String)response[1])); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java index e6766035..ce1b8b8f 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java @@ -216,7 +216,6 @@ protected Object[] addStream(String type, HashMap data) { * @param hash Hash of the stream returned by the {@link ExternalStreamConnection#getHash(String)} function. * @return Boolean determining if the removal was successful. */ - @Deprecated protected boolean removeStream(String hash) { if(!streams.containsKey(hash)) return false; @@ -392,8 +391,8 @@ protected void processSubscription(String hash, String subscription, String data * @param request Request which the data was received by. * @param data Data sent by the given subscription. */ - protected void processRequest(String request, String data) { - Response lsh_response = handler.send("LSH", "PUSH", "request", request, "data", data); + protected void processRequest(String collection, String data) { + Response lsh_response = handler.send("LSH", "PUSH", "collection", collection, "data", data); if(lsh_response.code() != 200) Logger.warn(lsh_response); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java index 11a337e3..5cb1ce31 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java @@ -6,6 +6,7 @@ public abstract class LocalStreamConnection implements UUID { + @SuppressWarnings("unused") private final LocalStreamManager manager; public LocalStreamConnection(LocalStreamManager manager) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index 49f41cab..5d651632 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -1,6 +1,5 @@ package org.stream.local.handler; -import java.util.Arrays; import java.util.Set; import org.framework.router.Packet; import org.framework.router.Response; @@ -67,42 +66,26 @@ public Response processRQST(Packet packet) { if((validate = packet.validate("uuid", "request", "query", "destination")) != null) return ResponseFactory.response500("LocalStreamHandler", validate); - String[] query = packet.getData("query").split(Config.getProperty("app", "general.data.delim")); - - // standard query -// if(data.length == 1) { -// query = data[0].split(Config.getProperty("app", "general.data.delim")); -// } -// -// // date query -// else if(data.length == 2) { -// query = data[1].split(Config.getProperty("app", "general.data.delim")); -// int collection_index = manager.getParameterTranslation(query[0], "collection"); -// if(collection_index == -1) -// return ResponseFactory.response501("Collection index is invalid."); -// -// // insert dated request into collection index -// query[collection_index] = data[0].trim(); -// -// } - String request; String delim = Config.getProperty("app", "general.collection.delim"); if(packet.containsKey("date")) { - request = "uuid" + delim + packet.getData("request") + delim + packet.getData("date"); + request = packet.getData("uuid") + delim + packet.getData("request") + delim + packet.getData("date"); } else { - request = "uuid" + delim + packet.getData("request"); + request = packet.getData("uuid") + delim + packet.getData("request"); } - if(query == null) - return ResponseFactory.response501("Query array was null when attempting to process."); + //String[] query = packet.getData("query").split(Config.getProperty("app", "general.data.delim")); + String[] query = new String[] {"get_all", request}; + +// if(query == null) +// return ResponseFactory.response501("Query array was null when attempting to process."); if(request == null || request.isEmpty()) return ResponseFactory.response501("Request was null when attempting to process."); if(!manager.validate(query)) return ResponseFactory.response445(manager.streamType(), packet.getData("query")); - System.out.println(Arrays.toString(query)); + if(!manager.scan(query)) return ResponseFactory.response446(manager.streamType(), packet.getData("query")); @@ -144,22 +127,6 @@ public Response processSTAT(Packet packet) { return ResponseFactory.response200(state.toString()); } -// public Response processMODI(Packet packet) { -// return ResponseFactory.response501(); -// if(packet.getData().isEmpty()) -// return ResponseFactory.response500("LocalStreamHandler", "query"); -// -// if(!manager.isReady()) -// return ResponseFactory.response441(manager.streamType()); -// -// if(!manager.validate(packet.getData())) -// return ResponseFactory.response445(manager.streamType(), packet.getData()); -// -// if(!manager.scan(packet.getData())) -// return ResponseFactory.response446(manager.streamType(), packet.getData()); -// -// } - public Response processPUSH(Packet packet) { // data format: data, location... String validate; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java index 7e5d83e2..e76ba263 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/registry/StreamRegistryController.java @@ -136,6 +136,9 @@ else if(dated) { return lsh_response; } + // send end response + return send("SRC", "EDAT", "data", "<<>>", "destination", packet.getData("destination")); + } catch(Exception e) { return ResponseFactory.response503(Config.getProperty("app", "general.data.dateformat"), packet.getData("start_date"), packet.getData("end_date")); } @@ -145,7 +148,5 @@ else if(dated) { else { return ResponseFactory.response501("Boolean set to non binary value in cache."); } - - return ResponseFactory.response200(); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/connected/connections/TestAmberDataConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/connected/connections/TestAmberDataConnection.java index cb6b21fe..b0c41ae4 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/connected/connections/TestAmberDataConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/stream/external/connected/connections/TestAmberDataConnection.java @@ -4,6 +4,7 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Scanner; import org.apache.log4j.Level; @@ -12,6 +13,7 @@ import org.framework.router.Response; import org.junit.BeforeClass; import org.junit.Test; +import org.stream.external.connected.amberdata.AmberDataRequestHandler; public class TestAmberDataConnection { @@ -31,7 +33,7 @@ public static void init() throws FileNotFoundException { s.close(); } - @Test + //@Test public void TestAuthorization() { Core core = new Core(); @@ -42,7 +44,7 @@ public void TestAuthorization() { assertEquals(422, core.send("SRC", "INIT", "amber_data, does_not_exist").code()); } - @Test + //@Test public void TestRequest() { Core core = new Core(); @@ -59,4 +61,26 @@ public void TestRequest() { Response invalid = core.send("SRC", "RQST", String.format("%s, %s", key, "does_not_exist")); assertEquals(428, invalid.code()); } + + @Test + public void TestAaveProtocolDated() throws IOException { +// HashMap params = new HashMap(); +// params.put("start_date", "2022-09-01"); +// params.put("end_date", "2022-09-05"); +// System.out.println(AmberDataRequestHandler.requestAaveProtocolDated("2022-09-01", "2022-09-05", "UAK7ed69235426c360be22bfc2bde1809b6")[1]); + //System.out.println(AmberDataRequestHandler.requestLendingLatest(apikey, "")[1]); + //System.out.println(AmberDataRequestHandler.request("aave-protocol-dated", apikey, params)[1]); + +// OkHttpClient client = new OkHttpClient(); +// +// Request request = new Request.Builder() +// .url("https://web3api.io/api/v2/defi/lending/aavev2/protocol?startDate=2022-09-01T01:00:00&endDate=2022-09-05T01:00:00") +// .get() +// .addHeader("accept", "application/json") +// .addHeader("x-api-key", "UAK7ed69235426c360be22bfc2bde1809b6") +// .build(); +// +// okhttp3.Response response = client.newCall(request).execute(); +// System.out.println(response.body().string().substring(0, 1000)); + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/resources/test/stream/external/connected/connections/test_amberdata_config.txt b/DeFi-Data-Engine/DeFi Data Engine/src/test/resources/test/stream/external/connected/connections/test_amberdata_config.txt index 8fd4b506..dd0c70a9 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/resources/test/stream/external/connected/connections/test_amberdata_config.txt +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/resources/test/stream/external/connected/connections/test_amberdata_config.txt @@ -1 +1 @@ -UAKf8aa75af12e064b9b28f51c9a83d02ed \ No newline at end of file +UAK7ed69235426c360be22bfc2bde1809b6 \ No newline at end of file diff --git a/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java index a8790e99..7bb68a6b 100644 --- a/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java @@ -36,7 +36,8 @@ public class Config { app_properties.put("general.transfer.delim", "&&&"); app_properties.put("general.data.dateformat", "yyyy-MM-dd"); app_properties.put("spring.server.port", "8080"); - app_properties.put("spring.server.address", "localhost"); + //app_properties.put("spring.server.address", "localhost"); + app_properties.put("spring.server.address", "defi-de.idea.rpi.edu"); app_properties.put("rest.socket.address", "localhost"); app_properties.put("rest.socket.port", "61100"); app_properties.put("rest.socket.key", "rest-key-reserved"); diff --git a/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java b/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java index f3171e1e..ec545745 100644 --- a/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java +++ b/DeFi-Data-Engine/Rest Application/src/main/java/org/rest/application/Endpoint.java @@ -47,7 +47,20 @@ public String getStreamExists(@RequestParam String key) { @GetMapping(path="/defi/v1/rest/initialize") public String getInitialize(@RequestParam String source, @RequestParam String auth_data) { - return request("SRC", "INIT", "source", source, "auth_data", auth_data); + String[] auth = auth_data.split(","); + if(auth.length % 2 != 0) + return new JSONObject() + .put("response", "500") + .put("message", "Malformed parameters. must be a list of " + + " pairs. Cannot process given response.") + .toString(); + + String[] params = new String[2 + auth.length]; + params[0] = "source"; + params[1] = source; + for(int i = 2; i < params.length; i++) + params[i] = auth[i - 2]; + return request("SRC", "INIT", params); } @GetMapping(path="/defi/v1/rest/is_authorized") diff --git a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java new file mode 100644 index 00000000..323cafcf --- /dev/null +++ b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java @@ -0,0 +1,94 @@ +package test.connection.socket; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.Socket; +import java.net.URL; +import java.net.UnknownHostException; + +import javax.net.SocketFactory; + +import org.json.JSONObject; + +public class LocalTest { + + public static void main(String[] args) throws UnknownHostException, IOException { + final Socket socket = SocketFactory.getDefault().createSocket("localhost", 61200); + final DataInputStream in = new DataInputStream(socket.getInputStream()); + + String destination = readLine(in); + + Thread thread = new Thread() { + public void run() { + try { + while(true) { + System.out.println(readLine(in)); + } + } catch(Exception e) { + e.printStackTrace(); + System.exit(1); + } + } + }; + thread.start(); + + String init = request("http://localhost:8080/defi/v1/rest/initialize?" + + "source=amber_data&" + + "auth_data=key,UAK7ed69235426c360be22bfc2bde1809b6"); + System.out.println(init); + JSONObject json_init = new JSONObject(init); + String key = json_init.getString("data"); + + long s = System.nanoTime(); + String rqst = request(String.format("http://localhost:8080/defi/v1/rest/request_dated?" + + "destination=%s&" + + "key=%s&" + + "request=aave-protocol-dated&" + + "query=get_all,aave_protocol-dated&" + + "start_date=%s&" + + "end_date=%s", + destination, + key, + "2021-09-01", + "2022-09-01")); + long e = System.nanoTime(); + System.out.println(rqst); + System.out.println(e - s); + } + + public static String request(String str) throws IOException { + URL obj = new URL(str); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("GET"); + int responseCode = con.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { // success + BufferedReader in = new BufferedReader(new InputStreamReader( + con.getInputStream())); + String inputLine; + StringBuffer response = new StringBuffer(); + + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + } + in.close(); + + return response.toString(); + } else { + System.err.println(String.format("Request Failure code <%d> url <%s>\nmsg <%s>", responseCode, obj.toString(), con.toString())); + System.exit(1); + } + return ""; + } + + public static final String readLine(DataInputStream in) throws IOException { + StringBuilder out = new StringBuilder(); + char c = 0; + while((c = (char)in.read()) != 10) + out.append(c); + + return out.toString(); + } +} \ No newline at end of file diff --git a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/SocketConnectionTest.java b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/ServerTest.java similarity index 75% rename from DeFi-Data-Engine/Testing Environment/src/test/connection/socket/SocketConnectionTest.java rename to DeFi-Data-Engine/Testing Environment/src/test/connection/socket/ServerTest.java index 29df1025..fa162d29 100644 --- a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/SocketConnectionTest.java +++ b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/ServerTest.java @@ -11,9 +11,7 @@ import javax.net.SocketFactory; -import org.json.JSONObject; - -public class SocketConnectionTest { +public class ServerTest { public static void main(String[] args) throws UnknownHostException, IOException { final Socket socket = SocketFactory.getDefault().createSocket("defi-de.idea.rpi.edu", 61200); @@ -35,19 +33,9 @@ public void run() { } }; thread.start(); - - String rqst = request("http://128.113.28.46:8080/defi/v1/rest/initialize?source=polygon&auth_data=vtTMMRtEywJ_owkHdqoRUmv9vf2hWkrV"); - JSONObject init = new JSONObject(rqst); - System.out.println(rqst); - String hash = init.getString("data"); - System.out.println("RESPONSE: " + request( - String.format("http://128.113.28.46:8080/defi/v1/rest/request_dated?destination=%s" - + "&key=%s" - + "&request=bar-AAPL-15m" - + "&query=get_all,bar-AAPL-15m" - + "&start_date=2022-09-20" - + "&end_date=2022-09-23", key, hash))); + String sample = request("http://defi-de.idea.rpi.edu:8080/defi/v1/rest/source_exists?source=amber_data"); + System.out.println(sample); } public static String request(String str) throws IOException {