diff --git a/DeFi-Data-Engine/DeFi Data Engine/pom.xml b/DeFi-Data-Engine/DeFi Data Engine/pom.xml index 988695ce..972a27ff 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/pom.xml +++ b/DeFi-Data-Engine/DeFi Data Engine/pom.xml @@ -46,6 +46,14 @@ junit 4.13.2 + + + + com.datastax.oss + java-driver-core + 4.9.0 + + diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java index 0d8c1102..6c70167d 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java @@ -26,13 +26,11 @@ protected boolean init() { listener = new Thread() { public void run() { while(true) { - System.out.println("accepting..."); String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")), producer); if(key == null) { System.err.println("SocketProducer: Could not create connection to socket port."); System.exit(1); } - System.out.println("accepted!"); manager.add(new SocketDestination(key, SocketManager.write(key))); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index ff5f48f6..062c7e84 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -205,6 +205,8 @@ private static final String readLine(DataInputStream in) throws IOException { StringBuilder out = new StringBuilder(); char c = 0; while((c = (char)in.read()) != 10) { + if(out.length() > 200_000) + break; out.append(c); } return out.toString(); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index 2c273ca8..cbf67676 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -16,8 +16,8 @@ public class Config { app_properties.put("general.collection.delim", "="); app_properties.put("general.transfer.delim", "&&&"); app_properties.put("general.data.dateformat", "yyyy-MM-dd"); - app_properties.put("general.logging.packets", "true"); - app_properties.put("general.logging.responses", "true"); + app_properties.put("general.logging.packets", "false"); + app_properties.put("general.logging.responses", "false"); properties.put("app", app_properties); Properties stream_properties = new Properties(); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java index e9cc21d3..3f66083d 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamManager.java @@ -88,9 +88,16 @@ protected Object[] request(String type, HashMap data, String sta } public void processRequest(String collection, HashMap data) { - if(data == null || data.isEmpty()) + // invalid hashmap + if(data == null) return; + // if empty then submit blank push + if(data.isEmpty()) { + handler.send("LSH", "PUSH", "collection", collection, "data", "<<>>"); + return; + } + StringBuilder sb = new StringBuilder(); for(String key : data.keySet()) sb.append(key.replaceAll(",", ".") + "," + data.get(key).replaceAll(",", ".") + ","); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java index 618d562f..91ad7edc 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/requests/ExternalRequestREST.java @@ -118,6 +118,11 @@ else if(obj.has(path[i])) { return "Data array retrieval had fatal error, killing process."; } + // if data is empty push empty data point + if(data.length() == 0) { + manager.processRequest(getCollection(), new HashMap()); + } + // extract and print data for(int i = 0; i < data.length(); i++) { HashMap point = parse(data.getJSONObject(i)); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java index 09d2286c..eca8ccf9 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java @@ -2,14 +2,13 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; import java.util.Set; import org.bson.Document; -import org.core.logger.Logger; +import org.bson.conversions.Bson; import org.properties.Config; import com.mongodb.client.MongoCollection; @@ -153,7 +152,6 @@ else if(translations.get(query[0]).containsKey("collection")) } public final static boolean containsCollection(MongoDatabase db, String[] query) { - Logger.log(Arrays.toString(query)); if(!query[0].equals("contains_collection")) return false; @@ -243,19 +241,29 @@ public final static Set getItem(MongoDatabase db, String... query) { } public final static boolean push(MongoDatabase db, String data, String collection_name) { + // retrieve created collection MongoCollection collection = db.getCollection(collection_name); String[] split_data = data.split(Config.getProperty("app", "general.data.delim")); // validate that every type has an id associated. - if(split_data.length % 2 != 0) + if(split_data.length % 2 != 0 && !data.equals("<<>>")) return false; Document document = new Document(); - for(int i = 0; i < split_data.length; i+=2) - document.append(split_data[i], split_data[i + 1]); - document.append("_timestamp", System.nanoTime()); - collection.insertOne(document); + // validate not empty + if(!data.equals("<<>>")) { + for(int i = 0; i < split_data.length; i+=2) + document.append(split_data[i], split_data[i + 1]); + document.append("_timestamp", System.nanoTime()); + collection.insertOne(document); + } else { + // create temporary object then remove + document.append("_empty", "holder"); + collection.insertOne(document); + Bson holder = Filters.eq("_empty", "holder"); + collection.deleteOne(holder); + } return true; } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties b/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties new file mode 100644 index 00000000..59f53b4c --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/resources/requests/amberdata-sushiswap-protocol.properties @@ -0,0 +1,28 @@ +request.name= amberdata-sushiswap-protocol + +url.base= https://web3api.io/api/v2/defi/dex/sushiswap/protocol + +url.properties= size,900 + +url.headers= accept,application/json,\ + x-api-key,. + +data.path= payload,\ + data + +recursion.type= rest + +recursion.tags= -l,900,\ + -t,url + +recursion.location= payload,metadata,next + +date.valid= true + +date.location= properties + +date.start= startDate + +date.end= endDate + +date.format= yyyy-MM-dd \ No newline at end of file diff --git a/R-Code-Samples/aave-protocol-dated-func-v1.Rmd b/R-Code-Samples/aave-protocol-dated-func-v1.Rmd deleted file mode 100644 index e6a51955..00000000 --- a/R-Code-Samples/aave-protocol-dated-func-v1.Rmd +++ /dev/null @@ -1,175 +0,0 @@ ---- -title: "DeFi Engine Use Example" -subtitle: "aave-protocol-dated function" -author: "Kacy Adams" -date: "12/15/2022" -output: - pdf_document: default - html_document: - toc: true - number_sections: true - df_print: paged ---- -# Start by loading the proper libraries: -```{r setup, include=FALSE} -# Set the default CRAN repository -local({r <- getOption("repos") - r["CRAN"] <- "http://cran.r-project.org" - options(repos=r) -}) - -# Set code chunk defaults -knitr::opts_chunk$set(echo = TRUE) - -# Load required packages; install if necessary -# CAUTION: DO NOT interrupt R as it installs packages!! -if (!require("ggplot2")) { - install.packages("ggplot2") - library(ggplot2) -} - -if (!require("httr")) { - install.packages("httr") - library(httr) -} -if (!require("jsonlite")) { - install.packages("jsonlite") - library(jsonlite) -} - -if (!require("lubridate")) { - install.packages("lubridate") - library(lubridate) -} -if(!require("dplyr")){ - install.packages("dplyr") - library(dplyr) -} -if(!require("stringr")){ - install.packages("stringr") - library(stringr) -} -if(!require("tidyr")){ - install.packages("tidyr") - library(tidyr) -} -if(!require("knitr")){ - install.packages("knitr") - library(knitr) -} - - -``` - - -We provide a function to request and parse data from our DeFi data engine living on the IDEA Cluster. This initializes a data stream from the Amber Data API, opens a socket, requests data, listens on the socket, and then parses the received data. The finished dataframe is as close as possible to the schema of the cold-storage data we currently use. -```{R} - -getJson <- function(startdate, enddate) { - - #Initialize data stream with Amber Data API key - #It is ok to do this multiple times, will always return the same key - out <- httr::GET(url = "http://defi-de.idea.rpi.edu:8080/defi/v1/rest/initialize?source=amber_data&auth_data=key,UAK7ed69235426c360be22bfc2bde1809b6") - out <- content(out, "parsed") - engine_key <- out$data - - - #Create socket and get socket_key which tells the engine where to put the data - socket <- socketConnection("defi-de.idea.rpi.edu", 61200, blocking=TRUE) - socket_key <- readLines(socket, 1) - - - #Build the request delimited by &&& - #Similar to a GET request in the way we handle parameters - reqString <- paste( - "SRC&&&RQST&&&destination&&&", socket_key, - "&&&key&&&", engine_key, "&&&", - "start_date&&&", startdate, "&&&", - "end_date&&&", enddate, "&&&", - "query&&&aave-protocol-dated&&&", - "request&&&aave-protocol-dated\n", sep="") - - #Write this request back to the socket to tell engine what we want - writeLines(reqString, socket) - - #Now the engine will begin feeding us data - #We grab the first to initialize the data var and then we continue listening\ - data <- readLines(socket, 1) - while (TRUE) { - temp <- readLines(socket, 1) - data <- paste(data, temp, "") - if (grepl("<<>>", temp, fixed=TRUE)) {break} - } - - - data <- str_replace(data, " <<>> ", "") #We begin parsing by removing the end character - data <- str_replace_all(data, " ", "") #We remove all spaces - data <- str_replace_all(data, "\\}\\{", "\\},\\{") #We add commas in between every object - data <- paste("[", data, "]", sep="") #and format the string as a json list - output <- fromJSON(data) #and then it is finally ready to be parsed as a json - - #We then need to match this with our current cold-storage data schema - - output <- output[,-(colnames(output) == "_id")] #We remove the Mongodb data engine ID - colnames(output)[colnames(output) == "action"] = "type" #Rename the 'action' column to 'type' - output$type <- as.factor(output$type) #and make this column a factor for a faster df - - #We calculate the date and time via the UNIX epoch timestamp provided in the data - output <- output %>% mutate(datetime = as_datetime(as.numeric(substr(timestamp, 1, nchar(timestamp)-3)))) - - #We rename to match the desired schema - colnames(output)[colnames(output) == "assetId"] = "reserveId" - colnames(output)[colnames(output) == "assetSymbol"] = "reserve" - - #We convert these from chars to numbers (doubles) for obvious reasons - output$amount <- as.double(output$amount) - output$borrowRate <- as.double(output$borrowRate) - output$principalAmount <- as.double(output$principalAmount) - - #We rename to match desired schema - colnames(output)[colnames(output) == "user"] = "userId" - - #We unite these columns as they are only present in specific actions - #and our old data refers to them purely as 'onBehalfOfId' - output <- output %>% unite(col = "onBehalfOfId", onBehalfOf,repayer,initiator, na.rm = TRUE, sep = "") - - #We rename to match desired schema - colnames(output)[colnames(output) == "collateralAssetId"] = "collateralReserveId" - colnames(output)[colnames(output) == "collateralAssetSymbol"] = "collateralReserve" - colnames(output)[colnames(output) == "principalAssetId"] = "principalReserveId" - colnames(output)[colnames(output) == "principalAssetSymbol"] = "principalReserve" - - #We remove column 15 as it is a mongoDB timestamp from the engine that is no longer needed - output <- output[,-15] - - return(output) -} - -``` - -With this function, we can now get our data. -```{R} - -#We make a sample call to the function which will return all transactions 2022 August 1st to August 3rd -df <- getJson("2022-08-01", "2022-08-04") -kable(head(df), "simple") - -``` - -Now that we have our dataframe, let's show that it is usable. We'll plot the daily count of transactions over the dataframe. -```{r} -library(dplyr) - -dailySummaries <- df %>% - mutate(day = floor_date(datetime, unit = "day")) %>% # Add a new column that rounds the date of each transaction down to the nearest week - group_by(day) %>% # Group the transactions together by the week they were performed. - summarise(transactionCount = n()) # Count the number of transactions in each group. -``` - -With these daily summaries computed, we can simply plot the week on the x-axis and the transaction count on the y-axis to visualize this. -```{r} -dailyTransactionsPlot <- ggplot(data = dailySummaries, aes(x = day, y = transactionCount)) + geom_line() -dailyTransactionsPlot -``` -We see that we received transactions for each of the three days requested. Remember that the engine parameter 'end_date' is exclusive and as such we receive zero transactions for August 4th. - diff --git a/R-Code-Samples/aave-protocol-dated-func-v2.Rmd b/R-Code-Samples/aave-protocol-dated-func-v2.Rmd deleted file mode 100644 index 52a3deae..00000000 --- a/R-Code-Samples/aave-protocol-dated-func-v2.Rmd +++ /dev/null @@ -1,85 +0,0 @@ ---- -title: "DFDE-Method" -author: "Conor Flynn" -date: "2022-12-15" -output: html_document ---- - -```{r setup, include=FALSE} -knitr::opts_chunk$set(echo = TRUE) -``` - -```{r} -library("httr") -library("jsonlite") -library("lubridate") -library(stringr) -library(tidyverse) - -getJson <- function(startdate, enddate) { - - httr::GET(url = "http://defi-de.idea.rpi.edu:8080/defi/v1/rest/initialize?source=amber_data&auth_data=key,UAK7ed69235426c360be22bfc2bde1809b6") - - engine_key <- "b6c810a7f35f4fa0d28258278325b4b5ab82ba79868ab33d01d5c878e13872ec129a91a3fbf702e59c2404f0fb4a53420a3ffb50130c35b4d06b32d81e56c1f4" - - socket <- socketConnection("defi-de.idea.rpi.edu", 61200, blocking=TRUE) - ss <- readLines(socket, 1) - ss - - reqString <- paste( - "SRC&&&RQST&&&destination&&&", ss, - "&&&key&&&", engine_key, "&&&", - "start_date&&&", startdate, "&&&", - "end_date&&&", enddate, "&&&", - "query&&&aave-protocol-dated&&&", - "request&&&aave-protocol-dated\n", sep="") - - writeLines(reqString, socket) - - data <- readLines(socket, 1) - - while (TRUE) { - temp <- readLines(socket, 1) - data <- paste(data, temp, "") - if (grepl("<<>>", temp, fixed=TRUE)) {break} - } - data - - data <- str_replace(data, " <<>> ", "") - data <- str_replace_all(data, " ", "") - data <- str_replace_all(data, "\\}\\{", "\\},\\{") - data <- paste("[", data, "]", sep="") - output <- fromJSON(data) - - output <- output[,-(colnames(output) == "_id")] - colnames(output)[colnames(output) == "action"] = "type" - output$type <- as.factor(output$type) - output <- output %>% mutate(datetime = as_datetime(as.numeric(substr(timestamp, 1, nchar(timestamp)-3)))) - - colnames(output)[colnames(output) == "assetId"] = "reserveId" - colnames(output)[colnames(output) == "assetSymbol"] = "reserve" - - output$amount <- as.double(output$amount) - output$borrowRate <- as.double(output$borrowRate) - - colnames(output)[colnames(output) == "user"] = "userId" - - output <- output %>% unite(col = "onBehalfOfId", onBehalfOf,repayer,initiator, na.rm = TRUE, sep = "") - - - colnames(output)[colnames(output) == "collateralAssetId"] = "collateralReserveId" - colnames(output)[colnames(output) == "collateralAssetSymbol"] = "collateralReserve" - colnames(output)[colnames(output) == "principalAssetId"] = "principalReserveId" - colnames(output)[colnames(output) == "principalAssetSymbol"] = "principalReserve" - - output$principalAmount <- as.double(output$principalAmount) - output <- output[,-15] - output - - return(output) -} - - -temp <- getJson("2022-08-01", "2022-08-02") -temp -``` \ No newline at end of file diff --git a/R-Code-Samples/aave-protocol-dated-func.html b/R-Code-Samples/aave-protocol-dated-func.html deleted file mode 100644 index ad15a684..00000000 --- a/R-Code-Samples/aave-protocol-dated-func.html +++ /dev/null @@ -1,1989 +0,0 @@ - - - - - - - - - - - - - - - -DeFi Engine Use Example - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- - - - - - - - -
-

1 Start by loading the -proper libraries:

-

We provide a function to request and parse data from our DeFi data -engine living on the IDEA Cluster. This initializes a data stream from -the Amber Data API, opens a socket, requests data, listens on the -socket, and then parses the received data. The finished dataframe is as -close as possible to the schema of the cold-storage data we currently -use.

-
getJson <- function(startdate, enddate) {
-  
-  #Initialize data stream with Amber Data API key
-  #It is ok to do this multiple times, will always return the same key
-  out <- httr::GET(url = "http://defi-de.idea.rpi.edu:8080/defi/v1/rest/initialize?source=amber_data&auth_data=key,UAK7ed69235426c360be22bfc2bde1809b6")
-  out <- content(out, "parsed")
-  engine_key <- out$data
-
-  
-  #Create socket and get socket_key which tells the engine where to put the data
-  socket <- socketConnection("defi-de.idea.rpi.edu", 61200, blocking=TRUE)
-  socket_key <- readLines(socket, 1)
-  
-  
-  #Build the request delimited by &&&
-  #Similar to a GET request in the way we handle parameters
-  reqString <- paste(
-                "SRC&&&RQST&&&destination&&&", socket_key, 
-                "&&&key&&&", engine_key, "&&&",
-                "start_date&&&", startdate, "&&&",
-                "end_date&&&", enddate, "&&&",
-                "query&&&aave-protocol-dated&&&",
-                "request&&&aave-protocol-dated\n", sep="")
-  
-  #Write this request back to the socket to tell engine what we want
-  writeLines(reqString, socket)
-  
-  #Now the engine will begin feeding us data
-  #We grab the first to initialize the data var and then we continue listening\
-  data <- readLines(socket, 1)
-  while (TRUE) {
-    temp <- readLines(socket, 1)
-    data <- paste(data, temp, "")
-    if (grepl("<<<end>>>", temp, fixed=TRUE)) {break}
-  }
-  
-  
-  data <- str_replace(data, "  <<<end>>> ", "") #We begin parsing by removing the end character
-  data <- str_replace_all(data, " ", "") #We remove all spaces
-  data <- str_replace_all(data, "\\}\\{", "\\},\\{") #We add commas in between every object
-  data <- paste("[", data, "]", sep="")              #and format the string as a json list
-  output <- fromJSON(data)                           #and then it is finally ready to be parsed as a json
-  
-  #We then need to match this with our current cold-storage data schema
-  
-  output <- output[,-(colnames(output) == "_id")] #We remove the Mongodb data engine ID
-  colnames(output)[colnames(output) == "action"] = "type" #Rename the 'action' column to 'type'
-  output$type <- as.factor(output$type)                   #and make this column a factor for a faster df
-  
-  #We calculate the date and time via the UNIX epoch timestamp provided in the data
-  output <- output %>% mutate(datetime = as_datetime(as.numeric(substr(timestamp, 1, nchar(timestamp)-3))))
-  
-  #We rename to match the desired schema
-  colnames(output)[colnames(output) == "assetId"] = "reserveId"
-  colnames(output)[colnames(output) == "assetSymbol"] = "reserve"
-  
-  #We convert these from chars to numbers (doubles) for obvious reasons
-  output$amount <- as.double(output$amount)
-  output$borrowRate <- as.double(output$borrowRate)
-  output$principalAmount <- as.double(output$principalAmount)
-  
-  #We rename to match desired schema
-  colnames(output)[colnames(output) == "user"] = "userId"
-  
-  #We unite these columns as they are only present in specific actions
-  #and our old data refers to them purely as 'onBehalfOfId'
-  output <- output %>% unite(col = "onBehalfOfId", onBehalfOf,repayer,initiator, na.rm = TRUE, sep = "")
-  
-  #We rename to match desired schema
-  colnames(output)[colnames(output) == "collateralAssetId"] = "collateralReserveId"
-  colnames(output)[colnames(output) == "collateralAssetSymbol"] = "collateralReserve"
-  colnames(output)[colnames(output) == "principalAssetId"] = "principalReserveId"
-  colnames(output)[colnames(output) == "principalAssetSymbol"] = "principalReserve"
-  
-  #We remove column 15 as it is a mongoDB timestamp from the engine that is no longer needed
-  output <- output[,-15]
-  
-  return(output)
-}
-

With this function, we can now get our data.

-
#We make a sample call to the function which will return all transactions 2022 August 1st to August 3rd
-df <- getJson("2022-08-01", "2022-08-04")
-kable(head(df), "simple")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
typetimestampblockNumbertransactionHashlogIndexreserveIdreservemarketIdmarketuserIdonBehalfOfId_timestampamounttodebttargetcollateralReserveIdcollateralReserveprincipalReserveIdprincipalReserveprincipalAmountliquidatordatetime
Deposit1659383078000152586270x77b1e547d8ba32c2e520fddeca196b9ebeb1727893f53a85b585ec2475bbe6962070xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2WETH0xb53c1a33016b2dc2ff3653530bff1848a515c8c5main0x01d1f55d94a53a9517c07f793f35320faa0d2dcf0x01d1f55d94a53a9517c07f793f35320faa0d2dcf4871391038174192NANANANANANANANANANA2022-08-01 19:44:38
Deposit1659377375000152581970x46c823e9928332703f354df6a4ed5878e6db7f911f9470c922cdcb10349f7bb81800xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2WETH0xb53c1a33016b2dc2ff3653530bff1848a515c8c5main0x57a580a806390777e5f60287c64cafc3059e457f0x57a580a806390777e5f60287c64cafc3059e457f4871391015109392NANANANANANANANANANA2022-08-01 18:09:35
Deposit1659370449000152576920xafcbe61c0c1b8fb41a27517e87db812ce5df8ed97ad13ff5bd6c9b36e4163a713670xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48USDC0xb53c1a33016b2dc2ff3653530bff1848a515c8c5main0x57a580a806390777e5f60287c64cafc3059e457f0x57a580a806390777e5f60287c64cafc3059e457f4871390980122054NANANANANANANANANANA2022-08-01 16:14:09
Deposit1659336581000152551040x16e049bab8829c234dd615dff72a3d9af28baf3eff02c724a568a04e4ddd92464070x4e3fbd56cd56c3e72c1403e103b45db9da5b9d2bCVX0xb53c1a33016b2dc2ff3653530bff1848a515c8c5main0x408fc3a462b356e7285c4ee55d5053762e64eec20x408fc3a462b356e7285c4ee55d5053762e64eec24871390773891750NANANANANANANANANANA2022-08-01 06:49:41
Deposit1659378023000152582490x97c9ab75d19e334ddbd2133d7e6d1bb67d68a1bc0d42259d6febf09ca56a8c3a900xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2WETH0xb53c1a33016b2dc2ff3653530bff1848a515c8c5main0x111ad8ccd98d1b8e28fe5b3bb861cdb093e14f860xcc9a0b7c43dc2a5f023bb9b738e45b0ef6b06e044871391018923771NANANANANANANANANANA2022-08-01 18:20:23
Withdraw1659378450000152582710x3a67a9bf3cac03cf3e101e44629b5aca4e38dab99c85fbc389fa04ab5f307d892880xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2WETH0xb53c1a33016b2dc2ff3653530bff1848a515c8c5main0x80aca0c645fedabaa20fd2bf0daf57885a309fe648713910222710632.8998730x80aca0c645fedabaa20fd2bf0daf57885a309fe6NANANANANANANANA2022-08-01 18:27:30
-

Now that we have our dataframe, let’s show that it is usable. We’ll -plot the daily count of transactions over the dataframe.

-
library(dplyr)
-
-dailySummaries <- df %>%
-  mutate(day = floor_date(datetime, unit = "day")) %>% # Add a new column that rounds the date of each transaction down to the nearest week
-  group_by(day) %>% # Group the transactions together by the week they were performed.
-  summarise(transactionCount = n()) # Count the number of transactions in each group.
-

With these daily summaries computed, we can simply plot the week on -the x-axis and the transaction count on the y-axis to visualize -this.

-
dailyTransactionsPlot <- ggplot(data = dailySummaries, aes(x = day, y = transactionCount)) + geom_line()
-dailyTransactionsPlot
-

-We see that we received transactions for each of the three days -requested. Remember that the engine parameter ‘end_date’ is exclusive -and as such we receive zero transactions for August 4th.

-
- - - - -
- - - - - - - - - - - - - - - diff --git a/R-Code-Samples/aave-protocol-dated-func.pdf b/R-Code-Samples/aave-protocol-dated-func.pdf deleted file mode 100644 index a2d429ed..00000000 Binary files a/R-Code-Samples/aave-protocol-dated-func.pdf and /dev/null differ diff --git a/R-Code-Samples/data-engine-r-functions.Rmd b/R-Code-Samples/data-engine-r-functions.Rmd new file mode 100644 index 00000000..81c9e42a --- /dev/null +++ b/R-Code-Samples/data-engine-r-functions.Rmd @@ -0,0 +1,134 @@ +--- +title: "DeFi Engine Use Example" +subtitle: "aave-protocol-dated function" +author: "Conor Flynn" +date: "03/27/2023" +output: + pdf_document: default + html_document: + toc: true + number_sections: true + df_print: paged +--- +# Start by loading the proper libraries: +```{r setup, include=FALSE} +# Set the default CRAN repository +local({r <- getOption("repos") + r["CRAN"] <- "http://cran.r-project.org" + options(repos=r) +}) + +# Set code chunk defaults +knitr::opts_chunk$set(echo = TRUE) + +# Load required packages; install if necessary +# CAUTION: DO NOT interrupt R as it installs packages!! +if (!require("ggplot2")) { + install.packages("ggplot2") + library(ggplot2) +} + +if (!require("httr")) { + install.packages("httr") + library(httr) +} +if (!require("jsonlite")) { + install.packages("jsonlite") + library(jsonlite) +} + +if (!require("lubridate")) { + install.packages("lubridate") + library(lubridate) +} +if(!require("dplyr")){ + install.packages("dplyr") + library(dplyr) +} +if(!require("stringr")){ + install.packages("stringr") + library(stringr) +} +if(!require("tidyr")){ + install.packages("tidyr") + library(tidyr) +} +if(!require("knitr")){ + install.packages("knitr") + library(knitr) +} + +``` + +We provide a function to request and parse data from our DeFi data engine living on the IDEA Cluster. This initializes a data stream from the Amber Data API, opens a socket, requests data, listens on the socket, and then parses the received data. The finished dataframe is as close as possible to the schema of the cold-storage data we currently use. +```{R} + +request <- function(protocol, properties = "", headers = "", startdate = "", enddate = "") { + #Create socket and get destination which tells the engine where to put the data + socket <- socketConnection("localhost", 61200, blocking=TRUE) + destination <- readLines(socket, 1) + + formatted_properties = "" + if(properties != "") + formatted_properties = paste("properties", "&&&", properties, "&&&") + + formatted_headers = "" + if(headers != "") + formatted_headers = paste("headers", "&&&", headers, "&&&") + + formatted_startdate = "" + formatted_enddate = "" + if(startdate != "" && enddate != "") { + formatted_startdate = paste("start_date", "&&&", startdate, "&&&") + formatted_enddate = paste("end_date", "&&&", enddate, "&&&") + } + + #Build the request delimited by &&& + #Similar to a GET request in the way we handle parameters + request.raw <- paste( + "SRC", "&&&", "RQST", "&&&", + "type", "&&&", protocol, "&&&", + formatted_properties, + formatted_headers, + formatted_startdate, + formatted_enddate, + "destination", "&&&", destination, "&&&", + "\n", sep="") + + # remove all spaces from request + request.data <- str_replace_all(request.raw, " ", "") + + #Write this request back to the socket to tell engine what we want + writeLines(request.data, socket) + + # define data frame + df = data.frame() + temp.df = data.frame() + + #Now the engine will begin feeding us data + #We grab the first to initialize the data var and then we continue listening\ + data <- readLines(socket, 1) + counter <- 0 + while (TRUE) { + temp <- readLines(socket, 1) + counter <- counter + 1 + if(counter %% 1000 == 0) + print(paste("Processed", counter, "lines")) + if (grepl("<<>>", temp, fixed=TRUE)) {break} + temp.df <- as.data.frame(fromJSON(temp)) + df <- rbind.fill(df, temp.df) + } + + return(df) +} + +``` + +With this function, we can now get our data. +```{R} + +#We make a sample call to the function which will return all transactions 2022 August 1st to August 3rd +df <- request("amberdata-aave-protocol", "", "x-api-key,UAK7ed69235426c360be22bfc2bde1809b6", "2022-08-02", "2022-08-03") +kable(head(df), "simple") + +``` \ No newline at end of file