Skip to content

Flynnc3 temp #16

Merged
merged 2 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 106 additions & 107 deletions R-Code-Samples/DataEnginePrimaryFunctions.Rmd
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
title: "DeFi Engine Use Example"
subtitle: "aave-protocol-dated function"
title: "Data Engine Primary Request Function"
subtitle: "request function definition"
author: "Conor Flynn"
date: "03/27/2023"
date: "04/18/2023"
output:
pdf_document: default
html_document:
Expand All @@ -12,124 +12,123 @@ output:
---
# Start by loading the proper libraries:
```{r setup, include=FALSE}
# Set the default CRAN repository
local({r <- getOption("repos")
r["CRAN"] <- "http://cran.r-project.org"
options(repos=r)
})
# Set code chunk defaults
knitr::opts_chunk$set(echo = TRUE)
library(knitr)
library(plyr)
library(dplyr)
library(httr)
library(jsonlite)
library(lubridate)
library(stringr)
# library(tidyr)
library(knitr)
library(tidyr)
```

# Load required packages; install if necessary
# CAUTION: DO NOT interrupt R as it installs packages!!
## Define request function:

```
# DESCRIPTION:
# This function serves as the primary request point for making internal requests to the defi data engine. Note that should a request be
# made outside the function, please ensure the necessary format depicted within it is used otherwise an internal error may occur.
# Note that you MUST be connected to the RPI network (such that a connection can be made to defi-de.idea.rpi.edu) otherwise this
# function will not work.

We provide a function to request and parse data from our DeFi data engine living on the IDEA Cluster. This initializes a data stream from the Amber Data API, opens a socket, requests data, listens on the socket, and then parses the received data. The finished dataframe is as close as possible to the schema of the cold-storage data we currently use.
```{R}
# INPUT:
# - protocol: The name of the protocol to receive data from.
# [optional] - properties: All properties used in the call (note typically REST API URL parameters).
# [optional] - headers: All headers used in the call (note typically REST API URL headers).
# [optional] - startdate: Starting date to retrieve data from. In format 'yyyy-MM-dd' i.e. 2023-04-01
# [optional] - enddate: Ending date to retrieve data from (non-inclusive). In format 'yyyy-MM-dd' i.e. 2023-04-01

# OUTPUT:
# list containing two elements:
# - $response: Contains all response information as listed below:
# - $response: Value denoting status of call TO the engine. Code 200 denotes connection was received by engine properly.
# - $code: Code returned by engine based on internal schema. Full list of codes can be found here
# (https://github.rpi.edu/DataINCITE/IDEA-DeFi-CRAFT/wiki/Response-Codes)
# - $message: Message response accompanying code should the response be irregular.
# - $data: Data returned by call should any be requested.
# - $df: Data frame containing all data parsed from the call.
```{r}
request <- function(protocol, properties = "", headers = "", startdate = "", enddate = "") {
#Create socket and get destination which tells the engine where to put the data
socket <- socketConnection("defi-de.idea.rpi.edu", 61200, blocking=TRUE)
#socket <- socketConnection("localhost", 61200, blocking=TRUE)
destination <- readLines(socket, 1)
formatted_properties = ""
if(properties != "")
formatted_properties = paste("properties", "&&&", properties, "&&&")
formatted_headers = ""
if(headers != "")
formatted_headers = paste("headers", "&&&", headers, "&&&")
formatted_startdate = ""
formatted_enddate = ""
if(startdate != "" && enddate != "") {
formatted_startdate = paste("start_date", "&&&", startdate, "&&&")
formatted_enddate = paste("end_date", "&&&", enddate, "&&&")
}
#Build the request delimited by &&&
#Similar to a GET request in the way we handle parameters
request.raw <- paste(
"SRC", "&&&", "RQST", "&&&",
"type", "&&&", protocol, "&&&",
formatted_properties,
formatted_headers,
formatted_startdate,
formatted_enddate,
"destination", "&&&", destination, "&&&",
"\n", sep="")
# remove all spaces from request
request.data <- str_replace_all(request.raw, " ", "")
#Write this request back to the socket to tell engine what we want
writeLines(request.data, socket)
# define data frame
df = data.frame()
temp.df = data.frame()
#Now the engine will begin feeding us data
#We grab the first to initialize the data var and then we continue listening\
counter <- 0
response <- ""
while (TRUE) {
temp <- readLines(socket, 1)
suppressWarnings({
#Create socket and get destination which tells the engine where to put the data
socket <- socketConnection("defi-de.idea.rpi.edu", 61200, blocking=TRUE)
destination <- readLines(socket, 1)
formatted_properties = ""
if(properties != "")
formatted_properties = paste("properties", "&&&", properties, "&&&")
# if line is heartbeat then acknowledge and continue
if (grepl("<<<heartbeat>>>", temp, fixed=TRUE))
{
print(paste("Heartbeat read"))
next
formatted_headers = ""
if(headers != "")
formatted_headers = paste("headers", "&&&", headers, "&&&")
formatted_startdate = ""
formatted_enddate = ""
if(startdate != "" && enddate != "") {
formatted_startdate = paste("start_date", "&&&", startdate, "&&&")
formatted_enddate = paste("end_date", "&&&", enddate, "&&&")
}
# if line is response then process and terminate
else if (grepl("<<<response>>>", temp, fixed=TRUE))
{
#Build the request delimited by &&&
#Similar to a GET request in the way we handle parameters
request.raw <- paste(
"SRC", "&&&", "RQST", "&&&",
"type", "&&&", protocol, "&&&",
formatted_properties,
formatted_headers,
formatted_startdate,
formatted_enddate,
"destination", "&&&", destination, "&&&",
"\n", sep="")
# remove all spaces from request
request.data <- str_replace_all(request.raw, " ", "")
#Write this request back to the socket to tell engine what we want
writeLines(request.data, socket)
# define data frame
df = data.frame()
temp.df = data.frame()
#Now the engine will begin feeding us data
#We grab the first to initialize the data var and then we continue listening\
counter <- 0
response <- ""
while (TRUE) {
temp <- readLines(socket, 1)
response <- fromJSON(temp)
break
if(temp == '')
{
print("Read empty string. Check engine logs or refresh configuration.")
next
}
# if line is heartbeat then acknowledge and continue
if (grepl("<<<heartbeat>>>", temp, fixed=TRUE))
{
print(paste("Heartbeat read for", protocol, sep=" "))
next
}
# if line is response then process and terminate
else if (grepl("<<<response>>>", temp, fixed=TRUE))
{
temp <- readLines(socket, 1)
response <- fromJSON(temp)
break
}
# increment processed line counter
counter <- counter + 1
if(counter %% 1000 == 0)
print(paste("Processed", counter, "lines for", protocol))
# add data point line to data frame
temp.df <- as.data.frame(fromJSON(temp))
df <- rbind.fill(df, temp.df)
}
# increment processed line counter
counter <- counter + 1
if(counter %% 1000 == 0)
print(paste("Processed", counter, "lines"))
# add data point line to data frame
temp.df <- as.data.frame(fromJSON(temp))
df <- rbind.fill(df, temp.df)
}
output <- list("response"=response, "df"=df)
return(output)
output <- list("response"=response, "df"=df)
close(socket)
return(output)
})
}
```

With this function, we can now get our data.
```{R}
#We make a sample call to the function which will return all transactions 2022 August 1st to August 3rd
# aave <- request("amberdata-aave-protocol", "", "x-api-key,UAK7ed69235426c360be22bfc2bde1809b6", "2022-08-01", "2022-08-03")
# aave.df <- aave$df
# aave.response <- aave$response
aave.liquidations <- request("amberdata-aave-protocol", "action,LiquidationCall", "x-api-key,UAK7ed69235426c360be22bfc2bde1809b6", "2022-07-01", "2023-09-01")
aave.liquidations.df <- aave.liquidations$df
aave.liquidations.response <- aave.liquidations$response
# sushiswap <- request("amberdata-sushiswap-protocol", "", "x-api-key,UAK7ed69235426c360be22bfc2bde1809b6", "2022-08-01", "2022-08-03")
# sushiswap.df <- sushiswap$df
# sushiswap.response <- sushiswap$response
```
Loading