Skip to content

Commit

Permalink
Update for empty collection edge case and stability check
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Mar 28, 2023
1 parent 43e90aa commit e91e4e7
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 13 deletions.
8 changes: 8 additions & 0 deletions DeFi-Data-Engine/DeFi Data Engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.datastax.oss/java-driver-core -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.9.0</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ protected boolean init() {
listener = new Thread() {
public void run() {
while(true) {
System.out.println("accepting...");
String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")), producer);
if(key == null) {
System.err.println("SocketProducer: Could not create connection to socket port.");
System.exit(1);
}
System.out.println("accepted!");
manager.add(new SocketDestination(key, SocketManager.write(key)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ private static final String readLine(DataInputStream in) throws IOException {
StringBuilder out = new StringBuilder();
char c = 0;
while((c = (char)in.read()) != 10) {
if(out.length() > 200_000)
break;
out.append(c);
}
return out.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public class Config {
app_properties.put("general.collection.delim", "=");
app_properties.put("general.transfer.delim", "&&&");
app_properties.put("general.data.dateformat", "yyyy-MM-dd");
app_properties.put("general.logging.packets", "true");
app_properties.put("general.logging.responses", "true");
app_properties.put("general.logging.packets", "false");
app_properties.put("general.logging.responses", "false");
properties.put("app", app_properties);

Properties stream_properties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,16 @@ protected Object[] request(String type, HashMap<String, String> data, String sta
}

public void processRequest(String collection, HashMap<String, String> data) {
if(data == null || data.isEmpty())
// invalid hashmap
if(data == null)
return;

// if empty then submit blank push
if(data.isEmpty()) {
handler.send("LSH", "PUSH", "collection", collection, "data", "<<<empty>>>");
return;
}

StringBuilder sb = new StringBuilder();
for(String key : data.keySet())
sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ else if(obj.has(path[i])) {
return "Data array retrieval had fatal error, killing process.";
}

// if data is empty push empty data point
if(data.length() == 0) {
manager.processRequest(getCollection(), new HashMap<String, String>());
}

// extract and print data
for(int i = 0; i < data.length(); i++) {
HashMap<String, String> point = parse(data.getJSONObject(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;

import org.bson.Document;
import org.core.logger.Logger;
import org.bson.conversions.Bson;
import org.properties.Config;

import com.mongodb.client.MongoCollection;
Expand Down Expand Up @@ -153,7 +152,6 @@ else if(translations.get(query[0]).containsKey("collection"))
}

public final static boolean containsCollection(MongoDatabase db, String[] query) {
Logger.log(Arrays.toString(query));
if(!query[0].equals("contains_collection"))
return false;

Expand Down Expand Up @@ -243,19 +241,29 @@ public final static Set<String> getItem(MongoDatabase db, String... query) {
}

public final static boolean push(MongoDatabase db, String data, String collection_name) {
// retrieve created collection
MongoCollection<Document> collection = db.getCollection(collection_name);

String[] split_data = data.split(Config.getProperty("app", "general.data.delim"));
// validate that every type has an id associated.
if(split_data.length % 2 != 0)
if(split_data.length % 2 != 0 && !data.equals("<<<empty>>>"))
return false;

Document document = new Document();
for(int i = 0; i < split_data.length; i+=2)
document.append(split_data[i], split_data[i + 1]);

document.append("_timestamp", System.nanoTime());
collection.insertOne(document);
// validate not empty
if(!data.equals("<<<empty>>>")) {
for(int i = 0; i < split_data.length; i+=2)
document.append(split_data[i], split_data[i + 1]);
document.append("_timestamp", System.nanoTime());
collection.insertOne(document);
} else {
// create temporary object then remove
document.append("_empty", "holder");
collection.insertOne(document);
Bson holder = Filters.eq("_empty", "holder");
collection.deleteOne(holder);
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
request.name= amberdata-sushiswap-protocol

url.base= https://web3api.io/api/v2/defi/dex/sushiswap/protocol

url.properties= size,900

url.headers= accept,application/json,\
x-api-key,.

data.path= payload,\
data

recursion.type= rest

recursion.tags= -l,900,\
-t,url

recursion.location= payload,metadata,next

date.valid= true

date.location= properties

date.start= startDate

date.end= endDate

date.format= yyyy-MM-dd

0 comments on commit e91e4e7

Please sign in to comment.