diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java index 4bd7b97e..366a0ad8 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java @@ -49,32 +49,45 @@ public void run() { while(true) { String[] input = ((String)in.readUTF()).split(Config.getProperty("app", "general.transfer.delim")); - // validate length is greater than 2 - if(input.length <= 2) { - out.writeUTF(new JSONObject() - .put("response", "502") - .put("message", "Packet processed from REST API does not contain a TAG or SUB_TAG. Review REST API endpoint code.") - .toString()); - } + Thread thread = new Thread() { + public void run() { + try { + // validate length is greater than 2 + if(input.length <= 2) { + out.writeUTF(new JSONObject() + .put("response", "502") + .put("message", "Packet processed from REST API does not contain a TAG or SUB_TAG. Review REST API endpoint code.") + .toString()); + } + + // extract non-essential data + String[] data = Arrays.copyOfRange(input, 2, input.length); + String tag = input[0]; + String sub_tag = input[1]; + + // execute valid response to engine + Response response = send(tag, sub_tag, data); + out.writeUTF(new JSONObject() + .put("response", "200") + .put("code", response.code()) + .put("message", response.message()) + .put("data", response.data()) + .toString()); + out.flush(); + + } catch(SocketException e) { + System.err.println("Rest Application has unexpectedly closed."); + System.exit(1); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } + } + }; - // extract non-essential data - String[] data = Arrays.copyOfRange(input, 2, input.length); - String tag = input[0]; - String sub_tag = input[1]; - - // execute valid response to engine - Response response = send(tag, sub_tag, data); - out.writeUTF(new JSONObject() - .put("response", "200") - .put("code", response.code()) - .put("message", response.message()) - .put("data", response.data()) - .toString()); + thread.start(); } - } catch(SocketException e) { - System.err.println("Rest Application has unexpectedly closed."); - System.exit(1); - } catch (IOException e) { + } catch(Exception e) { e.printStackTrace(); System.exit(1); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index c0587a74..5b90b025 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -44,8 +44,8 @@ public class Config { stream_properties.put("rest.socket.address", "localhost"); stream_properties.put("rest.socket.port", "61100"); stream_properties.put("rest.socket.key", "rest-key-reserved"); - //stream_properties.put("output.socket.address", "defi-de.idea.rpi.edu"); - stream_properties.put("output.socket.address", "localhost"); + stream_properties.put("output.socket.address", "defi-de.idea.rpi.edu"); + //stream_properties.put("output.socket.address", "localhost"); stream_properties.put("output.socket.port", "61200"); stream_properties.put("local.stream.type", "mongo_db"); stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017"); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestHandler.java index 0fdda531..b9ec167e 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/amberdata/AmberDataRequestHandler.java @@ -20,6 +20,7 @@ public class AmberDataRequestHandler { private volatile static OkHttpClient client; private final static HashMap requests; + private static final int request_size = 999; private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.getProperty("app", "general.data.dateformat")); @@ -91,55 +92,7 @@ public static Object[] requestAaveProtocolDated(AmberDataRequestPacket packet) { packet.getData("date") + "T01:00:00", tmr + "T01:00:00"); - return requestAaveProtocolDated(packet, url); - } - - private static final int request_size = 999; - private static Object[] requestAaveProtocolDated(AmberDataRequestPacket packet, String url) { - OkHttpClient client = new OkHttpClient(); - - Request request = new Request.Builder() - .url(url + "&size=" + request_size) - .get() - .addHeader("accept", "application/json") - .addHeader("x-api-key", packet.getKey()) - .build(); - - okhttp3.Response response; - try { - response = client.newCall(request).execute(); - JSONObject json = new JSONObject(response.body().string()); - - if(json.toString().equals("") || !json.has("description")) - return new Object[] {false, "JSON Object returned empty or invalid contents."}; - - if(!response.isSuccessful() || json.getInt("status") != 200 - || !json.getString("description").equals("Successful request")) - return new Object[]{false, json.getString("description")}; - - if(!json.has("payload") || !json.getJSONObject("payload").has("data")) - return new Object[] {false, "Malformed Aave packet"}; - - JSONObject payload = json.getJSONObject("payload"); - JSONArray arr = payload.getJSONArray("data"); - for(int i = 0; i < arr.length(); i++) { - processAaveJsonRequest(packet, arr.getJSONObject(i)); - } - - if(arr.length() >= request_size) { - JSONObject metadata = payload.getJSONObject("metadata"); - if(metadata.has("next")) - return requestAaveProtocolDated(packet, metadata.getString("next")); - else - return new Object[] {false, "Response did not contain next API cursor"}; - } else { - return new Object[] {true, ""}; - } - } catch (IOException e) { - e.printStackTrace(); - ResponseFactory.responseNotHandled("Unhandled exception cost."); - return new Object[] {false, null}; - } + return requestAaveURL(packet, url); } public static Object[] requestAaveAssetDated(AmberDataRequestPacket packet) { @@ -155,10 +108,18 @@ public static Object[] requestAaveAssetDated(AmberDataRequestPacket packet) { packet.getData("date") + "T01:00:00", tmr + "T01:00:00"); + return requestAaveURL(packet, url, packet.getData("asset")); + } + + private static Object[] requestAaveURL(AmberDataRequestPacket packet, String url) { + return requestAaveURL(packet, url, null); + } + + private static Object[] requestAaveURL(AmberDataRequestPacket packet, String url, String request_specifier) { OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() - .url(url) + .url(url + "&size=" + request_size) .get() .addHeader("accept", "application/json") .addHeader("x-api-key", packet.getKey()) @@ -181,10 +142,23 @@ public static Object[] requestAaveAssetDated(AmberDataRequestPacket packet) { JSONObject payload = json.getJSONObject("payload"); JSONArray arr = payload.getJSONArray("data"); - for(int i = 0; i < arr.length(); i++) - processAaveJsonRequest(packet, arr.getJSONObject(i), packet.getData("asset")); + for(int i = 0; i < arr.length(); i++) { + if(request_specifier == null) { + processAaveJsonRequest(packet, arr.getJSONObject(i)); + } else { + processAaveJsonRequest(packet, arr.getJSONObject(i), request_specifier); + } + } - return new Object[] {true, ""}; + if(arr.length() >= request_size) { + JSONObject metadata = payload.getJSONObject("metadata"); + if(metadata.has("next")) + return requestAaveURL(packet, metadata.getString("next")); + else + return new Object[] {false, "Response did not contain next API cursor"}; + } else { + return new Object[] {true, ""}; + } } catch (IOException e) { e.printStackTrace(); ResponseFactory.responseNotHandled("Unhandled exception cost."); diff --git a/DeFi-Data-Engine/Executables/DeFi-Data-Engine.jar b/DeFi-Data-Engine/Executables/DeFi-Data-Engine.jar new file mode 100644 index 00000000..53654496 Binary files /dev/null and b/DeFi-Data-Engine/Executables/DeFi-Data-Engine.jar differ diff --git a/DeFi-Data-Engine/Executables/rest-connection-4.3.3.jar b/DeFi-Data-Engine/Executables/rest-connection-4.3.3.jar new file mode 100644 index 00000000..7c15f46f Binary files /dev/null and b/DeFi-Data-Engine/Executables/rest-connection-4.3.3.jar differ diff --git a/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java index 3cfee27f..7bb68a6b 100644 --- a/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/Rest Application/src/main/java/org/properties/Config.java @@ -36,8 +36,8 @@ public class Config { app_properties.put("general.transfer.delim", "&&&"); app_properties.put("general.data.dateformat", "yyyy-MM-dd"); app_properties.put("spring.server.port", "8080"); - app_properties.put("spring.server.address", "localhost"); - //app_properties.put("spring.server.address", "defi-de.idea.rpi.edu"); + //app_properties.put("spring.server.address", "localhost"); + app_properties.put("spring.server.address", "defi-de.idea.rpi.edu"); app_properties.put("rest.socket.address", "localhost"); app_properties.put("rest.socket.port", "61100"); app_properties.put("rest.socket.key", "rest-key-reserved"); diff --git a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java index 029c5180..1b3a2b35 100644 --- a/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java +++ b/DeFi-Data-Engine/Testing Environment/src/test/connection/socket/LocalTest.java @@ -15,7 +15,7 @@ public class LocalTest { - private static final String host = "localhost"; + private static final String host = "defi-de.idea.rpi.edu"; public static void main(String[] args) throws UnknownHostException, IOException { final Socket socket = SocketFactory.getDefault().createSocket(host, 61200); @@ -45,7 +45,7 @@ public void run() { JSONObject json_init = new JSONObject(init); String key = json_init.getString("data"); - String rqst = request(String.format("http://%s:8080/defi/v1/rest/request_dated?" + String rqst_protocol_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?" + "destination=%s&" + "key=%s&" + "request=request,aave-protocol-dated&" @@ -56,19 +56,22 @@ public void run() { destination, key, "2022-08-01", - "2022-08-02")); -// String rqst = request(String.format("http://%s:8080/defi/v1/rest/request_dated?" -// + "destination=%s&" -// + "key=%s&" -// + "request=aave-protocol-dated&" -// + "query=aave-protocol-dated&" -// + "start_date=%s&" -// + "end_date=%s", -// host, -// destination, -// key, -// "2022-08-01", -// "2022-08-02")); + "2022-08-02"); + + String rqst_asset_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?" + + "destination=%s&" + + "key=%s&" + + "request=request,aave-protocol-dated&" + + "query=aave-protocol-dated&" + + "start_date=%s&" + + "end_date=%s", + host, + destination, + key, + "2022-08-01", + "2022-08-02"); + + String rqst = request(rqst_asset_dated); System.out.println(rqst); }