Skip to content

Commit

Permalink
Integrated REST API endpoints successfully.
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Sep 26, 2022
1 parent 3372fa5 commit 6c940c2
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 134 deletions.
Binary file modified Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx
Binary file not shown.
3 changes: 3 additions & 0 deletions DeFi-Data-Engine/DeFi Data Engine/config/app.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ general.data.delim=,
# collection delimiter
general.collection.delim==

# transfer delimiter
general.transfer.delim=&&&

# date time formatter for data intake
general.data.dateformat=dd-MM-yyyy

Expand Down
4 changes: 2 additions & 2 deletions DeFi-Data-Engine/DeFi Data Engine/config/stream.properties
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# === GENERAL PROPERTIES ===

# consumer types for accepting input
general.consumer.types=null
general.consumer.types=socket_consumer

# producer types for writing output
general.producer.types=null
general.producer.types=socket_producer

# === REST SOCKET PROPERTIES ===

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketException;

import org.framework.router.Response;
import org.json.JSONObject;
import org.out.handler.OutputConsumer;
import org.out.handler.OutputManager;
import org.out.handler.ProtocolDirectory;
import org.out.socket.SocketManager;
import org.properties.Config;

Expand Down Expand Up @@ -46,41 +46,79 @@ public void run() {

// listen for data packets from rest socket
while(true) {
String[] data = ((String)in.readUTF()).split(Config.getProperty("app", "general.internal.delim"));
int protocol_index = 0, data_index = 0;
if(data.length == 2) {
protocol_index = 0;
data_index = 1;
}

else if(data.length == 3) {
protocol_index = 1;
data_index = 2;
}

else {
System.err.println("Invalid data format for SocketConsumer. Terminating.");
System.exit(1);
}

String[] protocol = ProtocolDirectory.getProtocol(data[protocol_index]);
if(protocol == null) {
String[] data = ((String)in.readUTF()).split(Config.getProperty("app", "general.transfer.delim"));
String tag, sub_tag, sub_data, data_str;

if(data.length == 3) {
tag = data[0];
sub_tag = data[1];
data_str = data[2];

Response response = send(tag, sub_tag, data_str);
out.writeUTF(new JSONObject()
.put("response", "403")
.put("message", "Protocol does not exist. Please reference documentation.")
.put("response", "200")
.put("code", response.code())
.put("message", response.message())
.put("data", response.data())
.toString());
} else {
Response response = send(protocol[0], protocol[1], data[data_index], data[0]);

} else if(data.length == 4) {
tag = data[0];
sub_tag = data[1];
sub_data = data[2];
data_str = data[3];

Response response = send(tag, sub_tag, data_str, sub_data);
out.writeUTF(new JSONObject()
.put("response", "200")
.put("code", response.code())
.put("message", response.message())
.put("data", response.data())
.toString());
.toString());

} else {
out.writeUTF(new JSONObject()
.put("response", "502")
.put("message", "Communication between REST API and SocketConsumer is not consistent. Differing input lengths.")
.toString());
}


// int protocol_index = 0, data_index = 0;
// if(data.length == 2) {
// protocol_index = 0;
// data_index = 1;
// }
//
// else if(data.length == 3) {
// protocol_index = 1;
// data_index = 2;
// }
//
// else {
// System.err.println("Invalid data format for SocketConsumer. Terminating.");
// System.exit(1);
// }
//
// String[] protocol = ProtocolDirectory.getProtocol(data[protocol_index]);
// if(protocol == null) {
// out.writeUTF(new JSONObject()
// .put("response", "403")
// .put("message", "Protocol does not exist. Please reference documentation.")
// .toString());
// } else {
// Response response = send(protocol[0], protocol[1], data[data_index], data[0]);
// out.writeUTF(new JSONObject()
// .put("response", "200")
// .put("code", response.code())
// .put("message", response.message())
// .put("data", response.data())
// .toString());
// }
}
} catch(SocketException e) {
System.out.println("Rest Application has unexpectedly closed.");
System.err.println("Rest Application has unexpectedly closed.");
System.exit(1);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public SocketDestination(String key, DataOutputStream out) {
public final synchronized boolean send(Packet packet) {
try {
out.write(packet.getData().getBytes());
out.write(10);
} catch (JSONException | IOException e) {
e.printStackTrace();
return false;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public synchronized static String accept(int port) {
DataOutputStream out = new DataOutputStream(connection.getOutputStream());
String key = UUID.randomUUID().toString();
out.write(key.getBytes());
out.write(10);

if(connections.containsKey(key))
connections.get(key).close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package org.stream.external.connected.connections;

import java.time.format.DateTimeFormatter;

import org.properties.Config;
import org.stream.external.handler.ExternalStreamConnection;
import org.stream.external.handler.ExternalStreamManager;

Expand All @@ -12,8 +9,6 @@ public class TemplateExternalConnection extends ExternalStreamConnection {
private boolean authorized = false;
private boolean override = false;

private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.getProperty("app", "general.data.dateformat"));

public TemplateExternalConnection(ExternalStreamManager manager, String data) {
super(manager, data.split(",")[0]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ public Response processEXST(Packet packet) {
public Response processINIT(Packet packet) {
// extract template from data
String data = packet.getData();
int splitIndex = data.indexOf(',');
String delim = Config.getProperty("app", "general.internal.delim");
int delim_len = delim.length();
int splitIndex = data.indexOf(delim);
String template = "";
if(splitIndex != -1) {
template = data.substring(0, splitIndex).trim();
data = data.substring(splitIndex + 1).trim();
data = data.substring(splitIndex + delim_len).trim();
} else {
template = data;
data = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,6 @@ protected boolean killStream(String hash) {
*/
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.getProperty("app", "general.data.dateformat"));
protected void processSubscription(String hash, String subscription, String destination, String data) {
Response out_response = handler.send("OUT", "EDAT", String.format("%s", data), destination);
if(out_response.code() != 200)
Logger.warn(out_response);

// define subscribed date
String date = LocalDate.now().format(formatter);
String collection = subscription + Config.getProperty("app", "general.collection.delim") + date;
Expand All @@ -420,10 +416,6 @@ protected void processSubscription(String hash, String subscription, String dest
* @param data Data sent by the given subscription.
*/
protected void processRequest(String hash, String request, String destination, String data) {
Response out_response = handler.send("OUT", "EDAT", String.format("%s", data), destination);
if(out_response.code() != 200)
Logger.warn(out_response);

Response lsh_response = handler.send("LSH", "PUSH", String.format("%s%s%s", data, Config.getProperty("app", "general.internal.delim"), request));
if(lsh_response.code() != 200)
Logger.warn(lsh_response);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package org.stream.local.handler;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.framework.router.Packet;
import org.framework.router.Response;
import org.framework.router.ResponseFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ public Response processRQST(Packet packet) {

// dated
else if(data.length == 5) {
LocalDate start = LocalDate.parse(data[1], formatter);
LocalDate end = LocalDate.parse(data[2], formatter);
try {
LocalDate start = LocalDate.parse(data[1], formatter);
LocalDate end = LocalDate.parse(data[2], formatter);
List<LocalDate> dates = start.datesUntil(end).collect(Collectors.toList());
// invalid date processing
if(dates.isEmpty())
Expand All @@ -108,13 +108,20 @@ else if(data.length == 5) {
// perform requests
request = uuid + Config.getProperty("app", "general.collection.delim") + data[3] + Config.getProperty("app", "general.collection.delim") + date.format(formatter);
lsh_response = send("LSH", "RQST", format(request, data[4]), packet.getSubData());
if(lsh_response.code() == 200)
continue;

if(lsh_response.code() == 446) {
esh_response = send("ESH", "RQST", format(data[0], request), packet.getSubData());
if(esh_response.code() != 200)
return esh_response;
} else {
return lsh_response;
}


if(lsh_response.code() != 200 && lsh_response.code() != 446)
lsh_response = send("LSH", "RQST", format(request, data[4]), packet.getSubData());
if(lsh_response.code() != 200)
return lsh_response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,9 @@ public void TestRQSTDated() {
Config.setProperty("stream", "mongodb.database.main", "testing");
Core core = new Core();

assertEquals(200, core.send("SRC", "INIT", "external_template, key").code());
assertEquals(200, core.send("SRC", "INIT", "external_template:::key").code());

assertEquals(200, core.send("SRC", "RQST", "key:::10-09-2022:::12-09-2022:::template-external-request:::get_all, template-external-request", "null").code());
// assertEquals(445, core.send("LSH", "RQST", "get_all, test-mongo-database, invalid").code());
// assertEquals(446, core.send("LSH", "RQST", "get_all, dne").code());
//
// // test get_item
// assertEquals(200, core.send("LSH", "RQST", "get_item, test-mongo-database, element1, e1").code());
// assertEquals(446, core.send("LSH", "RQST", "get_item, test-mongo-database, element1, e2").code());
//
// assertEquals(500, core.send("LSH", "RQST", "").code());
assertEquals(200, core.send("SRC", "RQST", "key:::10-09-2022:::15-09-2022:::template-external-request:::get_all, template-external-request", "null").code());
}

@Test
Expand Down
13 changes: 11 additions & 2 deletions DeFi-Data-Engine/Rest Application/config/app.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,17 @@
# Wait-time in between checking for responses from engine:
rest.wait.ms=10

# Delimiter used for communication between engine and API:
general.internal.delim=,.,
# delimiter used for internal processing
general.internal.delim=:::

# data delimiter
general.data.delim=,

# collection delimiter
general.collection.delim==

# transfer delimiter
general.transfer.delim=&&&

# === SPRING PROPERTIES ===

Expand Down
Loading

0 comments on commit 6c940c2

Please sign in to comment.