diff --git a/DeFi-Data-Engine/DeFi Data Engine/pom.xml b/DeFi-Data-Engine/DeFi Data Engine/pom.xml
index 988695ce..972a27ff 100644
--- a/DeFi-Data-Engine/DeFi Data Engine/pom.xml
+++ b/DeFi-Data-Engine/DeFi Data Engine/pom.xml
@@ -46,6 +46,14 @@
junit
4.13.2
+
+
+
+ com.datastax.oss
+ java-driver-core
+ 4.9.0
+
+
diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java
index 0d8c1102..6c70167d 100644
--- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java
+++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java
@@ -26,13 +26,11 @@ protected boolean init() {
listener = new Thread() {
public void run() {
while(true) {
- System.out.println("accepting...");
String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")), producer);
if(key == null) {
System.err.println("SocketProducer: Could not create connection to socket port.");
System.exit(1);
}
- System.out.println("accepted!");
manager.add(new SocketDestination(key, SocketManager.write(key)));
}
}
diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java
index ff5f48f6..062c7e84 100644
--- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java
+++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java
@@ -205,6 +205,8 @@ private static final String readLine(DataInputStream in) throws IOException {
StringBuilder out = new StringBuilder();
char c = 0;
while((c = (char)in.read()) != 10) {
+ if(out.length() > 200_000)
+ break;
out.append(c);
}
return out.toString();
diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java
index 2c273ca8..cbf67676 100644
--- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java
+++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java
@@ -16,8 +16,8 @@ public class Config {
app_properties.put("general.collection.delim", "=");
app_properties.put("general.transfer.delim", "&&&");
app_properties.put("general.data.dateformat", "yyyy-MM-dd");
- app_properties.put("general.logging.packets", "true");
- app_properties.put("general.logging.responses", "true");
+ app_properties.put("general.logging.packets", "false");
+ app_properties.put("general.logging.responses", "false");
properties.put("app", app_properties);
Properties stream_properties = new Properties();
diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java
index e9cc21d3..3f66083d 100644
--- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java
+++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java
@@ -88,9 +88,16 @@ protected Object[] request(String type, HashMap data, String sta
}
public void processRequest(String collection, HashMap data) {
- if(data == null || data.isEmpty())
+ // invalid hashmap
+ if(data == null)
return;
+ // if empty then submit blank push
+ if(data.isEmpty()) {
+ handler.send("LSH", "PUSH", "collection", collection, "data", "<<>>");
+ return;
+ }
+
StringBuilder sb = new StringBuilder();
for(String key : data.keySet())
sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ",");
diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java
index 618d562f..91ad7edc 100644
--- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java
+++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java
@@ -118,6 +118,11 @@ else if(obj.has(path[i])) {
return "Data array retrieval had fatal error, killing process.";
}
+ // if data is empty push empty data point
+ if(data.length() == 0) {
+ manager.processRequest(getCollection(), new HashMap());
+ }
+
// extract and print data
for(int i = 0; i < data.length(); i++) {
HashMap point = parse(data.getJSONObject(i));
diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java
index 09d2286c..eca8ccf9 100644
--- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java
+++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java
@@ -2,14 +2,13 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import org.bson.Document;
-import org.core.logger.Logger;
+import org.bson.conversions.Bson;
import org.properties.Config;
import com.mongodb.client.MongoCollection;
@@ -153,7 +152,6 @@ else if(translations.get(query[0]).containsKey("collection"))
}
public final static boolean containsCollection(MongoDatabase db, String[] query) {
- Logger.log(Arrays.toString(query));
if(!query[0].equals("contains_collection"))
return false;
@@ -243,19 +241,29 @@ public final static Set getItem(MongoDatabase db, String... query) {
}
public final static boolean push(MongoDatabase db, String data, String collection_name) {
+ // retrieve created collection
MongoCollection collection = db.getCollection(collection_name);
String[] split_data = data.split(Config.getProperty("app", "general.data.delim"));
// validate that every type has an id associated.
- if(split_data.length % 2 != 0)
+ if(split_data.length % 2 != 0 && !data.equals("<<>>"))
return false;
Document document = new Document();
- for(int i = 0; i < split_data.length; i+=2)
- document.append(split_data[i], split_data[i + 1]);
- document.append("_timestamp", System.nanoTime());
- collection.insertOne(document);
+ // validate not empty
+ if(!data.equals("<<>>")) {
+ for(int i = 0; i < split_data.length; i+=2)
+ document.append(split_data[i], split_data[i + 1]);
+ document.append("_timestamp", System.nanoTime());
+ collection.insertOne(document);
+ } else {
+ // create temporary object then remove
+ document.append("_empty", "holder");
+ collection.insertOne(document);
+ Bson holder = Filters.eq("_empty", "holder");
+ collection.deleteOne(holder);
+ }
return true;
}
}
diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties b/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties
new file mode 100644
index 00000000..59f53b4c
--- /dev/null
+++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties
@@ -0,0 +1,28 @@
+request.name= amberdata-sushiswap-protocol
+
+url.base= https://web3api.io/api/v2/defi/dex/sushiswap/protocol
+
+url.properties= size,900
+
+url.headers= accept,application/json,\
+ x-api-key,.
+
+data.path= payload,\
+ data
+
+recursion.type= rest
+
+recursion.tags= -l,900,\
+ -t,url
+
+recursion.location= payload,metadata,next
+
+date.valid= true
+
+date.location= properties
+
+date.start= startDate
+
+date.end= endDate
+
+date.format= yyyy-MM-dd
\ No newline at end of file