Skip to content

Commit

Permalink
Merge pull request #5 from DataINCITE/flynnc3-temp
Browse files Browse the repository at this point in the history
Update Internals to be compatible with API handler integration
  • Loading branch information
flynnc3 authored Mar 21, 2023
2 parents 6a5ea53 + 9095e63 commit 4ee2adc
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ public class Config {
Properties stream_properties = new Properties();
stream_properties.put("general.consumer.types", "socket_consumer");
stream_properties.put("general.producer.types", "socket_producer");
//stream_properties.put("rest.socket.address", "DataEngine");
stream_properties.put("rest.socket.address", "localhost");
stream_properties.put("rest.socket.address", "DataEngine");
//stream_properties.put("rest.socket.address", "localhost");
stream_properties.put("rest.socket.port", "61100");
stream_properties.put("rest.socket.key", "rest-key-reserved");
//stream_properties.put("output.socket.address", "RestApp");
stream_properties.put("output.socket.address", "localhost");
stream_properties.put("output.socket.address", "RestApp");
// stream_properties.put("output.socket.address", "localhost");
stream_properties.put("output.socket.port", "61200");
//stream_properties.put("local.stream.type", "mongo_db");
stream_properties.put("local.stream.type", "null");
stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017");
stream_properties.put("local.stream.type", "mongo_db");
//stream_properties.put("local.stream.type", "null");
//stream_properties.put("mongodb.properties.uri", "mongodb://MONGO:27017");
stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017");
stream_properties.put("mongodb.database.state", "main-state-db");
stream_properties.put("mongodb.database.main", "main-db");
stream_properties.put("mongodb.auth.collection", "auth-collection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public Response processRQST(Packet packet) {

// check to see if dated
// if not
if((validate = packet.validate("startDate", "endDate")) != null)
if((validate = packet.validate("start_date", "end_date")) != null)
response = manager.request(packet.getData("type"), packet.getData());

// if dated
else
response = manager.request(packet.getData("type"), packet.getData(), packet.getData("startDate"), packet.getData("endDate"));
response = manager.request(packet.getData("type"), packet.getData(), packet.getData("start_date"), packet.getData("end_date"));

// check to see if valid
if(!((boolean)response[0])) {
Expand All @@ -56,184 +56,4 @@ public Response processRQST(Packet packet) {
// return valid
return ResponseFactory.response200();
}

// // source: source of data
// public Response processEXSR(Packet packet) {
// String validate;
// if((validate = packet.validate("source")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// return ResponseFactory.response200(String.format("%s", manager.containsTemplate(packet.getData("source"))));
// }
//
// // key: key of the given stream
// public Response processEXST(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// return ResponseFactory.response200(String.format("%s", manager.containsStream(packet.getData("key"))));
// }
//
// // source: source of the given stream
// // auth_data[]: all keys associated with data
// public Response processINIT(Packet packet) {
// String validate;
// if((validate = packet.validate("source")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String source = packet.getData("source");
//
// // validate data
// String tempHash = manager.getHash(source, packet.getData());
// if(tempHash == null)
// return ResponseFactory.response430(packet.getData());
// if(manager.containsStream(tempHash))
// return ResponseFactory.response220(tempHash);
//
// // attempt to add stream
// Object[] output = manager.addStream(source, packet.getData());
// boolean success = (Boolean) output[0];
// String hash = (String) output[1];
//
// if(!success)
// return ResponseFactory.response420(source);
//
// boolean authorized = manager.authorizeStream(hash);
//
// // if successful authorize
// if(authorized)
// return ResponseFactory.response200(String.format("%s", hash));
//
// manager.removeStream(hash);
// return ResponseFactory.response422(source);
// }
//
// // key: stream key
// public Response processIATH(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// if(!manager.containsStream(packet.getData("key")))
// return ResponseFactory.response421(packet.getData("key"));
//
// return ResponseFactory.response200(String.format("%s", manager.isStreamAuthorized(packet.getData("key"))));
// }
//
// public Response processIATV(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// return ResponseFactory.response200(String.format("%s", manager.isStreamActive(key)));
// }
//
// public Response processEXEC(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// if(!manager.isStreamReady(key))
// return ResponseFactory.response423(key);
//
// if(manager.isStreamActive(key))
// return ResponseFactory.response424(key);
//
// return ResponseFactory.response200(String.format("%s", manager.executeStream(key)));
// }
//
// public Response processKILL(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// if(!manager.isStreamActive(key))
// return ResponseFactory.response425(key);
//
// return ResponseFactory.response200(String.format("%s", manager.killStream(key)));
// }
//
// public Response processSUBS(Packet packet) {
// String validate;
// if((validate = packet.validate("key", "subscription")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// String subscription = packet.getData("subscription");
//
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// if(!manager.isStreamActive(key))
// return ResponseFactory.response425(key);
//
// if(!manager.containsSubscriptionType(key, subscription))
// return ResponseFactory.response426(key, subscription);
//
// Object[] response = manager.subscribe(key, subscription);
//
// if((Boolean)response[0])
// return ResponseFactory.response200(String.format("%s", "true"));
//
// return ResponseFactory.response427(key, response[0].toString(), response[1].toString());
// }
//
// public Response processRQST(Packet packet) {
// String validate;
// if((validate = packet.validate("key", "request", "query", "destination")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// String request = packet.getData("request");
//
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// if(!manager.isStreamReady(key))
// return ResponseFactory.response423(key);
//
// if(!manager.containsRequestType(key, request))
// return ResponseFactory.response428(key, request);
//
// Object[] response = manager.request(key, packet.getData());
//
// if(response == null)
// return ResponseFactory.response501("Response from Manager.request should never be null. "
// + "Improper implementation of ExternalStreamConnection.");
//
// if((Boolean)response[0])
// return ResponseFactory.response200(String.format("%s", (String)response[1]));
//
// return ResponseFactory.response429(key, request, response[1].toString());
// }
//
// public Response processTYPE(Packet packet) {
// String validate;
// if((validate = packet.validate("key")) != null)
// return ResponseFactory.response500("ExternalStreamHandler", validate);
//
// String key = packet.getData("key");
// if(!manager.containsStream(key))
// return ResponseFactory.response421(key);
//
// String uuid = manager.getStreamType(key);
// if(uuid == null)
// return ResponseFactory.response501("Stream was removed in different thread mid observation.");
//
// return ResponseFactory.response200(uuid);
//
// }
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.stream.external.handler;

import java.util.HashMap;
import java.util.TreeMap;

import org.framework.router.Response;
import org.stream.external.requests.ExternalRequestFramework;
import org.stream.external.requests.ExternalRequestManager;

Expand All @@ -11,8 +11,6 @@ public class ExternalStreamManager {
private final ExternalStreamHandler handler;
private final ExternalRequestManager manager;

private final HashMap<String, String> collections = new HashMap<String, String>();

protected ExternalStreamManager(ExternalStreamHandler handler) {
this.handler = handler;
this.manager = new ExternalRequestManager();
Expand Down Expand Up @@ -50,13 +48,16 @@ protected Object[] request(String type, HashMap<String, String> data, String sta

// retrieve properties
HashMap<String, String> properties = new HashMap<String, String>();
TreeMap<String, String> user_properties = new TreeMap<String, String>();
if(data.containsKey("properties")) {
String[] raw_properties = data.get("properties").split(",");
if(raw_properties.length % 2 != 0)
return new Object[] {false, String.format("Properties must be in <key, value> pairs.")};

for(int i = 0; i < raw_properties.length; i+=2)
for(int i = 0; i < raw_properties.length; i+=2) {
properties.put(raw_properties[i], raw_properties[i + 1]);
user_properties.put(raw_properties[i], raw_properties[i + 1]);
}
}

// retrieve headers
Expand All @@ -75,10 +76,10 @@ protected Object[] request(String type, HashMap<String, String> data, String sta

// submit response
String response;
if(startDate == null && endDate == null)
response = request.request(url_path, properties, headers);
if(startDate == null || endDate == null)
response = request.request(url_path, properties, headers, user_properties);
else
response = request.request(url_path, properties, headers, startDate, endDate);
response = request.request(url_path, properties, headers, startDate, endDate, user_properties);

if(response != null)
return new Object[] {false, response};
Expand All @@ -90,7 +91,10 @@ public void processRequest(String collection, HashMap<String, String> data) {
if(data == null || data.isEmpty())
return;

System.out.println(collection + ": " + data.toString());
// TODO: FINISH
StringBuilder sb = new StringBuilder();
for(String key : data.keySet())
sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ",");
sb.delete(sb.length() - 1, sb.length());
handler.send("LSH", "PUSH", "collection", collection, "data", sb.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.stream.Stream;

import org.json.JSONArray;
Expand Down Expand Up @@ -65,7 +67,6 @@ public ExternalRequestFramework(ExternalStreamManager manager, String name, Stri
HashMap<String, String> tags, String[] recursive_location, String recursive_replacement, String[] path,
boolean is_dated, String date_location, String date_start_var, String date_end_var, String date_format) {
this.manager = manager;
this.collection = name;
this.name = name;
this.url = url;
this.url_path = url_path;
Expand Down Expand Up @@ -178,7 +179,6 @@ protected final Request getRequest(String url, HashMap<String, String> propertie
return builder.build();
}


public final synchronized String request(String[] url_path, HashMap<String, String> properties, HashMap<String, String> headers) {
// validate that url_path is correctly formatted
if(url_path.length % 2 != 0) {
Expand Down Expand Up @@ -208,8 +208,21 @@ public final synchronized String request(String[] url_path, HashMap<String, Stri
return process(url_builder.toString(), properties, headers);
}

public final synchronized String request(String[] url_path, HashMap<String, String> properties, HashMap<String, String> headers, TreeMap<String, String> user_properties) {
// update collection to be dated
StringBuilder sb = new StringBuilder();
sb.append(name);
if(!user_properties.isEmpty())
sb.append("-").append(user_properties.toString());
if(url_path.length != 0)
sb.append("-").append(Arrays.toString(url_path));
this.collection = sb.toString();
collection = sb.toString();
return request(url_path, properties, headers);
}

public final synchronized String request(String[] url_path, HashMap<String, String> properties, HashMap<String, String> headers,
String startDate, String endDate) {
String startDate, String endDate, TreeMap<String, String> user_properties) {

// validate that the request can be dated
if(!is_dated) {
Expand Down Expand Up @@ -278,9 +291,18 @@ public final synchronized String request(String[] url_path, HashMap<String, Stri

// submit request with updated properties
// update collection to be dated
collection = name + "-" + date.format(date_format).toString();
String temp_collection = collection;
StringBuilder sb = new StringBuilder();
sb.append(name);
sb.append("-").append(date.format(date_format).toString());
if(!user_properties.isEmpty())
sb.append("-").append(user_properties.toString());
if(url_path.length != 0)
sb.append("-").append(Arrays.toString(url_path));
this.collection = sb.toString();
collection = sb.toString();
String request = request(url_path, properties, headers);
collection = name;
collection = temp_collection;
if(request != null)
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Properties;
import java.util.Set;

import org.core.logger.Logger;
import org.reflections.Reflections;
import org.reflections.util.ConfigurationBuilder;
import org.stream.external.handler.ExternalStreamManager;
Expand Down Expand Up @@ -213,7 +214,7 @@ public void initialize(ExternalStreamManager manager) throws InstantiationExcept
manager, name, url, url_path, properties, headers, tags, recursive_location, recursive_replacement, path,
is_dated, date_location, date_start_var, date_end_var, date_format));

System.out.println(String.format("Successfully added type [%s]", name));
Logger.log(String.format("[ExternalRequestManager] Successfully added Request Framework type [%s]", name));
}
}

Expand Down
Loading

0 comments on commit 4ee2adc

Please sign in to comment.