Skip to content

Commit

Permalink
Add Logger and integrate LSH framework.
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Sep 20, 2022
1 parent c6c4af8 commit 7796f44
Show file tree
Hide file tree
Showing 28 changed files with 356 additions and 105 deletions.
Binary file modified Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx
Binary file not shown.
7 changes: 1 addition & 6 deletions DeFi-Data-Engine/DeFi Data Engine/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@
</attributes>
</classpathentry>
<classpathentry kind="src" path="src/main/webapp/WEB-INF"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jdk-17.0.4">
<attributes>
<attribute name="module" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jdk-17.0.2"/>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
Expand Down
8 changes: 7 additions & 1 deletion DeFi-Data-Engine/DeFi Data Engine/config/app.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# === GENERAL PROPERTIES ===

# delimiter used for internal processing
general.internal.delim=,.,
general.internal.delim=,.,

# enable all packet logging
general.logging.packets=true

# enable all response logging
general.logging.responses=true
9 changes: 7 additions & 2 deletions DeFi-Data-Engine/DeFi Data Engine/config/output.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# === GENERAL PROPERTIES ===

consumer.types=socket_consumer
producer.types=socket_producer
consumer.types=null
producer.types=null

# === REST SOCKET PROPERTIES ===

Expand All @@ -21,3 +21,8 @@ output.socket.address=localhost

# Output socket port
output.socket.port=61200

# === LOCAL STREAM PROPERTIES ===

# local stream type used for database configuration
local.stream.type=mongo_db
17 changes: 0 additions & 17 deletions DeFi-Data-Engine/DeFi Data Engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,6 @@
<artifactId>json</artifactId>
<version>20220320</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>

<build>
Expand All @@ -70,8 +55,6 @@
</systemPropertyVariables>
</configuration>
</plugin>


</plugins>
</build>
</project>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.core.engine.Engine;
import org.core.logger.Logger;
import org.framework.router.Response;
import org.framework.router.Router;
import org.out.controller.Controller;
import org.out.handler.OutputHandler;
Expand All @@ -14,12 +15,13 @@ public Core() {

OutputHandler out = new OutputHandler();
Controller crl = new Controller();
Logger log = new Logger();
Engine eng = new Engine();
StreamManager str = new StreamManager();

this.connect(out, crl, log, eng, str);
this.connect(out, crl, eng, str);

this.send("ENG", "STRT", "");
Response response = this.send("ENG", "STRT", "");
if(response.code() != 200)
Logger.terminate(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import org.framework.router.Packet;
import org.framework.router.Response;
import org.framework.router.ResponseFactory;
import org.framework.router.Router;
import org.properties.Config;

public class Engine extends Router {

Expand All @@ -11,8 +13,16 @@ public Engine() {
}

public Response processSTRT(Packet packet) {
// start output processes
// TODO: check all engine processes outside of OUT prior to returning response.
return send("OUT", "STRT", "");
// start output processes:
Response out_response = send("OUT", "STRT", "");
if(out_response.code() != 200)
return out_response;

// start local stream handler processes:
Response lsh_response = send("LSH", "INIT", Config.getProperty("output", "local.stream.type"));
if(lsh_response.code() != 200)
return lsh_response;

return ResponseFactory.response200();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,45 @@
package org.core.logger;

import org.framework.router.Router;
import java.time.Instant;

public class Logger extends Router {
import org.framework.router.Packet;
import org.framework.router.Response;

public Logger() {
super("logger", "LOG");
public class Logger {

public static final void log(Packet packet) {
System.out.println(packetFormat(packet));
}

public static final void log(Response response) {
System.out.println(responseFormat(response));
}

public static final void terminate(Packet packet) {
System.err.println(packetFormat(packet));
System.exit(1);
}

public static final void terminate(Response response) {
System.err.println(responseFormat(response));
System.exit(1);
}

private static final String packetFormat(Packet packet) {
return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%s] [%s]",
Instant.now().toString(),
Thread.currentThread().getName(),
packet.getSender(),
packet.getTag(),
packet.getData(),
packet.getSubData());
}

private static final String responseFormat(Response response) {
return String.format("[%s] [%-10s] RESPONSE - [%3d] [%s]",
Instant.now().toString(),
Thread.currentThread().getName(),
response.code(),
response.data());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.framework.router;

import org.core.logger.Logger;
import org.properties.Config;

/**
* The {@link Packet} class represents a standardized data transfer object
* used throughout the engine. It contains a series of values which help to
Expand All @@ -10,6 +13,8 @@
*
*/
public class Packet {

private final static boolean log = Config.getProperty("app", "general.logging.packets").equals("true");

private final String sender;
private final String tag;
Expand Down Expand Up @@ -89,7 +94,10 @@ public final String getSubData() {
* @return New {@link Packet} object.
*/
public static Packet packet(Router router, String tag, String sub_tag, String data) {
return new Packet(router, tag, sub_tag, data, "");
Packet packet = new Packet(router, tag, sub_tag, data, "");
if(log)
Logger.log(packet);
return packet;
}

/**
Expand All @@ -103,6 +111,9 @@ public static Packet packet(Router router, String tag, String sub_tag, String da
* @return New {@link Packet} object.
*/
public static Packet packet(Router router, String tag, String sub_tag, String data, String sub_data) {
return new Packet(router, tag, sub_tag, data, sub_data);
Packet packet = new Packet(router, tag, sub_tag, data, sub_data);
if(log)
Logger.log(packet);
return packet;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.framework.router;

import org.core.logger.Logger;
import org.properties.Config;

/**
* The {@link Response} class is used to relay information from a
* given {@link Packet} sent through a {@link Router}. {@link Response}
Expand All @@ -18,6 +21,8 @@
*
*/
public final class Response {

private final static boolean log = Config.getProperty("app", "general.logging.responses").equals("true");

private final int code;
private final String message;
Expand Down Expand Up @@ -91,7 +96,10 @@ public String data() {
* @return New {@link Response} object formatted based on the passed parameters.
*/
public static Response create(int code, String message) {
return new Response(code, message);
Response response = new Response(code, message);
if(log)
Logger.log(response);
return response;
}

/**
Expand All @@ -104,6 +112,9 @@ public static Response create(int code, String message) {
* @return New {@link Response} object formatted based on the passed parameters.
*/
public static Response create(int code, String message, String data) {
return new Response(code, message, data);
Response response = new Response(code, message, data);
if(log)
Logger.log(response);
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ public static Response response0() {
return Response.create(0, "");
}

public static Response response200() {
return Response.create(200, "Successful Response.");
}

public static Response response200(String data) {
return Response.create(200, "Successful Response.", data);
return Response.create(200, "", data);
}

public static Response response220(String hash) {
Expand Down Expand Up @@ -84,6 +88,30 @@ public static Response response429(String hash, String request, String response)
return Response.create(429, String.format("Stream with hash <%s> returned an irregular response when attempting to subscribe to <%s>. Response returned is: <%s>", hash, request, response));
}

public static Response response440(String source) {
return Response.create(440, String.format("Requested data source <%s> does not exist in cache.", source));
}

public static Response response441(String hash) {
return Response.create(441, String.format("Requested data stream with given hash <%s> does not exist in cache.", hash));
}

public static Response response442(String source) {
return Response.create(442, String.format("Failed to add local data source <%s>.", source));
}

public static Response response443(String source) {
return Response.create(443, String.format("Local data stream with given source <%s> already exists.", source));
}

public static Response response444(String source) {
return Response.create(444, String.format("Failure to authorize the local data source <%s> with the given properties.", source));
}

public static Response response445(String source, String query) {
return Response.create(445, String.format("Local data stream with given source <%s> could not validate passed query <%s>", query));
}

public static Response response460(String consumer) {
return Response.create(460, String.format("Output consumer <%s> failed to listen to consumption channel.", consumer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public Response processSTRT(Packet packet) {
if(!(boolean)producer_response[0])
return ResponseFactory.response470(producer_response[1].toString());

return ResponseFactory.response200("");
return ResponseFactory.response200();
} catch(Exception e) {
e.printStackTrace();
System.exit(1);
Expand All @@ -42,6 +42,6 @@ public Response processEDAT(Packet packet) {
if(!manager.send(packet.getSubData(), packet))
return ResponseFactory.response472(packet.getSubData());

return ResponseFactory.response200("");
return ResponseFactory.response200();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class ProtocolDirectory {
protocols.put("kill", new String[]{"SRC", "KILL"});
protocols.put("subscribe", new String[]{"SRC", "SUBS"});
protocols.put("request", new String[]{"SRC", "RQST"});
protocols.put("scan", new String[] {"SRC", "SCAN"});
}

public static String[] getProtocol(String protocol) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public static final String getProperty(String name, String property) {
return properties.get(name).getProperty(property);
}

public static final void setProperty(String name, String property, String value) {
validate(name, property);
properties.get(name).setProperty(property, value);
}

public static final void validate(String name, String... keys) {
if(!properties.containsKey(name)) {
System.err.println(String.format("Property file <%s> does not exist. Program terminating.", name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public AmberDataConnection(ExternalStreamManager manager, String data) {
public String getUUID() {
return "amber_data";
}

public void init() {

}
Expand All @@ -42,7 +42,7 @@ public void defineRequestTypes() {
public String getHash(String data) {
try {
MessageDigest md = MessageDigest.getInstance("SHA-512");
byte[] bytes = md.digest(("salt" + data).getBytes());
byte[] bytes = md.digest(("salt" + System.currentTimeMillis() + data).getBytes());
BigInteger signum = new BigInteger(1, bytes);
String hashed = signum.toString(16);
while(hashed.length() < 32)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import org.stream.external.handler.ExternalStreamConnection;
import org.stream.external.handler.ExternalStreamManager;

public class TemplateConnection extends ExternalStreamConnection {
public class TemplateExternalConnection extends ExternalStreamConnection {

private boolean active = false;
private boolean authorized = false;
private boolean override = false;

public TemplateConnection(ExternalStreamManager manager, String data) {
public TemplateExternalConnection(ExternalStreamManager manager, String data) {
super(manager, data.split(",")[0]);

if(data.contains(","))
Expand Down
Loading

0 comments on commit 7796f44

Please sign in to comment.