diff --git a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx index 86dc1b83..fed61c89 100644 Binary files a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx and b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx differ diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/app.properties b/DeFi-Data-Engine/DeFi Data Engine/config/app.properties index 88474e56..1b081086 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/config/app.properties +++ b/DeFi-Data-Engine/DeFi Data Engine/config/app.properties @@ -3,6 +3,9 @@ # delimiter used for internal processing general.internal.delim=,., +# data delimiter +general.data.delim=, + # enable all packet logging general.logging.packets=true diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/output.properties b/DeFi-Data-Engine/DeFi Data Engine/config/output.properties deleted file mode 100644 index 8c2b5ca9..00000000 --- a/DeFi-Data-Engine/DeFi Data Engine/config/output.properties +++ /dev/null @@ -1,28 +0,0 @@ -# === GENERAL PROPERTIES === - -consumer.types=null -producer.types=null - -# === REST SOCKET PROPERTIES === - -# Rest socket address -rest.socket.address=localhost - -# Rest socket port -rest.socket.port=61100 - -# Rest socket key -rest.socket.key=rest-key-reserved - -# === OUTPUT SOCKET PROPERTIES === - -# Output socket address -output.socket.address=localhost - -# Output socket port -output.socket.port=61200 - -# === LOCAL STREAM PROPERTIES === - -# local stream type used for database configuration -local.stream.type=mongo_db \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/stream.properties b/DeFi-Data-Engine/DeFi Data Engine/config/stream.properties new file mode 100644 index 00000000..ec810c17 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/config/stream.properties @@ -0,0 +1,48 @@ +# === GENERAL PROPERTIES === + +# consumer types for accepting input +general.consumer.types=null + +# producer types for writing output +general.producer.types=null + +# === REST SOCKET PROPERTIES === + +# Rest socket address +rest.socket.address=localhost + +# Rest socket port +rest.socket.port=61100 + +# Rest socket key +rest.socket.key=rest-key-reserved + +# === OUTPUT SOCKET PROPERTIES === + +# Output socket address +output.socket.address=localhost + +# Output socket port +output.socket.port=61200 + +# === LOCAL STREAM PROPERTIES === + +# local stream type used for database configuration +local.stream.type=mongo_db + +# === MONGO DB PROPERTIES === + +# local MongoDB client URI +mongodb.properties.uri=mongodb://localhost:27017 + +# local MongoDB state database name +mongodb.database.state=main-state-db + +# local MongoDB main database name +mongodb.database.main=main-db + +# authorization collection +mongodb.auth.collection=auth-collection + +# query delim +mongodb.query.delim=, \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/testing.properties b/DeFi-Data-Engine/DeFi Data Engine/config/testing.properties new file mode 100644 index 00000000..ff2bbc03 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/config/testing.properties @@ -0,0 +1,3 @@ +# === TESTING PROPERTIES === +lsh.authorized=true +lsh.ready=true \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/pom.xml b/DeFi-Data-Engine/DeFi Data Engine/pom.xml index 3df3471f..a1b5e418 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/pom.xml +++ b/DeFi-Data-Engine/DeFi Data Engine/pom.xml @@ -29,7 +29,7 @@ org.mongodb mongo-java-driver - 2.12.3 + 3.12.11 com.squareup.okhttp3 diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java index 33c006a8..34196b7b 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java @@ -19,9 +19,12 @@ public Response processSTRT(Packet packet) { return out_response; // start local stream handler processes: - Response lsh_response = send("LSH", "INIT", Config.getProperty("output", "local.stream.type")); - if(lsh_response.code() != 200) - return lsh_response; + String lsh_type = Config.getProperty("stream", "local.stream.type"); + if(!lsh_type.equals("null")) { + Response lsh_response = send("LSH", "INIT", lsh_type); + if(lsh_response.code() != 200) + return lsh_response; + } return ResponseFactory.response200(); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java index 528488b9..e9e326a0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java @@ -1,6 +1,7 @@ package org.core.logger; -import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import org.framework.router.Packet; import org.framework.router.Response; @@ -26,20 +27,26 @@ public static final void terminate(Response response) { } private static final String packetFormat(Packet packet) { - return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%s] [%s]", - Instant.now().toString(), + return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%4s] [%s] [%s]", + time(), Thread.currentThread().getName(), packet.getSender(), packet.getTag(), + packet.getSubTag(), packet.getData(), packet.getSubData()); } private static final String responseFormat(Response response) { return String.format("[%s] [%-10s] RESPONSE - [%3d] [%s]", - Instant.now().toString(), + time(), Thread.currentThread().getName(), response.code(), response.data()); } + + private static final String time() { + return LocalDateTime.now() + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn")); + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java index ffa3ecfd..cc71631a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java @@ -95,7 +95,7 @@ public String data() { * @param message Response message of the {@link Response} object. Uses {@link String#format(String, Object...)} for formatting with {@code args} parameter. * @return New {@link Response} object formatted based on the passed parameters. */ - public static Response create(int code, String message) { + protected static Response create(int code, String message) { Response response = new Response(code, message); if(log) Logger.log(response); @@ -111,7 +111,7 @@ public static Response create(int code, String message) { * @param data {@link String} of data to be returned in the response. * @return New {@link Response} object formatted based on the passed parameters. */ - public static Response create(int code, String message, String data) { + protected static Response create(int code, String message, String data) { Response response = new Response(code, message, data); if(log) Logger.log(response); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java index 39228d78..7cdb11db 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java @@ -1,5 +1,7 @@ package org.framework.router; +import java.util.Arrays; + public class ResponseFactory { public static void responseNotHandled(String message) { @@ -92,8 +94,8 @@ public static Response response440(String source) { return Response.create(440, String.format("Requested data source <%s> does not exist in cache.", source)); } - public static Response response441(String hash) { - return Response.create(441, String.format("Requested data stream with given hash <%s> does not exist in cache.", hash)); + public static Response response441(String source) { + return Response.create(441, String.format("Local data stream with source <%s> is not ready to handle queries.", source)); } public static Response response442(String source) { @@ -101,7 +103,7 @@ public static Response response442(String source) { } public static Response response443(String source) { - return Response.create(443, String.format("Local data stream with given source <%s> already exists.", source)); + return Response.create(443, String.format("Local data stream with source <%s> already exists.", source)); } public static Response response444(String source) { @@ -109,7 +111,23 @@ public static Response response444(String source) { } public static Response response445(String source, String query) { - return Response.create(445, String.format("Local data stream with given source <%s> could not validate passed query <%s>", query)); + return Response.create(445, String.format("Local data stream with source <%s> could not validate passed query <%s>.", source, query)); + } + + public static Response response446(String source, String query) { + return Response.create(446, String.format("Local data stream with source <%s> does not contain data from requested query <%s>.", source, query)); + } + + public static Response response447(String source, String query) { + return Response.create(447, String.format("Local data stream with source <%s> failed to process the query <%s>.", source, query)); + } + + public static Response response448(String source, String query) { + return Response.create(448, String.format("Local data stream with source <%s> failed to retrieve state with the query <%s>.", source, query)); + } + + public static Response response449(String source, String data, String... location) { + return Response.create(449, String.format("Local data stream with source <%s> failed to push data point <%s> to given location <%s>", source, data, Arrays.toString(location))); } public static Response response460(String consumer) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java index 95e00451..eee258fc 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java @@ -26,14 +26,14 @@ public String getUUID() { @Override protected boolean init() { - int port = Integer.parseInt(Config.getProperty("output", "rest.socket.port")); + int port = Integer.parseInt(Config.getProperty("stream", "rest.socket.port")); // create server if(!SocketManager.createServer(port)) return false; // accept inflow from REST - final String key = Config.getProperty("output", "rest.socket.key"); + final String key = Config.getProperty("stream", "rest.socket.key"); if(!SocketManager.accept(port, key)) return false; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java index d9818b63..d274dca0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java @@ -61,7 +61,7 @@ private void reflect() throws InstantiationException, IllegalAccessException, Il private void load() { // load consumers: - String[] consumer_types = Config.getProperty("output", "consumer.types").replaceAll(" ", "").split(","); + String[] consumer_types = Config.getProperty("stream", "general.consumer.types").replaceAll(" ", "").split(","); for(String type : consumer_types) { if(type.equals("null")) continue; @@ -77,7 +77,7 @@ private void load() { } // load producers: - String[] producer_types = Config.getProperty("output", "producer.types").replaceAll(" ", "").split(","); + String[] producer_types = Config.getProperty("stream", "general.producer.types").replaceAll(" ", "").split(","); for(String type : producer_types) { if(type.equals("null")) continue; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java index c59aea56..d2fd265f 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java @@ -24,7 +24,7 @@ protected boolean init() { listener = new Thread() { public void run() { while(true) { - String key = SocketManager.accept(Integer.parseInt(Config.getProperty("output", "output.socket.port"))); + String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port"))); if(key == null) { System.err.println("SocketProducer: Could not create connection to socket port."); System.exit(1); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/MongoDatabaseConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/MongoDatabaseConnection.java new file mode 100644 index 00000000..545b6805 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/MongoDatabaseConnection.java @@ -0,0 +1,111 @@ +package org.stream.local.connected.connections; + +import java.util.Set; + +import org.bson.Document; +import org.bson.conversions.Bson; +import org.properties.Config; +import org.stream.local.connected.mongodb.MongoDatabaseRequestHandler; +import org.stream.local.handler.DataState; +import org.stream.local.handler.LocalStreamConnection; +import org.stream.local.handler.LocalStreamManager; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; + +// https://www.mongodb.com/docs/drivers/java/sync/current/fundamentals/builders/filters/ +public class MongoDatabaseConnection extends LocalStreamConnection { + + private MongoClient client; + private MongoDatabase state_db; + private MongoDatabase main_db; + + private boolean authorized = false; + + public MongoDatabaseConnection(LocalStreamManager manager) { + super(manager); + } + + @Override + public String getUUID() { + return "mongo_db"; + } + + public boolean init() { + client = new MongoClient(new MongoClientURI(Config.getProperty("stream", "mongodb.properties.uri"))); + state_db = client.getDatabase(Config.getProperty("stream", "mongodb.database.state")); + main_db = client.getDatabase(Config.getProperty("stream", "mongodb.database.main")); + return true; + } + + @Override + public boolean authorize() { + if(client == null || state_db == null || main_db == null) + return false; + + Bson filter = Filters.eq("title", "test"); + MongoCollection collection = main_db.getCollection(Config.getProperty("stream", "mongodb.auth.collection")); + collection.deleteOne(filter); + + Document document = new Document("title", "test"); + collection.insertOne(document); + + Document resolved = collection.find().filter(filter).first(); + + authorized = resolved != null && resolved.get("title").equals("test"); + + return authorized; + } + + @Override + public boolean isAuthorized() { + return authorized; + } + + @Override + public boolean isReady() { + return authorized; + } + + @Override + public boolean validate(String... query) { + // parse query + return MongoDatabaseRequestHandler.validate(query); + } + + @Override + public boolean contains(String... query) { + if(!validate(query)) + return false; + + return MongoDatabaseRequestHandler.contains(main_db, query); + } + + @Override + public DataState state(String... query) { + // TODO Integrate state handler + return null; + } + + @Override + public Set get(String... query) { + if(!validate(query)) + return null; + + return MongoDatabaseRequestHandler.get(main_db, query); + } + + @Override + public boolean push(String data, String collection) { + return MongoDatabaseRequestHandler.push(main_db, data, collection); + } + + @Override + public boolean modify(String data, String... query) { + // TODO Auto-generated method stub + return false; + } +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateConnection.java deleted file mode 100644 index 05a9c049..00000000 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateConnection.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.stream.local.connected.connections; - -public class TemplateConnection { - -} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java index 57b73488..03df5ffc 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java @@ -1,5 +1,86 @@ package org.stream.local.connected.connections; -public class TemplateLocalConnection { +import java.util.HashSet; +import java.util.Set; +import org.properties.Config; +import org.stream.local.handler.DataState; +import org.stream.local.handler.LocalStreamConnection; +import org.stream.local.handler.LocalStreamManager; + +public class TemplateLocalConnection extends LocalStreamConnection { + + public TemplateLocalConnection(LocalStreamManager manager) { + super(manager); + } + + @Override + public String getUUID() { + return "local_template"; + } + + public boolean init() { + return true; + } + + @Override + public boolean authorize() { + Config.setProperty("testing", "lsh.authorized", "true"); + return true; + } + + @Override + public boolean isAuthorized() { + return Config.getProperty("testing", "lsh.authorized").equals("true"); + } + + @Override + public boolean isReady() { + return Config.getProperty("testing", "lsh.ready").equals("true"); + } + + @Override + public boolean validate(String... query) { + return query[0].equals("valid") || + query[0].equals("dne") || + query[0].equals("irregular"); + } + + @Override + public boolean contains(String... query) { + return query[0].equals("valid") || query[0].equals("irregular"); + } + + @Override + public DataState state(String... query) { + if(query[0].equals("inv")) + return DataState.INVALID; + else if(query[0].equals("dne")) + return DataState.DOES_NOT_EXIST; + else if(query[0].equals("partial")) + return DataState.PARTIAL; + else if(query[0].equals("valid")) + return DataState.EXISTS; + else if(query[0].equals("modified")) + return DataState.MODIFIED; + else if(query[0].equals("corrupted")) + return DataState.CORRUPTED; + return DataState.INVALID; + } + + @Override + public Set get(String... query) { + if(query[0].equals("irregular")) + return null; + return new HashSet(); + } + + public boolean push(String data, String collection) { + return collection.equals("valid"); + } + + @Override + public boolean modify(String data, String... query) { + return true; + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java new file mode 100644 index 00000000..feb11bad --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java @@ -0,0 +1,214 @@ +package org.stream.local.connected.mongodb; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + +import org.bson.Document; +import org.properties.Config; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; + +public class MongoDatabaseRequestHandler { + + private final static HashMap translations; + private final static HashMap queries; + private final static HashMap requests; + +// define translation +// note translation maps all get, modify, and set queries to contains queries +// note alerts the engine to skip contains protocol +static { + translations = new HashMap(); + + translations.put("get_all", "contains_collection"); + translations.put("get_item", "contains_item"); +} + +// define queries +static { + queries = new HashMap(); + + // contains queries: + queries.put("contains_collection", 1); + queries.put("contains_type", 2); + queries.put("contains_item", 3); + + // get queries: + queries.put("get_all", 1); + queries.put("get_item", 3); +} + +// define requests +static { + requests = new HashMap(); + + Class classobj = MongoDatabaseRequestHandler.class; + + try { + // contains requests: + requests.put("contains_collection", classobj.getMethod("containsCollection", MongoDatabase.class, String[].class)); + requests.put("contains_type", classobj.getMethod("containsType", MongoDatabase.class, String[].class)); + requests.put("contains_item", classobj.getMethod("containsItem", MongoDatabase.class, String[].class)); + + // get requests: + requests.put("get_all", classobj.getMethod("getAll", MongoDatabase.class, String[].class)); + requests.put("get_item", classobj.getMethod("getItem", MongoDatabase.class, String[].class)); + } catch(Exception e) { + e.printStackTrace(); + System.exit(1); + } +} + + public final static boolean validate(String... query) { + // validate not empty + if(query.length == 0) + return false; + + // validate contains query name + if(!queries.containsKey(query[0]) || !requests.containsKey(query[0])) + return false; + + // validate parameter length + return query.length == queries.get(query[0]) + 1; + } + + // contains functions: + public final static boolean contains(MongoDatabase db, String... query) { + if(!validate(query)) + return false; + + for(int i = 0; i < query.length; i++) + query[i] = query[i].trim(); + + String[] copy = Arrays.copyOfRange(query, 0, query.length); + if(translations.containsKey(copy[0])) { + if(translations.get(copy[0]).equals("skip")) + return true; + else + copy[0] = translations.get(copy[0]); + } + + try { + return (boolean)requests.get(copy[0]).invoke(null, db, copy); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + e.printStackTrace(); + System.exit(1); + } + + return false; + } + + public final static boolean containsCollection(MongoDatabase db, String[] query) { + if(!query[0].equals("contains_collection")) + return false; + + MongoCursor itr = db.listCollectionNames().iterator(); + while(itr.hasNext()) { + if(itr.next().equals(query[1])) + return true; + } + + return false; + } + + public final static boolean containsType(MongoDatabase db, String[] query) { + if(!query[0].equals("contains_type")) + return false; + + if(!containsCollection(db, new String[] {"contains_collection", query[1]})) + return false; + + Document document = db.getCollection(query[1]).find().first(); + for(Entry type : document.entrySet()) + if(type.getKey().equals(query[2])) + return true; + + return false; + } + + public final static boolean containsItem(MongoDatabase db, String[] query) { + if(!query[0].equals("contains_item")) + return false; + + if(!containsType(db, new String[] {"contains_type", query[1], query[2]})) + return false; + + return db.getCollection(query[1]).find(Filters.eq(query[2], query[3])).first() != null; + } + + // get functions: + @SuppressWarnings("unchecked") + public final static Set get(MongoDatabase db, String[] query) { + if(!validate(query)) + return null; + + try { + return (Set)requests.get(query[0]).invoke(null, db, query); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + e.printStackTrace(); + System.exit(1); + } + + return null; + } + + public final static Set getAll(MongoDatabase db, String[] query) { + if(!query[0].equals("get_all")) + return null; + + if(!containsCollection(db, new String[] {"contains_collection", query[1]})) + return null; + + MongoCollection collection = db.getCollection(query[1]); + Set out = new HashSet(); + MongoCursor itr = collection.find().iterator(); + while(itr.hasNext()) + out.add(itr.next().toJson()); + + return out; + } + + public final static Set getItem(MongoDatabase db, String... query) { + if(!query[0].equals("get_item")) + return null; + + if(!containsItem(db, new String[] {"contains_item", query[1], query[2], query[3]})) + return null; + + for(int i = 0; i < query.length; i++) + query[i] = query[i].trim(); + + MongoCollection collection = db.getCollection(query[1]); + Set out = new HashSet(); + MongoCursor itr = collection.find(Filters.eq(query[2], query[3])).iterator(); + while(itr.hasNext()) + out.add(itr.next().toJson()); + + return out; + } + + public final static boolean push(MongoDatabase db, String data, String collection_name) { + MongoCollection collection = db.getCollection(collection_name); + + String[] split_data = data.split(Config.getProperty("app", "general.data.delim")); + // validate that every type has an id associated. + if(split_data.length % 2 != 0) + return false; + + Document document = new Document(); + for(int i = 0; i < split_data.length; i+=2) + document.append(split_data[i], split_data[i + 1]); + + document.append("_timestamp", System.nanoTime()); + collection.insertOne(document); + return true; + } +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java index 20251152..37b711f1 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java @@ -1,6 +1,7 @@ package org.stream.local.handler; -import org.framework.interfaces.Hash; +import java.util.Set; + import org.framework.interfaces.UUID; public abstract class LocalStreamConnection implements UUID { @@ -11,12 +12,14 @@ public LocalStreamConnection(LocalStreamManager manager) { this.manager = manager; } + public abstract boolean init(); public abstract boolean authorize(); public abstract boolean isAuthorized(); public abstract boolean isReady(); public abstract boolean validate(String... query); public abstract boolean contains(String... query); public abstract DataState state(String... query); - public abstract String get(String... query); + public abstract Set get(String... query); + public abstract boolean push(String data, String collection); public abstract boolean modify(String data, String... query); } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index 01a34967..0f4a6879 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -1,9 +1,12 @@ package org.stream.local.handler; +import java.util.Set; + import org.framework.router.Packet; import org.framework.router.Response; import org.framework.router.ResponseFactory; import org.framework.router.Router; +import org.properties.Config; public class LocalStreamHandler extends Router { @@ -32,22 +35,110 @@ public Response processINIT(Packet packet) { if(!manager.authorize() || !manager.isAuthorized()) return ResponseFactory.response444(source); + if(!manager.isReady()) + return ResponseFactory.response441(source); + return ResponseFactory.response200(); } public Response processSCAN(Packet packet) { - // data: query if(packet.getData().isEmpty()) return ResponseFactory.response500("LocalStreamHandler", "query"); - if(!manager.validate(packet.getData())) + if(!manager.isReady()) + return ResponseFactory.response441(manager.streamType()); + + String[] query = packet.getData().split(Config.getProperty("stream", "mongodb.query.delim")); + + if(!manager.validate(query)) return ResponseFactory.response445(manager.streamType(), packet.getData()); - if(manager.scan(packet.getData())) + if(manager.scan(query)) return ResponseFactory.response200("true"); return ResponseFactory.response200("false"); } //public Response process + public Response processRQST(Packet packet) { + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "query"); + + if(!manager.isReady()) + return ResponseFactory.response441(manager.streamType()); + + String[] query = packet.getData().split(Config.getProperty("stream", "mongodb.query.delim")); + + if(!manager.validate(query)) + return ResponseFactory.response445(manager.streamType(), packet.getData()); + + if(!manager.scan(query)) + return ResponseFactory.response446(manager.streamType(), packet.getData()); + + Set out = manager.get(query); + + if(out == null) + return ResponseFactory.response447(manager.streamType(), packet.getData()); + + // TODO: push to output + + return ResponseFactory.response200(); + } + + public Response processSTAT(Packet packet) { + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "query"); + + if(!manager.isReady()) + return ResponseFactory.response441(manager.streamType()); + + String[] query = packet.getData().split(Config.getProperty("stream", "mongodb.query.delim")); + + if(!manager.validate(query)) + return ResponseFactory.response445(manager.streamType(), packet.getData()); + + if(!manager.scan(query)) + return ResponseFactory.response446(manager.streamType(), packet.getData()); + + DataState state = manager.state(query); + if(state == DataState.INVALID) + return ResponseFactory.response448(manager.streamType(), packet.getData()); + + return ResponseFactory.response200(state.toString()); + } + + public Response processMODI(Packet packet) { + return ResponseFactory.response501(); +// if(packet.getData().isEmpty()) +// return ResponseFactory.response500("LocalStreamHandler", "query"); +// +// if(!manager.isReady()) +// return ResponseFactory.response441(manager.streamType()); +// +// if(!manager.validate(packet.getData())) +// return ResponseFactory.response445(manager.streamType(), packet.getData()); +// +// if(!manager.scan(packet.getData())) +// return ResponseFactory.response446(manager.streamType(), packet.getData()); +// + } + + public Response processPUSH(Packet packet) { + // data format: data, location... + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "data, collection"); + + String[] data = packet.getData().split(Config.getProperty("app", "general.internal.delim")); + if(data.length != 2) + return ResponseFactory.response500("LocalStreamHandler", "collection"); + String collection = data[1]; + + if(!manager.isReady()) + return ResponseFactory.response441(manager.streamType()); + + if(!manager.push(data[0], collection)) + return ResponseFactory.response449(manager.streamType(), data[0], collection); + + return ResponseFactory.response200(); + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java index fe427cee..4f4c07b6 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java @@ -5,8 +5,6 @@ import java.util.Set; import org.reflections.Reflections; -import org.stream.external.handler.ExternalStreamConnection; -import org.stream.external.handler.ExternalStreamManager; public class LocalStreamManager { @@ -30,8 +28,14 @@ protected LocalStreamManager(LocalStreamHandler handler) { private void reflect() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { Reflections reflection = new Reflections("org.stream.local.connected.connections"); Set> types = reflection.getSubTypesOf(LocalStreamConnection.class); - for(Class c : types) - templates.put(c.getDeclaredConstructor(LocalStreamManager.class, String.class).newInstance(this, "").getUUID(), c); + for(Class c : types) { + String uuid = c.getDeclaredConstructor(LocalStreamManager.class).newInstance(this).getUUID(); + if(templates.containsKey(uuid)) { + System.err.println(String.format("Local stream UUID <%s> is not unique.", uuid)); + System.exit(1); + } + templates.put(uuid, c); + } } protected boolean containsTemplate(String type) { @@ -47,14 +51,13 @@ protected boolean setStream(String type) { return false; try { - this.stream = templates.get(type).getDeclaredConstructor(LocalStreamManager.class).newInstance(this); - return true; + stream = templates.get(type).getDeclaredConstructor(LocalStreamManager.class).newInstance(this); } catch(Exception e) { e.printStackTrace(); System.exit(1); } - return false; + return stream.init(); } protected String streamType() { @@ -101,13 +104,20 @@ public DataState state(String... query) { return stream.state(query); } - protected String get(String... query) { - if(!validate(query)) + protected Set get(String... query) { + if(!isReady() || !validate(query)) return null; return stream.get(query); } + protected boolean push(String data, String collection) { + if(!isReady()) + return false; + + return stream.push(data, collection); + } + protected boolean modify(String data, String... query) { if(!validate(query)) return false; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/framework/router/TestRouter.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/framework/router/TestRouter.java index bad2c908..c45f751d 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/framework/router/TestRouter.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/framework/router/TestRouter.java @@ -20,10 +20,6 @@ public void defineProcesses() throws NoSuchMethodException, SecurityException { p1.setAccessible(true); addProcess("", p1); } - - public Response process1(Packet packet) { - return Response.create(1, ""); - } } class Router2 extends Router { @@ -31,10 +27,6 @@ class Router2 extends Router { public Router2() { super("Router2", "RT2"); } - - public Response process1(Packet packet) { - return Response.create(2, ""); - } } class RouterTemplate extends Router { @@ -49,10 +41,6 @@ public RouterTemplate(int num, String uuid, String tag) { super(uuid, tag); this.num = num; } - - public Response process1(Packet packet) { - return Response.create(num, ""); - } } public class TestRouter { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/lsh/mongodb/TestMongoDatabase.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/lsh/mongodb/TestMongoDatabase.java new file mode 100644 index 00000000..42d45a2b --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/lsh/mongodb/TestMongoDatabase.java @@ -0,0 +1,97 @@ +package test.lsh.mongodb; + +import static org.junit.Assert.assertEquals; + +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.core.core.Core; +import org.junit.Test; +import org.properties.Config; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; + +public class TestMongoDatabase { + +static { + // disable loggers + LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("stream", "general.consumer.types", "null"); + Config.setProperty("stream", "general.producer.types", "null"); + Config.setProperty("stream", "local.stream.type", "mongo_db"); + + // add testing database to system + Bson filter = Filters.eq("element1", "e1"); + MongoClient client = new MongoClient(new MongoClientURI(Config.getProperty("stream", "mongodb.properties.uri"))); + MongoDatabase db = client.getDatabase("testing"); + MongoCollection collection = db.getCollection("test-mongo-database"); + collection.deleteMany(filter); + collection.insertOne(new Document().append("_timestamp", System.nanoTime()).append("element1", "e1")); +} + + @Test + public void TestINIT() { + Config.setProperty("stream", "mongodb.database.main", "testing"); + new Core(); + } + + @Test + public void TestSCAN() { + Config.setProperty("stream", "mongodb.database.main", "testing"); + Core core = new Core(); + + // test contains_collection + assertEquals(200, core.send("LSH", "SCAN", "contains_collection, test-mongo-database").code()); + assertEquals("true", core.send("LSH", "SCAN", "contains_collection, test-mongo-database").data()); + assertEquals("false", core.send("LSH", "SCAN", "contains_collection, dne").data()); + assertEquals(445, core.send("LSH", "SCAN", "contains_collection, test-mongo-database, invalid").code()); + + // test contains_type + assertEquals(200, core.send("LSH", "SCAN", "contains_type, test-mongo-database, element1").code()); + assertEquals("true", core.send("LSH", "SCAN", "contains_type, test-mongo-database, element1").data()); + assertEquals("false", core.send("LSH", "SCAN", "contains_type, test-mongo-database, dne").data()); + assertEquals(445, core.send("LSH", "SCAN", "contains_type, test-mongo-database, element1, invalid").code()); + + // test contains_item + assertEquals(200, core.send("LSH", "SCAN", "contains_item, test-mongo-database, element1, e1").code()); + assertEquals("true", core.send("LSH", "SCAN", "contains_item, test-mongo-database, element1, e1").data()); + assertEquals("false", core.send("LSH", "SCAN", "contains_item, test-mongo-database, element1, dne").data()); + assertEquals(445, core.send("LSH", "SCAN", "contains_item, test-mongo-database, element1, e1, invalid").code()); + + assertEquals(500, core.send("LSH", "SCAN", "").code()); + } + + @Test + public void TestRQST() { + Config.setProperty("stream", "mongodb.database.main", "testing"); + Core core = new Core(); + + // test get_all + assertEquals(200, core.send("LSH", "RQST", "get_all, test-mongo-database").code()); + assertEquals(445, core.send("LSH", "RQST", "get_all, test-mongo-database, invalid").code()); + assertEquals(446, core.send("LSH", "RQST", "get_all, dne").code()); + + // test get_item + assertEquals(200, core.send("LSH", "RQST", "get_item, test-mongo-database, element1, e1").code()); + assertEquals(446, core.send("LSH", "RQST", "get_item, test-mongo-database, element1, e2").code()); + + assertEquals(500, core.send("LSH", "RQST", "").code()); + } + + @Test + public void TestPUSH() { + Config.setProperty("stream", "mongodb.database.main", "testing"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "PUSH", "element1, e1,.,test-mongo-database").code()); + assertEquals(200, core.send("LSH", "PUSH", "element1, e1, element2, e2,.,test-mongo-database").code()); + assertEquals(449, core.send("LSH", "PUSH", "element1, e1, invalid,.,test-mongo-database").code()); + + assertEquals(500, core.send("LSH", "PUSH", "invalid").code()); + } +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestESH.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestESH.java index 5d6011dc..a7282476 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestESH.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestESH.java @@ -7,12 +7,16 @@ import org.core.core.Core; import org.framework.router.Response; import org.junit.Test; +import org.properties.Config; public class TestESH { static { // disable loggers LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("output", "consumer.types", "null"); + Config.setProperty("output", "producer.types", "null"); + Config.setProperty("output", "local.stream.type", "null"); } @Test diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java index 17ca01d2..af60af21 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java @@ -1,5 +1,111 @@ package test.protocols; +import static org.junit.Assert.assertEquals; + +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.core.core.Core; +import org.junit.Test; +import org.properties.Config; + public class TestLSH { +static { + // disable loggers + LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("stream", "general.consumer.types", "null"); + Config.setProperty("stream", "general.producer.types", "null"); + Config.setProperty("stream", "local.stream.type", "null"); +} + + @Test + public void TestINIT() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Config.setProperty("testing", "lsh.ready", "true"); + Core core = new Core(); + + assertEquals(440, core.send("LSH", "INIT", "null").code()); + assertEquals(443, core.send("LSH", "INIT", "local_template").code()); + assertEquals(500, core.send("LSH", "INIT", "").code()); + + // enable to check engine catch invalid property: local.stream.type + // Config.setProperty("stream", "local.stream.type", "invalid"); + // new Core(); + } + + @Test + public void TestSCAN() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "SCAN", "valid").code()); + + Config.setProperty("testing", "lsh.ready", "false"); + assertEquals(441, core.send("LSH", "SCAN", "valid").code()); + Config.setProperty("testing", "lsh.ready", "true"); + + assertEquals(445, core.send("LSH", "SCAN", "invalid").code()); + assertEquals("true", core.send("LSH", "SCAN", "valid").data()); + assertEquals("false", core.send("LSH", "SCAN", "dne").data()); + + assertEquals(500, core.send("LSH", "SCAN", "").code()); + } + + @Test + public void TestRQST() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "RQST", "valid").code()); + + Config.setProperty("testing", "lsh.ready", "false"); + assertEquals(441, core.send("LSH", "RQST", "valid").code()); + Config.setProperty("testing", "lsh.ready", "true"); + + assertEquals(445, core.send("LSH", "RQST", "invalid").code()); + assertEquals(446, core.send("LSH", "RQST", "dne").code()); + assertEquals(447, core.send("LSH", "RQST", "irregular").code()); + + assertEquals(500, core.send("LSH", "RQST", "").code()); + } + + @Test + public void TestSTAT() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "STAT", "valid").code()); + assertEquals("EXISTS", core.send("LSH", "STAT", "valid").data()); + + Config.setProperty("testing", "lsh.ready", "false"); + assertEquals(441, core.send("LSH", "STAT", "valid").code()); + Config.setProperty("testing", "lsh.ready", "true"); + + assertEquals(445, core.send("LSH", "STAT", "invalid").code()); + assertEquals(446, core.send("LSH", "STAT", "dne").code()); + assertEquals(448, core.send("LSH", "STAT", "irregular").code()); + + assertEquals(500, core.send("LSH", "STAT", "").code()); + } + + @Test + public void TestPUSH() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "PUSH", format("1", "valid")).code()); + + Config.setProperty("testing", "lsh.ready", "false"); + assertEquals(441, core.send("LSH", "PUSH", format("1", "valid")).code()); + Config.setProperty("testing", "lsh.ready", "true"); + + assertEquals(449, core.send("LSH", "PUSH", format("1", "invalid")).code()); + + assertEquals(500, core.send("LSH", "PUSH", "").code()); + assertEquals(500, core.send("LSH", "PUSH", "1").code()); + } + + private String format(String s1, String s2) { + return s1 + Config.getProperty("app", "general.internal.delim") + s2; + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java index 157bed0e..957d7c71 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java @@ -7,12 +7,16 @@ import org.core.core.Core; import org.framework.router.Response; import org.junit.Test; +import org.properties.Config; public class TestSRC { static { // disable loggers LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("output", "consumer.types", "null"); + Config.setProperty("output", "producer.types", "null"); + Config.setProperty("output", "local.stream.type", "null"); } @Test @@ -137,13 +141,14 @@ public void TestRQST() { assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); assertEquals(200, core.send("SRC", "INIT", "external_template, not_ready, true").code()); - assertEquals(200, core.send("SRC", "RQST", "key, correct").code()); + assertEquals(200, core.send("SRC", "RQST", "key, correct", "destination").code()); - assertEquals(421, core.send("SRC", "RQST", "does_not_exist, correct").code()); - assertEquals(423, core.send("SRC", "RQST", "not_ready, correct").code()); - assertEquals(428, core.send("SRC", "RQST", "key, does_not_exist").code()); - assertEquals(429, core.send("SRC", "RQST", "key, irregular").code()); - assertEquals(500, core.send("SRC", "RQST", "").code()); - assertEquals(500, core.send("SRC", "RQST", "key").code()); + assertEquals(421, core.send("SRC", "RQST", "does_not_exist, correct", "destination").code()); + assertEquals(423, core.send("SRC", "RQST", "not_ready, correct", "destination").code()); + assertEquals(428, core.send("SRC", "RQST", "key, does_not_exist", "destination").code()); + assertEquals(429, core.send("SRC", "RQST", "key, irregular", "destination").code()); + assertEquals(500, core.send("SRC", "RQST", "", "destination").code()); + assertEquals(500, core.send("SRC", "RQST", "key", "destination").code()); + assertEquals(500, core.send("SRC", "RQST", "key, correct", "").code()); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java index 5eca334b..8adecfce 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java @@ -32,10 +32,6 @@ public void defineProcesses() throws NoSuchMethodException, SecurityException { p1.setAccessible(true); addProcess("", p1); } - - public Response process1(Packet packet) { - return Response.create(num, ""); - } } public class TestRouterSendSpeed {