Skip to content

Commit

Permalink
Stable version supporting AmberData - aave-protocol-dated
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor Flynn committed Oct 31, 2022
1 parent f5030a2 commit fad1e0f
Show file tree
Hide file tree
Showing 22 changed files with 458 additions and 118 deletions.
Binary file modified Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

public class Logger {

public static final void log(String message) {
System.out.println(messageFormat("INFO", message));
}

public static final void log(Packet packet) {
System.out.println(packetFormat(packet));
}
Expand All @@ -16,6 +20,10 @@ public static final void log(Response response) {
System.out.println(responseFormat(response));
}

public static final void warn(String message) {
System.out.println(messageFormat("WARN", message));
}

public static final void warn(Packet packet) {
System.err.println(packetFormat(packet));
}
Expand All @@ -24,6 +32,10 @@ public static final void warn(Response response) {
System.err.println(responseFormat(response));
}

public static final void terminate(String message) {
System.err.println(messageFormat("ERROR", message));
}

public static final void terminate(Packet packet) {
System.err.println(packetFormat(packet));
System.exit(1);
Expand All @@ -34,6 +46,14 @@ public static final void terminate(Response response) {
System.exit(1);
}

private static final String messageFormat(String type, String message) {
return String.format("[%s] [%-10s] %-9s- [%s]",
time(),
Thread.currentThread().getName(),
type,
message);
}

private static final String packetFormat(Packet packet) {
return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%4s] [%s]",
time(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.framework.router;

import java.util.Arrays;
import java.util.HashMap;

public class ResponseFactory {

Expand All @@ -27,7 +28,7 @@ public static Response response200(String data) {
}

public static Response response220(String hash) {
return Response.create(220, String.format("Stream with generated hash <%s> already exists. Using existing stream for connections.", hash), String.format("true, %s", hash));
return Response.create(220, String.format("Stream with generated hash <%s> already exists. Using existing stream for connections.", hash), String.format("%s", hash));
}

public static Response response400(String router) {
Expand Down Expand Up @@ -96,6 +97,10 @@ public static Response response429(String hash, String request, String response)
return Response.create(429, String.format("Stream with hash <%s> returned an irregular response when attempting to subscribe to <%s>. Response returned is: <%s>", hash, request, response));
}

public static Response response430(HashMap<String, String> data) {
return Response.create(430, String.format("Stream hash could not be generated with the given properties: <%s>", data));
}

public static Response response440(String source) {
return Response.create(440, String.format("Requested data source <%s> does not exist in cache.", source));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public void run() {

// extract non-essential data
String[] data = Arrays.copyOfRange(input, 2, input.length);

String tag = input[0];
String sub_tag = input[1];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.HashSet;
import java.util.UUID;

import org.core.logger.Logger;

public class SocketManager {

private static final HashMap<Integer, ServerSocket> servers = new HashMap<Integer, ServerSocket>();
Expand Down Expand Up @@ -53,6 +55,8 @@ public synchronized static String accept(int port) {
out.write(key.getBytes());
out.write(10);

Logger.log("Successfully connected to external socket. Key <" + key + ">");

if(connections.containsKey(key))
connections.get(key).close();

Expand Down Expand Up @@ -100,6 +104,8 @@ public synchronized static boolean accept(int port, String required_key) {

if(!synced(key))
throw new Exception("Connection inflow and outflow not synchronized.");

Logger.log("Successfully connected to reserved socket. Key <" + key + ">");

return true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class Config {
stream_properties.put("rest.socket.address", "localhost");
stream_properties.put("rest.socket.port", "61100");
stream_properties.put("rest.socket.key", "rest-key-reserved");
stream_properties.put("output.socket.address", "localhost");
stream_properties.put("output.socket.address", "defi-de.idea.rpi.edu");
stream_properties.put("output.socket.port", "61200");
stream_properties.put("local.stream.type", "mongo_db");
stream_properties.put("mongodb.properties.uri", "mongodb://localhost:27017");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package org.stream.external.connected.amberdata;

import java.io.IOException;
import java.lang.reflect.Method;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.HashMap;

import org.core.logger.Logger;
import org.framework.router.ResponseFactory;
import org.json.JSONArray;
import org.json.JSONObject;
import org.properties.Config;

import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand All @@ -23,44 +30,36 @@ public class AmberDataRequestHandler {

try {
// add all methods to the handler
requests.put("lending-latest", classobj.getMethod("requestLendingLatest", String.class, String.class));

requests.put("lending-latest", classobj.getMethod("requestLendingLatest", AmberDataRequestPacket.class));
requests.put("aave-protocol-dated", classobj.getMethod("requestAaveProtocolDated", AmberDataRequestPacket.class));
} catch (Exception e) {
e.printStackTrace();
ResponseFactory.responseNotHandled("Irregular method call for AmberDataRequestHandler.");
}
}

public static Object[] request(String key, String request) {
String data = request;
int splitIndex = data.indexOf(',');
String type = "";
if(splitIndex != -1) {
type = data.substring(0, splitIndex).trim();
data = data.substring(splitIndex + 1).trim();
} else {
type = data.trim();
data = "";
}

if(!requests.containsKey(type))
return new Object[] {false, null};
public static Object[] request(AmberDataRequestPacket packet) {
if(!requests.containsKey(packet.getData("request")))
return new Object[] {false, "Request does not exist."};

if(packet.getKey() == null)
return new Object[] {false, "<key> cannot be null."};

try {
return (Object[])requests.get(type).invoke(null, key, data);
return (Object[])requests.get(packet.getData("request")).invoke(null, packet);
} catch (Exception e) {
e.printStackTrace();
ResponseFactory.responseNotHandled("Irregular method call for AmberDataRequestHandler.");
return null;
}
}

public static Object[] requestLendingLatest(String key, String param) {
public static Object[] requestLendingLatest(AmberDataRequestPacket packet) {
Request request = new Request.Builder()
.url("https://web3api.io/api/v2/market/defi/lending/exchanges/aave/latest")
.get()
.addHeader("Accept", "application/json")
.addHeader("x-api-key", key)
.addHeader("x-api-key", packet.getData("key"))
.build();

try {
Expand All @@ -79,4 +78,194 @@ public static Object[] requestLendingLatest(String key, String param) {
return new Object[] {false, null};
}
}

private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.getProperty("app", "general.data.dateformat"));
public static Object[] requestAaveProtocolDated(AmberDataRequestPacket packet) {
LocalDate next = LocalDate.parse(packet.getData("date"), formatter);
next = next.plusDays(1);
String tmr = next.format(formatter);

String url = String.format("https://web3api.io/api/v2/defi/lending/aavev2/protocol?startDate=%s&endDate=%s",
packet.getData("date") + "T01:00:00",
tmr + "T01:00:00");

OkHttpClient client = new OkHttpClient();

Request request = new Request.Builder()
.url(url)
.get()
.addHeader("accept", "application/json")
.addHeader("x-api-key", "UAK7ed69235426c360be22bfc2bde1809b6")
.build();

okhttp3.Response response;
try {
response = client.newCall(request).execute();
JSONObject json = new JSONObject(response.body().string());

if(json.toString().equals("") || !json.has("description"))
return new Object[] {false, "JSON Object returned empty or invalid contents."};

if(!response.isSuccessful() || json.getInt("status") != 200
|| !json.getString("description").equals("Successful request"))
return new Object[]{false, json.getString("description")};

if(!json.has("payload") || !json.getJSONObject("payload").has("data"))
return new Object[] {false, "Malformed Aave packet"};

JSONArray arr = json.getJSONObject("payload").getJSONArray("data");
for(int i = 0; i < arr.length(); i++) {
JSONObject obj = arr.getJSONObject(i);
String action = obj.getString("action");
switch(action) {

case "UseReserveAsCollateral":
packet.getConnection().processRequest(
packet.getData("request"),
packet.getData("date"),
format("action", obj.getString("action"),
"timestamp", obj.get("timestamp").toString(),
"blockNumber", obj.getLong("blockNumber"),
"transactionHash", obj.getString("transactionHash"),
"logIndex", obj.getInt("logIndex"),
"assetId", obj.getString("assetId"),
"assetSymbol", obj.getString("assetSymbol"),
"marketId", obj.getString("marketId"),
"market", obj.getString("market"),
"reserveAsCollateralEnabled", obj.getBoolean("reserveAsCollateralEnabled"),
"user", obj.getString("user")));
break;

case "Deposit":
packet.getConnection().processRequest(
packet.getData("request"),
packet.getData("date"),
format("action", obj.getString("action"),
"timestamp", obj.get("timestamp").toString(),
"blockNumber", obj.getLong("blockNumber"),
"transactionHash", obj.getString("transactionHash"),
"logIndex", obj.getInt("logIndex"),
"assetId", obj.getString("assetId"),
"assetSymbol", obj.getString("assetSymbol"),
"marketId", obj.getString("marketId"),
"market", obj.getString("market"),
"user", obj.getString("user"),
"onBehalfOf", obj.getString("onBehalfOf")));
break;

case "Withdraw":
packet.getConnection().processRequest(
packet.getData("request"),
packet.getData("date"),
format("action", obj.getString("action"),
"timestamp", obj.get("timestamp").toString(),
"blockNumber", obj.getLong("blockNumber"),
"transactionHash", obj.getString("transactionHash"),
"logIndex", obj.getInt("logIndex"),
"assetId", obj.getString("assetId"),
"assetSymbol", obj.getString("assetSymbol"),
"marketId", obj.getString("marketId"),
"market", obj.getString("market"),
"amount", obj.get("amount").toString(),
"user", obj.getString("user"),
"to", obj.getString("to")));
break;

case "LiquidationCall":
packet.getConnection().processRequest(
packet.getData("request"),
packet.getData("date"),
format("action", obj.getString("action"),
"timestamp", obj.get("timestamp").toString(),
"blockNumber", obj.getLong("blockNumber"),
"transactionHash", obj.getString("transactionHash"),
"logIndex", obj.getInt("logIndex"),
"collateralAssetId", obj.getString("collateralAssetId"),
"collateralAssetSymbol", obj.getString("collateralAssetSymbol"),
"principalAssetId", obj.getString("principalAssetId"),
"principalAssetSymbol", obj.getString("principalAssetSymbol"),
"principalAmount", obj.get("principalAmount").toString(),
"marketId", obj.getString("marketId"),
"market", obj.getString("market"),
"liquidator", obj.getString("liquidator")));
break;

case "Repay":
packet.getConnection().processRequest(
packet.getData("request"),
packet.getData("date"),
format("action", obj.getString("action"),
"timestamp", obj.get("timestamp").toString(),
"blockNumber", obj.getLong("blockNumber"),
"transactionHash", obj.getString("transactionHash"),
"logIndex", obj.getInt("logIndex"),
"assetId", obj.getString("assetId"),
"assetSymbol", obj.getString("assetSymbol"),
"marketId", obj.getString("marketId"),
"market", obj.getString("market"),
"amount", obj.get("amount").toString(),
"user", obj.getString("user"),
"repayer", obj.getString("repayer")));
break;

case "Borrow":
packet.getConnection().processRequest(
packet.getData("request"),
packet.getData("date"),
format("action", obj.getString("action"),
"timestamp", obj.get("timestamp").toString(),
"blockNumber", obj.getLong("blockNumber"),
"transactionHash", obj.getString("transactionHash"),
"logIndex", obj.getInt("logIndex"),
"assetId", obj.getString("assetId"),
"assetSymbol", obj.getString("assetSymbol"),
"marketId", obj.getString("marketId"),
"market", obj.getString("market"),
"amount", obj.get("amount").toString(),
"borrowRate", obj.get("borrowRate").toString(),
"debt", obj.get("debt"),
"user", obj.getString("user"),
"onBehalfOf", obj.getString("onBehalfOf")));
break;

case "FlashLoan":
packet.getConnection().processRequest(
packet.getData("request"),
packet.getData("date"),
format("action", obj.getString("action"),
"timestamp", obj.get("timestamp").toString(),
"blockNumber", obj.getLong("blockNumber"),
"transactionHash", obj.getString("transactionHash"),
"logIndex", obj.getInt("logIndex"),
"assetId", obj.getString("assetId"),
"assetSymbol", obj.getString("assetSymbol"),
"marketId", obj.getString("marketId"),
"market", obj.getString("market"),
"amount", obj.get("amount").toString(),
"borrowRate", obj.get("totalFee").toString(),
"target", obj.get("target"),
"initiator", obj.getString("initiator")));

default:
Logger.warn("Unrecognized transaction type in <aave-protocol-dated>: " + action);
}
}

return new Object[] {true, ""};
} catch (IOException e) {
e.printStackTrace();
ResponseFactory.responseNotHandled("Unhandled exception cost.");
return new Object[] {false, null};
}
}

private static String format(Object... data) {
StringBuilder out = new StringBuilder();
for(int i = 0; i < data.length; i++) {
out.append(data[i].toString());
if(i != data.length - 1)
out.append(",");
}
return out.toString();
}
}
Loading

0 comments on commit fad1e0f

Please sign in to comment.