Skip to content

Update Internals to be compatible with API handler integration #5

Merged
merged 2 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ public class Config {
Properties stream_properties = new Properties();
stream_properties.put("general.consumer.types", "socket_consumer");
stream_properties.put("general.producer.types", "socket_producer");
//stream_properties.put("rest.socket.address", "DataEngine");
stream_properties.put("rest.socket.address", "localhost");
stream_properties.put("rest.socket.address", "DataEngine");
//stream_properties.put("rest.socket.address", "localhost");
stream_properties.put("rest.socket.port", "61100");
stream_properties.put("rest.socket.key", "rest-key-reserved");
//stream_properties.put("output.socket.address", "RestApp");
stream_properties.put("output.socket.address", "localhost");
stream_properties.put("output.socket.address", "RestApp");
// stream_properties.put("output.socket.address", "localhost");
stream_properties.put("output.socket.port", "61200");
//stream_properties.put("local.stream.type", "mongo_db");
stream_properties.put("local.stream.type", "null");
stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017");
stream_properties.put("local.stream.type", "mongo_db");
//stream_properties.put("local.stream.type", "null");
//stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017");
stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017");
stream_properties.put("mongodb.database.state", "main-state-db");
stream_properties.put("mongodb.database.main", "main-db");
stream_properties.put("mongodb.auth.collection", "auth-collection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public Response processRQST(Packet packet) {

// check to see if dated
// if not
if((validate = packet.validate("startDate", "endDate")) != null)
if((validate = packet.validate("start_date", "end_date")) != null)
response = manager.request(packet.getData("type"), packet.getData());

// if dated
else
response = manager.request(packet.getData("type"), packet.getData(), packet.getData("startDate"), packet.getData("endDate"));
response = manager.request(packet.getData("type"), packet.getData(), packet.getData("start_date"), packet.getData("end_date"));

// check to see if valid
if(!((boolean)response[0])) {
Expand All @@ -56,184 +56,4 @@ public Response processRQST(Packet packet) {
// return valid
return ResponseFactory.response200();
}

// // source: source of data
// public Response processEXSR(Packet packet) {
// String validate;
// if((validate = packet.validate("source")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// return ResponseFactory.response200(String.format("%s", manager.containsTemplate(packet.getData("source"))));
// }
//
// // key: key of the given stream
// public Response processEXST(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// return ResponseFactory.response200(String.format("%s", manager.containsStream(packet.getData("key"))));
// }
//
// // source: source of the given stream
// // auth_data[]: all keys associated with data
// public Response processINIT(Packet packet) {
// String validate;
// if((validate = packet.validate("source")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String source = packet.getData("source");
//
// // validate data
// String tempHash = manager.getHash(source, packet.getData());
// if(tempHash == null)
// return ResponseFactory.response430(packet.getData());
// if(manager.containsStream(tempHash))
// return ResponseFactory.response220(tempHash);
//
// // attempt to add stream
// Object[] output = manager.addStream(source, packet.getData());
// boolean success = (Boolean) output[0];
// String hash = (String) output[1];
//
// if(!success)
// return ResponseFactory.response420(source);
//
// boolean authorized = manager.authorizeStream(hash);
//
// // if successful authorize
// if(authorized)
// return ResponseFactory.response200(String.format("%s", hash));
//
// manager.removeStream(hash);
// return ResponseFactory.response422(source);
// }
//
// // key: stream key
// public Response processIATH(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// if(!manager.containsStream(packet.getData("key")))
// return ResponseFactory.response421(packet.getData("key"));
//
// return ResponseFactory.response200(String.format("%s", manager.isStreamAuthorized(packet.getData("key"))));
// }
//
// public Response processIATV(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// return ResponseFactory.response200(String.format("%s", manager.isStreamActive(key)));
// }
//
// public Response processEXEC(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// if(!manager.isStreamReady(key))
// return ResponseFactory.response423(key);
//
// if(manager.isStreamActive(key))
// return ResponseFactory.response424(key);
//
// return ResponseFactory.response200(String.format("%s", manager.executeStream(key)));
// }
//
// public Response processKILL(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// if(!manager.isStreamActive(key))
// return ResponseFactory.response425(key);
//
// return ResponseFactory.response200(String.format("%s", manager.killStream(key)));
// }
//
// public Response processSUBS(Packet packet) {
// String validate;
// if((validate = packet.validate("key", "subscription")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// String subscription = packet.getData("subscription");
//
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// if(!manager.isStreamActive(key))
// return ResponseFactory.response425(key);
//
// if(!manager.containsSubscriptionType(key, subscription))
// return ResponseFactory.response426(key, subscription);
//
// Object[] response = manager.subscribe(key, subscription);
//
// if((Boolean)response[0])
// return ResponseFactory.response200(String.format("%s", "true"));
//
// return ResponseFactory.response427(key, response[0].toString(), response[1].toString());
// }
//
// public Response processRQST(Packet packet) {
// String validate;
// if((validate = packet.validate("key", "request", "query", "destination")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// String request = packet.getData("request");
//
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// if(!manager.isStreamReady(key))
// return ResponseFactory.response423(key);
//
// if(!manager.containsRequestType(key, request))
// return ResponseFactory.response428(key, request);
//
// Object[] response = manager.request(key, packet.getData());
//
// if(response == null)
// return ResponseFactory.response501("Response from Manager.request should never be null. "
// + "Improper implementation of ExternalStreamConnection.");
//
// if((Boolean)response[0])
// return ResponseFactory.response200(String.format("%s", (String)response[1]));
//
// return ResponseFactory.response429(key, request, response[1].toString());
// }
//
// public Response processTYPE(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// String uuid = manager.getStreamType(key);
// if(uuid == null)
// return ResponseFactory.response501("Stream was removed in different thread mid observation.");
//
// return ResponseFactory.response200(uuid);
//
// }
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.stream.external.handler;

import java.util.HashMap;
import java.util.TreeMap;

import org.framework.router.Response;
import org.stream.external.requests.ExternalRequestFramework;
import org.stream.external.requests.ExternalRequestManager;

Expand All @@ -11,8 +11,6 @@ public class ExternalStreamManager {
private final ExternalStreamHandler handler;
private final ExternalRequestManager manager;

private final HashMap<String, String> collections = new HashMap<String, String>();

protected ExternalStreamManager(ExternalStreamHandler handler) {
this.handler = handler;
this.manager = new ExternalRequestManager();
Expand Down Expand Up @@ -50,13 +48,16 @@ protected Object[] request(String type, HashMap<String, String> data, String sta

// retrieve properties
HashMap<String, String> properties = new HashMap<String, String>();
TreeMap<String, String> user_properties = new TreeMap<String, String>();
if(data.containsKey("properties")) {
String[] raw_properties = data.get("properties").split(",");
if(raw_properties.length % 2 != 0)
return new Object[] {false, String.format("Properties must be in <key, value> pairs.")};

for(int i = 0; i < raw_properties.length; i+=2)
for(int i = 0; i < raw_properties.length; i+=2) {
properties.put(raw_properties[i], raw_properties[i + 1]);
user_properties.put(raw_properties[i], raw_properties[i + 1]);
}
}

// retrieve headers
Expand All @@ -75,10 +76,10 @@ protected Object[] request(String type, HashMap<String, String> data, String sta

// submit response
String response;
if(startDate == null && endDate == null)
response = request.request(url_path, properties, headers);
if(startDate == null || endDate == null)
response = request.request(url_path, properties, headers, user_properties);
else
response = request.request(url_path, properties, headers, startDate, endDate);
response = request.request(url_path, properties, headers, startDate, endDate, user_properties);

if(response != null)
return new Object[] {false, response};
Expand All @@ -90,7 +91,10 @@ public void processRequest(String collection, HashMap<String, String> data) {
if(data == null || data.isEmpty())
return;

System.out.println(collection + ": " + data.toString());
// TODO: FINISH
StringBuilder sb = new StringBuilder();
for(String key : data.keySet())
sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ",");
sb.delete(sb.length() - 1, sb.length());
handler.send("LSH", "PUSH", "collection", collection, "data", sb.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.stream.Stream;

import org.json.JSONArray;
Expand Down Expand Up @@ -65,7 +67,6 @@ public ExternalRequestFramework(ExternalStreamManager manager, String name, Stri
HashMap<String, String> tags, String[] recursive_location, String recursive_replacement, String[] path,
boolean is_dated, String date_location, String date_start_var, String date_end_var, String date_format) {
this.manager = manager;
this.collection = name;
this.name = name;
this.url = url;
this.url_path = url_path;
Expand Down Expand Up @@ -178,7 +179,6 @@ protected final Request getRequest(String url, HashMap<String, String> propertie
return builder.build();
}


public final synchronized String request(String[] url_path, HashMap<String, String> properties, HashMap<String, String> headers) {
// validate that url_path is correctly formatted
if(url_path.length % 2 != 0) {
Expand Down Expand Up @@ -208,8 +208,21 @@ public final synchronized String request(String[] url_path, HashMap<String, Stri
return process(url_builder.toString(), properties, headers);
}

public final synchronized String request(String[] url_path, HashMap<String, String> properties, HashMap<String, String> headers, TreeMap<String, String> user_properties) {
// update collection to be dated
StringBuilder sb = new StringBuilder();
sb.append(name);
if(!user_properties.isEmpty())
sb.append("-").append(user_properties.toString());
if(url_path.length != 0)
sb.append("-").append(Arrays.toString(url_path));
this.collection = sb.toString();
collection = sb.toString();
return request(url_path, properties, headers);
}

public final synchronized String request(String[] url_path, HashMap<String, String> properties, HashMap<String, String> headers,
String startDate, String endDate) {
String startDate, String endDate, TreeMap<String, String> user_properties) {

// validate that the request can be dated
if(!is_dated) {
Expand Down Expand Up @@ -278,9 +291,18 @@ public final synchronized String request(String[] url_path, HashMap<String, Stri

// submit request with updated properties
// update collection to be dated
collection = name + "-" + date.format(date_format).toString();
String temp_collection = collection;
StringBuilder sb = new StringBuilder();
sb.append(name);
sb.append("-").append(date.format(date_format).toString());
if(!user_properties.isEmpty())
sb.append("-").append(user_properties.toString());
if(url_path.length != 0)
sb.append("-").append(Arrays.toString(url_path));
this.collection = sb.toString();
collection = sb.toString();
String request = request(url_path, properties, headers);
collection = name;
collection = temp_collection;
if(request != null)
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Properties;
import java.util.Set;

import org.core.logger.Logger;
import org.reflections.Reflections;
import org.reflections.util.ConfigurationBuilder;
import org.stream.external.handler.ExternalStreamManager;
Expand Down Expand Up @@ -213,7 +214,7 @@ public void initialize(ExternalStreamManager manager) throws InstantiationExcept
manager, name, url, url_path, properties, headers, tags, recursive_location, recursive_replacement, path,
is_dated, date_location, date_start_var, date_end_var, date_format));

System.out.println(String.format("Successfully added type [%s]", name));
Logger.log(String.format("[ExternalRequestManager] Successfully added Request Framework type [%s]", name));
}
}

Expand Down
Loading