From d3a58c644795d88de5f3f6d10396df95596ab115 Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Mon, 27 Mar 2023 23:39:30 -0400 Subject: [PATCH] Update for empty collection edge case and stability check --- DeFi-Data-Engine/DeFi Data Engine/pom.xml | 8 ++++++ .../org/out/producers/SocketProducer.java | 2 -- .../java/org/out/socket/SocketManager.java | 2 ++ .../src/main/java/org/properties/Config.java | 4 +-- .../handler/ExternalStreamManager.java | 9 +++++- .../requests/ExternalRequestREST.java | 5 ++++ .../mongodb/MongoDatabaseRequestHandler.java | 24 ++++++++++------ .../amberdata-sushiswap-protocol.properties | 28 +++++++++++++++++++ 8 files changed, 69 insertions(+), 13 deletions(-) create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties diff --git a/DeFi-Data-Engine/DeFi Data Engine/pom.xml b/DeFi-Data-Engine/DeFi Data Engine/pom.xml index 988695ce..972a27ff 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/pom.xml +++ b/DeFi-Data-Engine/DeFi Data Engine/pom.xml @@ -46,6 +46,14 @@ junit 4.13.2 + + + + com.datastax.oss + java-driver-core + 4.9.0 + + diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java index 0d8c1102..6c70167d 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java @@ -26,13 +26,11 @@ protected boolean init() { listener = new Thread() { public void run() { while(true) { - System.out.println("accepting..."); String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")), producer); if(key == null) { System.err.println("SocketProducer: Could not create connection to socket port."); System.exit(1); } - System.out.println("accepted!"); manager.add(new SocketDestination(key, SocketManager.write(key))); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index ff5f48f6..062c7e84 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -205,6 +205,8 @@ private static final String readLine(DataInputStream in) throws IOException { StringBuilder out = new StringBuilder(); char c = 0; while((c = (char)in.read()) != 10) { + if(out.length() > 200_000) + break; out.append(c); } return out.toString(); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index 2c273ca8..cbf67676 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -16,8 +16,8 @@ public class Config { app_properties.put("general.collection.delim", "="); app_properties.put("general.transfer.delim", "&&&"); app_properties.put("general.data.dateformat", "yyyy-MM-dd"); - app_properties.put("general.logging.packets", "true"); - app_properties.put("general.logging.responses", "true"); + app_properties.put("general.logging.packets", "false"); + app_properties.put("general.logging.responses", "false"); properties.put("app", app_properties); Properties stream_properties = new Properties(); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java index e9cc21d3..3f66083d 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java @@ -88,9 +88,16 @@ protected Object[] request(String type, HashMap data, String sta } public void processRequest(String collection, HashMap data) { - if(data == null || data.isEmpty()) + // invalid hashmap + if(data == null) return; + // if empty then submit blank push + if(data.isEmpty()) { + handler.send("LSH", "PUSH", "collection", collection, "data", "<<>>"); + return; + } + StringBuilder sb = new StringBuilder(); for(String key : data.keySet()) sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ","); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java index 618d562f..91ad7edc 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java @@ -118,6 +118,11 @@ else if(obj.has(path[i])) { return "Data array retrieval had fatal error, killing process."; } + // if data is empty push empty data point + if(data.length() == 0) { + manager.processRequest(getCollection(), new HashMap()); + } + // extract and print data for(int i = 0; i < data.length(); i++) { HashMap point = parse(data.getJSONObject(i)); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java index 09d2286c..eca8ccf9 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java @@ -2,14 +2,13 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; import java.util.Set; import org.bson.Document; -import org.core.logger.Logger; +import org.bson.conversions.Bson; import org.properties.Config; import com.mongodb.client.MongoCollection; @@ -153,7 +152,6 @@ else if(translations.get(query[0]).containsKey("collection")) } public final static boolean containsCollection(MongoDatabase db, String[] query) { - Logger.log(Arrays.toString(query)); if(!query[0].equals("contains_collection")) return false; @@ -243,19 +241,29 @@ public final static Set getItem(MongoDatabase db, String... query) { } public final static boolean push(MongoDatabase db, String data, String collection_name) { + // retrieve created collection MongoCollection collection = db.getCollection(collection_name); String[] split_data = data.split(Config.getProperty("app", "general.data.delim")); // validate that every type has an id associated. - if(split_data.length % 2 != 0) + if(split_data.length % 2 != 0 && !data.equals("<<>>")) return false; Document document = new Document(); - for(int i = 0; i < split_data.length; i+=2) - document.append(split_data[i], split_data[i + 1]); - document.append("_timestamp", System.nanoTime()); - collection.insertOne(document); + // validate not empty + if(!data.equals("<<>>")) { + for(int i = 0; i < split_data.length; i+=2) + document.append(split_data[i], split_data[i + 1]); + document.append("_timestamp", System.nanoTime()); + collection.insertOne(document); + } else { + // create temporary object then remove + document.append("_empty", "holder"); + collection.insertOne(document); + Bson holder = Filters.eq("_empty", "holder"); + collection.deleteOne(holder); + } return true; } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties b/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties new file mode 100644 index 00000000..59f53b4c --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties @@ -0,0 +1,28 @@ +request.name= amberdata-sushiswap-protocol + +url.base= https://web3api.io/api/v2/defi/dex/sushiswap/protocol + +url.properties= size,900 + +url.headers= accept,application/json,\ + x-api-key,. + +data.path= payload,\ + data + +recursion.type= rest + +recursion.tags= -l,900,\ + -t,url + +recursion.location= payload,metadata,next + +date.valid= true + +date.location= properties + +date.start= startDate + +date.end= endDate + +date.format= yyyy-MM-dd \ No newline at end of file