Skip to content

Commit

Permalink
Update so Socket connection can accept requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Dec 13, 2022
1 parent 6de8233 commit f2328d5
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ public final Response receive(Packet packet) {
* All {@link Method} objects must contain a single parameter, a {@link Packet} object,
* and return a {@link Response} object.
*
* @param subtag Subtag of the process to handle the incoming {@link Packet} object.
* @param sub_tag Subtag of the process to handle the incoming {@link Packet} object.
* @param method {@link Method} to pass the {@link Packet} object to.
*/
public final void addProcess(String subtag, Method method) {
processes.put(subtag, method);
public final void addProcess(String sub_tag, Method method) {
processes.put(sub_tag, method);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public final synchronized boolean send(Packet packet) {
try {
out.write(packet.getData("data").getBytes());
out.write(10);
out.flush();
} catch (JSONException | IOException e) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Object[] producerListen() {
return new Object[] {true, ""};
}

protected final Response send(String tag, String sub_tag, String... data) {
public final Response send(String tag, String sub_tag, String... data) {
return handler.send(tag, sub_tag, data);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.out.producers;

import org.framework.router.Response;
import org.out.destinations.SocketDestination;
import org.out.handler.OutputManager;
import org.out.handler.OutputProducer;
Expand All @@ -9,6 +10,7 @@
public class SocketProducer extends OutputProducer {

private Thread listener;
public final SocketProducer producer = this;

public SocketProducer(OutputManager manager) {
super(manager);
Expand All @@ -24,7 +26,7 @@ protected boolean init() {
listener = new Thread() {
public void run() {
while(true) {
String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")));
String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")), producer);
if(key == null) {
System.err.println("SocketProducer: Could not create connection to socket port.");
System.exit(1);
Expand Down Expand Up @@ -55,4 +57,8 @@ protected boolean kill() {

return true;
}

public Response send(String tag, String sub_tag, String... data) {
return manager.send(tag, sub_tag, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.UUID;

import org.core.logger.Logger;
import org.framework.router.Response;
import org.json.JSONObject;
import org.out.producers.SocketProducer;
import org.properties.Config;

public class SocketManager {

Expand Down Expand Up @@ -38,8 +43,69 @@ public synchronized static boolean exists(String key) {
return connections.containsKey(key);
}

public synchronized static void createThread(String key, SocketProducer producer) {
Thread thread = new Thread() {
public void run() {
// perform logger verifications
Logger.log(String.format("Starting thread for Socket with key <%s>", key));
if(!inflow.containsKey(key)) {
Logger.terminate(String.format("Key <%s> not found within inflow thread configuration, manual review recommended.", key));
return;
}

if(!outflow.containsKey(key)) {
Logger.terminate(String.format("Key <%s> not found within outflow thread configuration, manual review recommended.", key));
return;
}

// retrieve inflow stream and listen
DataInputStream in = inflow.get(key);
DataOutputStream out = outflow.get(key);
String str;
while(true)
try {
str = readLine(in);

// parse input
String[] input = str.split(Config.getProperty("app", "general.transfer.delim"));

// validate input
if(input.length <= 2) {
out.writeUTF(new JSONObject()
.put("response", "502")
.put("message", "Packet processed from REST API does not contain a TAG or SUB_TAG. Review REST API endpoint code.")
.toString());
}

// extract non-essential data
String[] data = Arrays.copyOfRange(input, 2, input.length);
String tag = input[0];
String sub_tag = input[1];

// execute valid response to engine
Response response = producer.send(tag, sub_tag, data);
out.writeUTF(new JSONObject()
.put("response", "200")
.put("code", response.code())
.put("message", response.message())
.put("data", response.data())
.toString());
out.flush();
System.out.println("<<<responded>>>");

} catch(Exception e) {
break;
}

Logger.log(String.format("Terminating thread for Socket with key <%s>", key));
}
};

thread.start();
}

// used for generic channel accepting
public synchronized static String accept(int port) {
public synchronized static String accept(int port, SocketProducer producer) {
if(!servers.containsKey(port))
if(!createServer(port))
return null;
Expand Down Expand Up @@ -68,6 +134,9 @@ public synchronized static String accept(int port) {
if(!synced(key))
throw new Exception("Connection inflow and outflow not synchronized");

// start internal thread for socket information parsing
createThread(key, producer);

return key;
} catch(Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -132,4 +201,13 @@ public static DataInputStream read(String key) {
private static boolean synced(String key) {
return connections.containsKey(key) && inflow.containsKey(key) && outflow.containsKey(key);
}

private static final String readLine(DataInputStream in) throws IOException {
StringBuilder out = new StringBuilder();
char c = 0;
while((c = (char)in.read()) != 10)
out.append(c);

return out.toString();
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ private final String request(String tag, String sub_tag, String... data) {
formatted_data.append(delim);
}

// static request
out.writeUTF(String.format("%s%s%s%s%s", tag, delim, sub_tag, delim, formatted_data));
return in.readUTF();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
Expand All @@ -15,28 +16,15 @@

public class LocalTest {

private static final String host = "defi-de.idea.rpi.edu";
private static final String host = "localhost";

public static void main(String[] args) throws UnknownHostException, IOException {
final Socket socket = SocketFactory.getDefault().createSocket(host, 61200);
final DataInputStream in = new DataInputStream(socket.getInputStream());
final BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
final DataOutputStream out = new DataOutputStream(socket.getOutputStream());

String destination = readLine(in);

Thread thread = new Thread() {
public void run() {
try {
while(true) {
System.out.println(readLine(in));
}
} catch(Exception e) {
e.printStackTrace();
System.exit(1);
}
}
};
thread.start();

String init = request(String.format("http://%s:8080/defi/v1/rest/initialize?"
+ "source=amber_data&"
+ "auth_data=key,UAK7ed69235426c360be22bfc2bde1809b6",
Expand All @@ -45,34 +33,49 @@ public void run() {
JSONObject json_init = new JSONObject(init);
String key = json_init.getString("data");

String rqst_protocol_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?"
+ "destination=%s&"
+ "key=%s&"
+ "request=request,aave-protocol-dated&"
+ "query=aave-protocol-dated&"
+ "start_date=%s&"
+ "end_date=%s",
host,
out.writeBytes(String.format(
"SRC&&&RQST&&&"
+ "destination&&&%s&&&"
+ "key&&&%s&&&"
+ "start_date&&&2022-08-01&&&"
+ "end_date&&&2022-08-02&&&"
+ "query&&&aave-protocol-dated&&&"
+ "request&&&aave-protocol-dated\n",
destination,
key,
"2022-08-01",
"2022-08-02");
key));

String rqst_asset_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?"
+ "destination=%s&"
+ "key=%s&"
+ "request=request,aave-protocol-dated&"
+ "query=aave-protocol-dated&"
+ "start_date=%s&"
+ "end_date=%s",
host,
destination,
key,
"2022-08-01",
"2022-08-02");
while(true) {
System.out.print(readLine(in));
}

String rqst = request(rqst_asset_dated);
System.out.println(rqst);
// String rqst_protocol_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?"
// + "destination=%s&"
// + "key=%s&"
// + "request=request,aave-protocol-dated&"
// + "query=aave-protocol-dated&"
// + "start_date=%s&"
// + "end_date=%s",
// host,
// destination,
// key,
// "2022-08-01",
// "2022-08-02");
//
// String rqst_asset_dated = String.format("http://%s:8080/defi/v1/rest/request_dated?"
// + "destination=%s&"
// + "key=%s&"
// + "request=request,aave-protocol-dated&"
// + "query=aave-protocol-dated&"
// + "start_date=%s&"
// + "end_date=%s",
// host,
// destination,
// key,
// "2022-08-01",
// "2022-08-02");
//
// String rqst = request(rqst_asset_dated);
// System.out.println(rqst);
}

public static String request(String str) throws IOException {
Expand All @@ -99,7 +102,7 @@ public static String request(String str) throws IOException {
return "";
}

public static final String readLine(DataInputStream in) throws IOException {
public static final String readLine(BufferedReader in) throws IOException {
StringBuilder out = new StringBuilder();
char c = 0;
while((c = (char)in.read()) != 10)
Expand Down

0 comments on commit f2328d5

Please sign in to comment.