Skip to content

Commit

Permalink
Integrate MongoDB classes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Sep 22, 2022
1 parent 7796f44 commit c390be4
Show file tree
Hide file tree
Showing 26 changed files with 845 additions and 90 deletions.
Binary file modified Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx
Binary file not shown.
3 changes: 3 additions & 0 deletions DeFi-Data-Engine/DeFi Data Engine/config/app.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
# delimiter used for internal processing
general.internal.delim=,.,

# data delimiter
general.data.delim=,

# enable all packet logging
general.logging.packets=true

Expand Down
28 changes: 0 additions & 28 deletions DeFi-Data-Engine/DeFi Data Engine/config/output.properties

This file was deleted.

48 changes: 48 additions & 0 deletions DeFi-Data-Engine/DeFi Data Engine/config/stream.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# === GENERAL PROPERTIES ===

# consumer types for accepting input
general.consumer.types=null

# producer types for writing output
general.producer.types=null

# === REST SOCKET PROPERTIES ===

# Rest socket address
rest.socket.address=localhost

# Rest socket port
rest.socket.port=61100

# Rest socket key
rest.socket.key=rest-key-reserved

# === OUTPUT SOCKET PROPERTIES ===

# Output socket address
output.socket.address=localhost

# Output socket port
output.socket.port=61200

# === LOCAL STREAM PROPERTIES ===

# local stream type used for database configuration
local.stream.type=mongo_db

# === MONGO DB PROPERTIES ===

# local MongoDB client URI
mongodb.properties.uri=mongodb://localhost:27017

# local MongoDB state database name
mongodb.database.state=main-state-db

# local MongoDB main database name
mongodb.database.main=main-db

# authorization collection
mongodb.auth.collection=auth-collection

# query delim
mongodb.query.delim=,
3 changes: 3 additions & 0 deletions DeFi-Data-Engine/DeFi Data Engine/config/testing.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# === TESTING PROPERTIES ===
lsh.authorized=true
lsh.ready=true
2 changes: 1 addition & 1 deletion DeFi-Data-Engine/DeFi Data Engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>2.12.3</version>
<version>3.12.11</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ public Response processSTRT(Packet packet) {
return out_response;

// start local stream handler processes:
Response lsh_response = send("LSH", "INIT", Config.getProperty("output", "local.stream.type"));
if(lsh_response.code() != 200)
return lsh_response;
String lsh_type = Config.getProperty("stream", "local.stream.type");
if(!lsh_type.equals("null")) {
Response lsh_response = send("LSH", "INIT", lsh_type);
if(lsh_response.code() != 200)
return lsh_response;
}

return ResponseFactory.response200();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.core.logger;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.framework.router.Packet;
import org.framework.router.Response;
Expand All @@ -26,20 +27,26 @@ public static final void terminate(Response response) {
}

private static final String packetFormat(Packet packet) {
return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%s] [%s]",
Instant.now().toString(),
return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%4s] [%s] [%s]",
time(),
Thread.currentThread().getName(),
packet.getSender(),
packet.getTag(),
packet.getSubTag(),
packet.getData(),
packet.getSubData());
}

private static final String responseFormat(Response response) {
return String.format("[%s] [%-10s] RESPONSE - [%3d] [%s]",
Instant.now().toString(),
time(),
Thread.currentThread().getName(),
response.code(),
response.data());
}

private static final String time() {
return LocalDateTime.now()
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public String data() {
* @param message Response message of the {@link Response} object. Uses {@link String#format(String, Object...)} for formatting with {@code args} parameter.
* @return New {@link Response} object formatted based on the passed parameters.
*/
public static Response create(int code, String message) {
protected static Response create(int code, String message) {
Response response = new Response(code, message);
if(log)
Logger.log(response);
Expand All @@ -111,7 +111,7 @@ public static Response create(int code, String message) {
* @param data {@link String} of data to be returned in the response.
* @return New {@link Response} object formatted based on the passed parameters.
*/
public static Response create(int code, String message, String data) {
protected static Response create(int code, String message, String data) {
Response response = new Response(code, message, data);
if(log)
Logger.log(response);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.framework.router;

import java.util.Arrays;

public class ResponseFactory {

public static void responseNotHandled(String message) {
Expand Down Expand Up @@ -92,24 +94,40 @@ public static Response response440(String source) {
return Response.create(440, String.format("Requested data source <%s> does not exist in cache.", source));
}

public static Response response441(String hash) {
return Response.create(441, String.format("Requested data stream with given hash <%s> does not exist in cache.", hash));
public static Response response441(String source) {
return Response.create(441, String.format("Local data stream with source <%s> is not ready to handle queries.", source));
}

public static Response response442(String source) {
return Response.create(442, String.format("Failed to add local data source <%s>.", source));
}

public static Response response443(String source) {
return Response.create(443, String.format("Local data stream with given source <%s> already exists.", source));
return Response.create(443, String.format("Local data stream with source <%s> already exists.", source));
}

public static Response response444(String source) {
return Response.create(444, String.format("Failure to authorize the local data source <%s> with the given properties.", source));
}

public static Response response445(String source, String query) {
return Response.create(445, String.format("Local data stream with given source <%s> could not validate passed query <%s>", query));
return Response.create(445, String.format("Local data stream with source <%s> could not validate passed query <%s>.", source, query));
}

public static Response response446(String source, String query) {
return Response.create(446, String.format("Local data stream with source <%s> does not contain data from requested query <%s>.", source, query));
}

public static Response response447(String source, String query) {
return Response.create(447, String.format("Local data stream with source <%s> failed to process the query <%s>.", source, query));
}

public static Response response448(String source, String query) {
return Response.create(448, String.format("Local data stream with source <%s> failed to retrieve state with the query <%s>.", source, query));
}

public static Response response449(String source, String data, String... location) {
return Response.create(449, String.format("Local data stream with source <%s> failed to push data point <%s> to given location <%s>", source, data, Arrays.toString(location)));
}

public static Response response460(String consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ public String getUUID() {

@Override
protected boolean init() {
int port = Integer.parseInt(Config.getProperty("output", "rest.socket.port"));
int port = Integer.parseInt(Config.getProperty("stream", "rest.socket.port"));

// create server
if(!SocketManager.createServer(port))
return false;

// accept inflow from REST
final String key = Config.getProperty("output", "rest.socket.key");
final String key = Config.getProperty("stream", "rest.socket.key");

if(!SocketManager.accept(port, key))
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private void reflect() throws InstantiationException, IllegalAccessException, Il

private void load() {
// load consumers:
String[] consumer_types = Config.getProperty("output", "consumer.types").replaceAll(" ", "").split(",");
String[] consumer_types = Config.getProperty("stream", "general.consumer.types").replaceAll(" ", "").split(",");
for(String type : consumer_types) {
if(type.equals("null"))
continue;
Expand All @@ -77,7 +77,7 @@ private void load() {
}

// load producers:
String[] producer_types = Config.getProperty("output", "producer.types").replaceAll(" ", "").split(",");
String[] producer_types = Config.getProperty("stream", "general.producer.types").replaceAll(" ", "").split(",");
for(String type : producer_types) {
if(type.equals("null"))
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected boolean init() {
listener = new Thread() {
public void run() {
while(true) {
String key = SocketManager.accept(Integer.parseInt(Config.getProperty("output", "output.socket.port")));
String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")));
if(key == null) {
System.err.println("SocketProducer: Could not create connection to socket port.");
System.exit(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.stream.local.connected.connections;

import java.util.Set;

import org.bson.Document;
import org.bson.conversions.Bson;
import org.properties.Config;
import org.stream.local.connected.mongodb.MongoDatabaseRequestHandler;
import org.stream.local.handler.DataState;
import org.stream.local.handler.LocalStreamConnection;
import org.stream.local.handler.LocalStreamManager;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;

// https://www.mongodb.com/docs/drivers/java/sync/current/fundamentals/builders/filters/
public class MongoDatabaseConnection extends LocalStreamConnection {

private MongoClient client;
private MongoDatabase state_db;
private MongoDatabase main_db;

private boolean authorized = false;

public MongoDatabaseConnection(LocalStreamManager manager) {
super(manager);
}

@Override
public String getUUID() {
return "mongo_db";
}

public boolean init() {
client = new MongoClient(new MongoClientURI(Config.getProperty("stream", "mongodb.properties.uri")));
state_db = client.getDatabase(Config.getProperty("stream", "mongodb.database.state"));
main_db = client.getDatabase(Config.getProperty("stream", "mongodb.database.main"));
return true;
}

@Override
public boolean authorize() {
if(client == null || state_db == null || main_db == null)
return false;

Bson filter = Filters.eq("title", "test");
MongoCollection<Document> collection = main_db.getCollection(Config.getProperty("stream", "mongodb.auth.collection"));
collection.deleteOne(filter);

Document document = new Document("title", "test");
collection.insertOne(document);

Document resolved = collection.find().filter(filter).first();

authorized = resolved != null && resolved.get("title").equals("test");

return authorized;
}

@Override
public boolean isAuthorized() {
return authorized;
}

@Override
public boolean isReady() {
return authorized;
}

@Override
public boolean validate(String... query) {
// parse query
return MongoDatabaseRequestHandler.validate(query);
}

@Override
public boolean contains(String... query) {
if(!validate(query))
return false;

return MongoDatabaseRequestHandler.contains(main_db, query);
}

@Override
public DataState state(String... query) {
// TODO Integrate state handler
return null;
}

@Override
public Set<String> get(String... query) {
if(!validate(query))
return null;

return MongoDatabaseRequestHandler.get(main_db, query);
}

@Override
public boolean push(String data, String collection) {
return MongoDatabaseRequestHandler.push(main_db, data, collection);
}

@Override
public boolean modify(String data, String... query) {
// TODO Auto-generated method stub
return false;
}
}

This file was deleted.

Loading

0 comments on commit c390be4

Please sign in to comment.