Skip to content

Commit

Permalink
update LSH for compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Mar 21, 2023
1 parent 61b6282 commit 5101e0c
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ public class Config {
Properties stream_properties = new Properties();
stream_properties.put("general.consumer.types", "socket_consumer");
stream_properties.put("general.producer.types", "socket_producer");
//stream_properties.put("rest.socket.address", "DataEngine");
stream_properties.put("rest.socket.address", "localhost");
stream_properties.put("rest.socket.address", "DataEngine");
//stream_properties.put("rest.socket.address", "localhost");
stream_properties.put("rest.socket.port", "61100");
stream_properties.put("rest.socket.key", "rest-key-reserved");
//stream_properties.put("output.socket.address", "RestApp");
stream_properties.put("output.socket.address", "localhost");
stream_properties.put("output.socket.address", "RestApp");
// stream_properties.put("output.socket.address", "localhost");
stream_properties.put("output.socket.port", "61200");
//stream_properties.put("local.stream.type", "mongo_db");
stream_properties.put("local.stream.type", "null");
stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017");
stream_properties.put("local.stream.type", "mongo_db");
//stream_properties.put("local.stream.type", "null");
//stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017");
stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017");
stream_properties.put("mongodb.database.state", "main-state-db");
stream_properties.put("mongodb.database.main", "main-db");
stream_properties.put("mongodb.auth.collection", "auth-collection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public Response processRQST(Packet packet) {

// check to see if dated
// if not
if((validate = packet.validate("startDate", "endDate")) != null)
if((validate = packet.validate("start_date", "end_date")) != null)
response = manager.request(packet.getData("type"), packet.getData());

// if dated
else
response = manager.request(packet.getData("type"), packet.getData(), packet.getData("startDate"), packet.getData("endDate"));
response = manager.request(packet.getData("type"), packet.getData(), packet.getData("start_date"), packet.getData("end_date"));

// check to see if valid
if(!((boolean)response[0])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.HashMap;

import org.framework.router.Response;
import org.stream.external.requests.ExternalRequestFramework;
import org.stream.external.requests.ExternalRequestManager;

Expand All @@ -11,8 +10,6 @@ public class ExternalStreamManager {
private final ExternalStreamHandler handler;
private final ExternalRequestManager manager;

private final HashMap<String, String> collections = new HashMap<String, String>();

protected ExternalStreamManager(ExternalStreamHandler handler) {
this.handler = handler;
this.manager = new ExternalRequestManager();
Expand Down Expand Up @@ -75,7 +72,7 @@ protected Object[] request(String type, HashMap<String, String> data, String sta

// submit response
String response;
if(startDate == null && endDate == null)
if(startDate == null || endDate == null)
response = request.request(url_path, properties, headers);
else
response = request.request(url_path, properties, headers, startDate, endDate);
Expand All @@ -90,7 +87,10 @@ public void processRequest(String collection, HashMap<String, String> data) {
if(data == null || data.isEmpty())
return;

System.out.println(collection + ": " + data.toString());
// TODO: FINISH
StringBuilder sb = new StringBuilder();
for(String key : data.keySet())
sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ",");
sb.delete(sb.length() - 1, sb.length());
handler.send("LSH", "PUSH", "collection", collection, "data", sb.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,10 @@ public Response processSCAN(Packet packet) {
//public Response process
public Response processRQST(Packet packet) {
String validate;
if((validate = packet.validate("uuid", "request", "query", "destination")) != null)
if((validate = packet.validate("type", "destination")) != null)
return ResponseFactory.response500("LocalStreamHandler", validate);

String request;
String delim = Config.getProperty("app", "general.collection.delim");
if(packet.containsKey("date")) {
request = packet.getData("uuid") + delim + packet.getData("query") + delim + packet.getData("date");
} else {
request = packet.getData("uuid") + delim + packet.getData("query");
}
String request = packet.getData("query");

//String[] query = packet.getData("query").split(Config.getProperty("app", "general.data.delim"));
String[] query = new String[] {"get_all", request};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,6 @@ public Response processEXSR(Packet packet) {
return send("ESH", "EXSR", packet.getData());
}

public Response processEXST(Packet packet) {
return send("ESH", "EXST", packet.getData());
}

public Response processINIT(Packet packet) {
return send("ESH", "INIT", packet.getData());
}

public Response processIATH(Packet packet) {
return send("ESH", "IATH", packet.getData());
}

public Response processIATV(Packet packet) {
return send("ESH", "IATV", packet.getData());
}

public Response processEXEC(Packet packet) {
return send("ESH", "EXEC", packet.getData());
}

public Response processKILL(Packet packet) {
return send("ESH", "KILL", packet.getData());
}

public Response processSUBS(Packet packet) {
return send("ESH", "SUBS", packet.getData());
}

public Response processEDAT(Packet packet) {
return send("OUT", "EDAT", packet.getData());
}
Expand All @@ -65,7 +37,7 @@ public Response processRQST(Packet packet) {
// Not Dated: data=key, request, query, destination

String validate;
if((validate = packet.validate("key", "request", "query", "destination")) != null)
if((validate = packet.validate("type", "destination")) != null)
return ResponseFactory.response500("ExternalStreamHandler", validate);

// check to see if request is dated
Expand All @@ -80,17 +52,9 @@ public Response processRQST(Packet packet) {
// extract packet data
HashMap<String, String> data = packet.getData();

// retrieve stream type based on key
Response type_response = send("ESH", "TYPE", data);
if(type_response.code() != 200)
return type_response;

// retrieve uuid of the data
String uuid = type_response.data();
data.put("uuid", uuid);

// not dated
if(!dated) {
data.put("query", String.format("get_all, %s", packet.getData("type")));
Response lsh_response = send("LSH", "RQST", data);
// if data does not exist send request to external stream handler
if(lsh_response.code() == 446)
Expand All @@ -114,6 +78,7 @@ else if(dated) {

// initial request
data.put("date", date.format(formatter));
data.put("query", String.format("%s-%s", data.get("type"), data.get("date")));
lsh_response = send("LSH", "RQST", data);
if(lsh_response.code() == 200)
continue;
Expand Down
Loading

0 comments on commit 5101e0c

Please sign in to comment.