From 7796f44cbd369836d0b5c63230d24aa3e73d5413 Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Mon, 19 Sep 2022 23:07:42 -0400 Subject: [PATCH] Add Logger and integrate LSH framework. --- .../Internal Manual/Packet Spreadsheet.xlsx | Bin 16601 -> 17283 bytes DeFi-Data-Engine/DeFi Data Engine/.classpath | 7 +- .../DeFi Data Engine/config/app.properties | 8 +- .../DeFi Data Engine/config/output.properties | 9 +- DeFi-Data-Engine/DeFi Data Engine/pom.xml | 17 --- .../src/main/java/org/core/core/Core.java | 8 +- .../src/main/java/org/core/engine/Engine.java | 16 ++- .../src/main/java/org/core/logger/Logger.java | 43 ++++++- .../java/org/framework/router/Packet.java | 15 ++- .../java/org/framework/router/Response.java | 15 ++- .../org/framework/router/ResponseFactory.java | 30 ++++- .../java/org/out/handler/OutputHandler.java | 4 +- .../org/out/handler/ProtocolDirectory.java | 1 + .../java/org/out/socket/SocketManager.java | 1 - .../src/main/java/org/properties/Config.java | 5 + .../connections/AmberDataConnection.java | 4 +- ...n.java => TemplateExternalConnection.java} | 4 +- .../handler/ExternalStreamHandler.java | 18 +-- .../connections/TemplateLocalConnection.java | 5 + .../org/stream/local/handler/DataState.java | 10 ++ .../local/handler/LocalStreamConnection.java | 22 ++++ .../local/handler/LocalStreamHandler.java | 43 +++++++ .../local/handler/LocalStreamManager.java | 117 ++++++++++++++++++ .../src/test/java/test/protocols/TestLSH.java | 5 + .../java/test/speed/TestRouterSendSpeed.java | 4 +- DeFi-Data-Engine/Rest Application/.classpath | 2 +- DeFi-Data-Engine/Rest Application/pom.xml | 46 +------ .../Testing Environment/.classpath | 2 +- 28 files changed, 356 insertions(+), 105 deletions(-) rename DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/{TemplateConnection.java => TemplateExternalConnection.java} (92%) create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/DataState.java create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java diff --git a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx index 32e084ea235992413f9c3b0d6e26b8ad6dd28753..86dc1b834d4751ff595f76407a6430dfeca230c3 100644 GIT binary patch delta 9244 zcmZ8{Wl)^Kw(Z~q7$5|98{C7tySuwhun=57Ah^Q}9w4{`cSx|{4oT2~U;#pK4bICw z@5j0CbyszD)n5H?_wKdV>g8~x_c2Hf%jl?@G<@8!HFPS#!sC-Leux$H8D%T~TmvM~ z9RXTVKB^v{aVhGyTwk+J~d#zw7RPaJNOcI|NjTs4{>1^QL;G z9IZ|}2#}*NA5hgai4w+HR&=9?)>$AXy{Og)Yk2*@N;9R#5B3G4PkE(p#g4{?nO7~%*I|{Z=H-? zB6I(@Asr$WGB@b72gsC4+3Kd=rMeEl?0@0|&==`U*aqt5S2xp$yn05*MQlZwc`WW7 z_^C?gU4~*dLS9ZuzI*-aGvWjM2AWzInY*i{AD74GP$kcq^7^Z%U4A~JqTLJ@ zfQ6Msd{dvygA<2#hip}(P?fT9#IP0hqwyWv%)@*N@v0RP_PF6o_x2BhJZx%T`ahw$ z^V&iibCoIFo+o;=fw7qz4uWcXOogr?eQ94LM7sj&Ev zY|l)7va+f5^0C6v{GOGP*x@{G!y+g>hg9|6N!Q66gUy?YcL&f#+v+C{e>$+zosvbMW%FQd@!yi zX9xNU(GnE#hisrn66JeO)*s0VYO)>vva*&z1C$}+fjruAf0|Ge+aBTMsb)#oG+5?6 zn9z}9${8JRWDKi%Z*Cl)SG1kr4nXh-Gr@*oGj3C_%`Tdc{BqrSV=k(zKdG!p;iRgg zBzX}Y;0*n75D6?O=8NK^?fNOBwTPp2*cTMKe%=#&o+oa$rJM>^~BHTQ|-&BKALsQ{wC=}ZZg&OQ(Nx0)WyWQrM;L$H_inxu$#~i z^^eYwz+&UB3Raun#_?2;a*4ST6IW*Drmv3=2@Wd;K`x2@7!Y;+`q0A!L6J#n<_nuG z+E%>zK`VLsV~hg;ku7e~c_wMuBhI|&j!XTOMflL@c`lA_wD_p3DA~yCuMn&dg{b)y zYXcL@(4eh63=IUDCxNZJ;sPA0UO6pm@K~^vUHaFKZW|9anX62PfAc$8Vp@(`jYOjv z&fL9!PE(v1fxQ_;O3dh*g7RFjh8zCYZTtM;=|RVz6|!*F_AAMquEMF|((~$U;Jk9` z=K?61wy0I4HvfQcxTaFZQ|voci?A-Vq+>v<*{;{#AqTFSHXXM8Gkl4+)WhkGf)(-X3`0FW@n=7hhe6g(v&>Y33^H8jv9f8Y4Vl(6MXaLFh*3XGua*4%<)pvqD!YspF&u^4p5CJC?e>G^N!dK^UYZ;C2 zXT?ndeufb(wX#TH!@}9T-(v<7K+bM9bJ;`5*Og4VL@QLp69#_Kf!|8L?DjEzEKp=J zV%U%Zfk4mC|5@P12zLYz&SM5G&8#sA!gCWndo`w;@fqK~zbmFyF5d^Sp3)ShEuM>h z$~l(CY6;-t?I&JLTm=S6u209DJsvIYK0;mE@@_|Neaf{`hCPIOlccPudmjRNrn3$X zj-UwfCEp-Y?z2;7Q4Oi7%g5~okKwfun^g+}|H*zE0K?Dw`=jWmu4K&%A4N+NRA-3r z&c{zTTlzg65a(EtRPc2J8Z+wBqZl3hox;=O77*z+3vPPnXLq_04eB4U!zD)%kyL?u z5D6UV(|NmK63fxD|HV(-r$di>CqrUN4G|r6(n6j(CXzWckU7kv4c5VE`sxkhW=4_? z8=<7+iJ;_PE~Af-A?i=12Q{HKeSIlh83DRg*^GvI$A4o~3d^QjvC*^x)ZmZsy5B#B zfu(t(_dghEz0j&yb=)s9a*+kVhQ!v#Qe?535ap2)>HFf5;yy2!$q)9h1np9CX%)?L zsNN@)PbEqpBzOt+0Xbh`LZe9f&G;MgeDArlc`{51c`*}9>}cf>!~2)Bj4~}O%&5Pn3Een00dgnrZ;t70biF7@ov($NksMXWz={1pktPk`IXYZ^D(;+k3Z)6ut(us*y}o|DxI!%MjI5ydY!jIR6(1+A zHL>Dl@iA}Kc|fEw;gMqIlt^`RMisq+`}N$GXpw!y17QUdP6`gB#`#a*&Z2m!V`N&q zoDvljK7e*L;oYU*t@17xw|Qu3{m@4k`&uV|UoJ8|UJ2u1j^_72l3E|jr;$~Bw-W8i zeDQa}IcDP5+j+E(rbE&EUvp15@D|4m;NCa<4E-27}#5ktx)6+UHb|I@BR&e zs#RpVN8C`}AHw?079oXyH|j8qp82e0{|(_BYpI%;V@i{(^fCK6(S|CsSpM+bnLs@Y z`UK3n!yPJtb~z_K%%Vp)gwcE-cb%tiAH61}zQ@3-=VVCP zePDLvillMT2eN_)%IAkh++k9ak=%&+u9qT`*a3~l0$7KJV z8^kQAh&2>XleZbsj*nGR92jDrmoovB3Iv?V2n*SAg?&H6*pBcICC{sUb0yy;wzp!``~!7L)&jJ zctm-r7GDzbilG8gU+k@>vnXQ8NE#61-tzNyl36<$^hSZR<1dyi52bwJqZ6pD9T20U zd5_*1{{=|mjyRZ}U&mpSAG>b7uS>CtTs6c^*2d}%Y_=Z?fN)@BMR=1|6pZ_TG5zTw zkDe%5je0`V4x81KBmAW*{E@CLFBfXdm~99DEREDl3-1zsGRrJ}D% z9+oSAYk9m+&*boQ#3f??J%DU7eH!~=V1jH+XvT_$+EUPu+pk?nD+RC;56SIZ9=a&> zTQ3~2qIrDddzQj}axC?9eX_fG`?PsKQL+5+-fGg_nAv9-&VCwL`eD=9-?$VK(BiTp)R5ar+Xw5!xU2$vhNkn)yIXV9H9iaG3uxGTaqah2Av=w> z9Oe^dpq6i>!&hX@wMa&%Nljz|kHE>lCgbHQ25fScHQa*sFFjyIF7Uy@Dr8wHzUe$A6A_!&UCz>p54$UyFtJXQR(lC zDEy6AU2^9ExxdBf^DpER>fcpKr5&MFma`rmsQ$E>UFLM|1diz^2iOBrOc*ubL~(v5 zv2YcAY|=PUq}<9^vU-+EBHMbZQtGIxE_=@@vZtg`uhmLwaME{$a4}GznHk%a)Jvu) zrf+rJ(^)iv5*RD0$y9uIp{qG96%A>@kvla3f6HV?W6j> z2g~i)!tF+$0C_Iwodx0;iMswXFSfZh^fAt*^IXO@=zVBeWAhE(@FDP zcLqgsoj@`F2k^)9CXlue$-ZK@n_Ra-+y?;sIP8Mz+fZWI`Yv3OL(unQY`i$sig{7$ zW8fo-c*Re!`0~_FddIj00cYxZ<@@^!k6yx>)$D*zz)PZlzMeSFnCavlghIB5McXf_ ztI~;M&fS@B=Ls!`WjjVtXw8V2NvVh#X<+*Z=lx%5{DYr#w}aqSHuI2A?J7Z6Xr00+%yGfKLkj4d>vv@jhY zRSgHq*LcMI@}Uu*H|eU#j8WTJL1F89KVOW;F*~2S)*yWNCXZBZK`3h|9G)hz_r8m| zjD+R~U4SP>yH{pMxMWi$5|O8#es?8?@fYVvVEw!}NhRIj$m_fG=MdkPib86Wy zfNg!_6gxJlUq=m^`a?IDk?UE5AgfRUI>>p~kDKq)6wcmR-m<{E^7eEt%K>!DKpB?2 zo_K`6fQ=H}IxcJ{HHDB5JP+61#af8j!wNCdpYq^snI;ZAaOJ&e5ZaWGeo#>Qi?&@e zDch;qOB&l4$oi#Z-xcJ$^$S0f1PSv1aNS&XCBiN8+YLKlTS}KOTrA-jvb(~h@iRzy zYhmc8e2vN8K^?j%;giVs6xu8(oXZB2|Gs8g8gaNdO)H{Oa4RfGCo!hU*cXuB`2-e? z5qq;mFRq`s6x$IcSu!D7`rmsqGWW-YLtuNGOD2? zVX9?5EB6_1TpB&g;C3tysp9MXQU3yYel2uj@9inyEo|Qy_E3fbw=}RuQdeASM1cf3 z3JA1_0RoYNVD3`nFh)9jz>^vu-)2Z}&CKA+Q!rVMV@tSoO3mOoh&jB0-IJvTHIZ)f zBGHL2K%@$bDzZ^`k*qv4EUmJ0+M#>uH08vFduUYDaIfZD>*%S8@dMxE1N7YwM<+Ep zO5~z8GOm1=;Yedktf88E3Y~!rS&AXbU;#5*3I^k!k17U?I1xxXfcnA>ec}{DG#A#` zknTUW(fXNIQGZhAsSTcVBrrs?N%;D6$hK!KIjvw*`tfQJDXV1Y+QLeM2AWTy^Q5pS znSa!2@||1P`ibv<*v4jX>KJ72{#?mkp0rox@0HZxY&NN9svF9B<*6O(+%Lx{593Q2 z*S7;J+kD&IldpjXCjqM{?}GB5wq#M3Ukawa=(K2d^CT71X_U;{_S_^?z|3W15Y?$K z6{4NIzr^$8Xm;J+vJTDln3eca3Yng)VAnTjfFg)8Hjb=m(Kl&{i>pELHpPI|WCz-q zMsV-(m5)%L6?j~s|2pHo{D0U*CT>9{>iC;I+Yg0VUx9a|%1cXlFvFcLWMs*-^dlG= z%8;go`1oo@L;M4ehBIH-{hXq4duDCi%XPDg`T7G3ql02;JyA5<`pBnL^b+SAVdSab z_^}e{;)PfrY%8P>iSE4FQoUoO57T~Bc%Dw?;MAcH{v^Hg%*N5Xdxk+Q@}7|X+ui}` zo466Q_#;+BXQ2KfXRlGiKks>L4`2eGh92t%ni6l^LQz7W?H-P2HxD0Ym-@-qtlU+s zc&$}BR=75*^*K7_94!I@`6Y$d!9=gbsf_GD@74hYQ!+4>|=}wHqe@JvQ7bAcH&^`Vh;Qp7=Y5&+X_+Pu~@5=4v9pLKV z#pUnjT54#Pk|#_UYE|{ze=arv>60eZ6&Ry6_+!XERrA9LLw&fKdsMjD-spZk$Q9GZ zv05+&g0_dQ6~7cG>(+~O|ONJ3Ypi(+2)g%x=j8gIiqMz4zi#K#Hm#j%o1 z>Rkj+97beWWgXH>c|fAgvP@nL2l0A(a?Hktv3h6Zy?*YYNXJSHZUu6hvf$nNTBXTljDxN*YbWBla*B4GfQ#Gb? zsM{4XlqTU5*OknqZ#|-VuIz$+-o|=L1`wHHNHl%}~xWxjsFagtG2l)}9%m9W6BNr6(COGSNzs9adl9+s}H zHxgjLfV^mrqmV8~>obI31R;O}t6Zn0koUiI;9rAL*{t6z!3(qc`a393qmzaTOTbI7 zjm^k=aTD$wzdk2@k8R{u8fy#U{#dlw?WN0p znLNkP%%Jy|#3bA)FI=5n$i45V^CSR{PLs}aI<9~p=&8!btws`O#AMT=wmoH`%k+93 zCiIgpoF;l8#35fRGy27NjyTD1C*`hgCFar-E#~uw7tiP1jZf&Xk0^v#qvoGu{=5Kz zzA(U$xG4ZrHxCLSf~U~FM~tsD@s!phbu%jURok*@aN}6J&*|oh}N^3HLptk zV8WSv)uqcfuU0n6CM78$5)kqw-NchNy@_iQ@xHn6t%rp@FkgLQsQG;VXJgCUiDYlr zmW^a@WOwT!`^j4k78pus@?>F{^I=al=00YThBR*9pS&)F(&C>Z(EW7z7qe4nb7g)i zX2}%jaC<;RT8QN&OGVv(yd0nW(WRd0F^JcBi*{%A(7@9hNnwyC=|nM@&BH6{Wm$J8 zQ64cd26+W`QRhdyhn7^I$^nzgn(N?>!MEx2lcQ0ujPtN8NnZujs9O+i-X3gQcIdNq zb79vyo;tT$CFK;14cg;{BEE2VK@ygZ+z|lIVYtO3VSN?9_+t1i_;CR#j=4s8liYp0rOntBcHejS^?LAkyN?!9Jnd6_Y-yxIo$e8IFW zd7*HuH4uvc!!@g2-BRmhv1s5grv*oi7X&wx(-hyC*+-q<8{aqu9Q#EUG7%9Hd=bqfqMkR8SdN5DW>vf8tv^i5A|{C(VkHNc zp`}Sw^9~k}Kj5-Y$G=;V&FzqmTAOR1Z(YzY5 z*tR4)4G)Hy*Gis)B>Rz(^`z*1(rel*%ZXpu=aJF%`1jz0<3Fiqq4hJDvNAx66v;It zEY;U>469LISFuR}w^hxn%D{gHB8{X)slVN#9BWFh`@#=T3XlF+f~;A5Zab6uO1Q6+ z@=~~abY+Recupyt$VeZ^COCtb=Q%PtB`6;;u``zW>cACOQC^g9jveTh)`nP>`Q8sM zPIA-=acK`=zeI{Td1n%AGX`j`O^6E741JoCc?${__>+d;vbp6EPTa#{F-HY9q@wn1*(QKIN!w`FG%;?m?XiF6V5s_R6P&6sa6u zMFmGt+7m9jcPfV4ypqLFTczsNqS1oCnYmfI+!naDcl!dw^|b(;@OGNt4Ts6=&kc=4 zoZRNMDF(y5Fkqb<6R%Zh*x3H-m18j4>z=#L?^HiO!{^*8Hs82WdyZHM6o1gxf~0Bq z_xxRTE#B2W^t^~e8;w{I$V(~?TYoQxHxq?d^XSCUaVB;U;Yi!2ERbYbx&N1|=;+W> zC0HcS;wvxE6(Rh+Tf^yglB0<=B7EKSS#IsfJ;3)PhL>)GEk)6Z1Bwxok3o@l zagFdVN<89k1I2>_bGkKSvfy{G7gBa;bG1M8JI4GimzI z&(@c31dO2ZkxO;{tOcCR}rV{PL(O@(Q3sZANbmsHPz zcXpr?6d*@ z%*GfoLNtsT(5|fxIZ4Ah6V*0QhC0~F&~ra?(GAC_7?G;`Oy;=eDwN4+B(wzPv8=|^ z@e-R*2<31>i^;0buakQ#THNDWU_E)g=|jckrUzz`si&B75X5y!W@nOF9S@k51VCFQ zy)&GB*Q+9~ZvV$&|7Z!B{NF=8G4sND zmjXCl`t7GDLf+T%`b{ZPiHmodscdvwRKag_dc)PX@?5`_+ud>MNkbjIPy%x*4>*## z70f*`vM+-cId(zN7uI|J#l@o0ed`_uGffInup&0S#>FHfLZ15_N5P>qnNT$ zEH%qHj<&(8l_ZfSm{6AFJO(F0a}!?

(FDdCxaeewN`Q-y_%yDI9=p}?iO38x>{fTPAm-c>Xg{}uS{Z~B29gAa_ z%eii)j{0ZRvrT0#uLC1ld&rHpcw>P6QoHgQ#T*WIQ^F+;PbeSQVtI|HTO&JJWsg1F zF!^js8Y`#-hIQlkQLpZaFTK4qZPZ15CHdnlel~J)$lJ;@T zKfuHMVMF;L>=;zxDL#Nqx${G36lbD z9v@|8^ZOB6uD8mY*KA3-ieUyGE{7I7ej7Tyef7{25V)@hW?w#zM`Gif#(XWE&o`v1 z;t=5z0_Phj3H&^eYubKx@YAr5COK=tMSfZDGhHD6Y2OI_Bfis)TT>v%Vx{snKX^L` z7#D`XvP+v&uCEfF1RbsLD{!6(NNk!|U?Mp`Vf$;b{^*2cJ$K|oc>nRu6Gc5x{_mHW zXNi&jZJ^Ieic|jA9{L}h=6?@=G8j7{Rb5GF zkGXy{6NJB)Dx@Y-y|DOd&4m;`()7aqh;pV} zWvx#~H&QQEve5-hWt~oe3L21*$4}ZfODks`D>lSyr9?{%P$~_$by(RKJY`C#Y8$se zAWv>!eCh=XD$!-#EN|+1DJv2-s3^Jl)B-=hu36^%frp4?++3b2!5P1(kK7-EgPOTU zP-~7@%hZNY-t9Tf!(a@8#bVZ0yO{MWmXCj|ar({wHGche%r=fJCci5I?fz5L1?Y zWfp-u3o{#w`Bk*KvRg~@+2eqz(fqYHzO_gVB)=TWpW|zU>jWoRPfJ1PItNF_YWT-d zcSWVHB`1}lDK>`GB@g^F8qswSZI0|$X;Pr2zwYwNFc^MeGk zEcR_eUMF3?YRKShPsTc$%S+sbG9KcE?0$%apW256@^c$7hhs<1LP&p$ z_q}BG+YFkeXkJgXqKIm>^=D3qr172NQMk2uK>Cr7=8qmCVmh>jmqRY#w-mHaf3sKW zli!3BbR7Bs1j3Jh9nW%(Vv>uQPa~kZmd22zollruv;`hakCZWcN6hg~oL$1BG>UZO z0NKJ6?dcIs@#$;PQ>y#6Vr5+Xuq!65H zH1bFY{+IEbc*gIJHncZD!>+AjJ{o<-lD296tm)Dj%L*3s9tfm4T?-IVCf{<8sUCCT zwIuGWyM>P64gWlLM;r*WiQK`c#=R@s_ZSpI!REE{5||p`o%%i^UeCuHW2!@kaxxg6 zrb^leE#=DK@zx_bNeBbDdeDx*I8?3kuXKh9)4ToDk&St!;$AoRQ0BF56>IpJokoOQ z9Z?nO+gTTbOPj#Ona%DTi>r0u{o2QQV&2BnmFn9|Y#4atPXt2swmM?yI( zHy|X0hl1L|gG3V(L#}Y~0lLVL8Y^7IyZkuv3Vb)xFmM7hBfb1u+xGw?LVH{SPa=h_ zvy2#T)^z1!YD9L8!}i=snp`;dEX#!-3XfMZ_}g*dfu|Ig=AhX0uBn2Z0nD%*Ucn_8 zeqaC9gjy#X2BMm5_`VY<%!w_f(ccLRT~HM-pNqfS5h*8c0@VeC5{Bf4`}r|2 zv)ae_*QYQX6Lvv2OBxur^6Hz|shcJZ6fjwTeg;Pz=L&i-(8}YnEqx_vKd3xYkFyt7 zMyMb<+h*DtIE#zJ`=qndh#b3+xBGOvXEoN?7<2jrAmQeXc8W)m+=qs)X{yDofG?O? z+~}>`{USz;L)%IffX9XuFYUc68szS4H}YJN2>KH}e2FBy3Y z6+%j^h)iE1O^#L5Il?3DhS^$g@5OJIeKCHqnBdsS43aXJ@0+L?0l~E*hEnz$+t3;l zPPAp4#O}{gn#AAZ4EQ1%E8}>D=;&)Mr@utJ5t^*Y02I^t1GD&Sk%}&qYXtPQWBV6aqS!Nn+nlst+DE0@nK17$p`hO0p#C+* zfhX-3*|DDS12K_AwCe2w*>OK&diekb4Aa|La`beyFHh+#m<2kTn0E-Jo{N9VG&ox> zZtR=dQ_F#Zm(#1QyTdDUrK3OhN7o{k1u~aMe<&^s{_b|LC0=;Cb%>OeAkLNEwK$MI zkLElS#B{9PE?G1*ow)qELL;}YTwCozA9u_cOU~b#I^Lc}m{ zMmXT1!T_4r`iZ(5-eo0sfQRc{HSmpW!3d)f11d+FM{y#Sx-&1Go0sw8xap*==bFBp zTJ10Z4?PUe8cWAfb@;=Xn7aX`zUJ~hR2U+6@>o6RN-Jlj9IPFg7BMC&9~6rkH?a=+ z6lpNVmcLIb8UjjZo@|vg$?t_-f|%GnLN`FD{Gln;a--9^-g9eXsB?V+>qz$3yTMfG zbqRL7IYtyf-*)C_~s>o>bNiHZtp)IQhLGpAB1Q>x{ z0b=57v`*S_1Md!dISX$NUFKofGd$& zn7jLdx6H~Qk6Xh56R2=j(6yvxYo^r0O`w6zZnCiboHXx2l+O|$%}jl(&D$OX)^fny zbBVWD3CQmg=Kp?R0d&oBWQx}^_;6U zr6b?U6T=)E6?Z2?g=tlX)6KZk*`^ufG>J#%zj5n^zilkks!VLC;8$0VUZg7TD)9Ci z@t)vaD-7j$Y~@f6MsT!Lo}-1}81QQ;VCVYqC;90PH;-Kq5=gxi^dL|~Zx4GO>E9nI z_pY%p84cal0dF>u`gw~QkhZH&Z>M(?glgkI>mAW5+0f*-;*5K+(c@gG7{i@2$Ws-L z_gBDE_~#V=m+?xqCnGm`@m}JwDJuQa*wFxH0!HL7@n#BFpYG6AXHI%fzw|gRzIs`c zep_=Whb?-noGKk}n!6RQ--1*M0Uy&=XM-AgEFK;&=X#C`LXw42-}pTRTRKD*`?fcJ zsFO!h057Iq0u-OuRz;NZDUPkGwaJB>?5*Y;o!S#f`n9CqlE}a{`luNX|9NO4d6TU3x_nn-6oz>S-s zAcq8PW$ZO1R;;zPdJ8w=GzV_v^wKv?#JcxTl_hie9q=h;O)(1-e-Yd2gjr@H4YOb% zosv32D(+OGZpj2qeWJ;#Rzkv zzKn$%9z+3?3I<5xn7J7QKJA6WWu?&~ooFF1dztm_4A!W^Fn~_@+u$4%@+cW8sqAob zGvC9<`E1-64UoFC)M}GA!kz$x#PAY=%(m7+IUU92P%QDo@e>rGTE9mfmm_AURe4;M zXGQip$FSLv=~X;W2S;<4jXBu`VwI^UPbXz*QxiO7rYQKyIxlaa6ibT*Xey><_cjg+ zaar0Ib6@@I6UDUhxo6{C`$G)4dMqn~>|AM^5hHpf%#&>sn-qz&U@`$DCwOonGfa^D z&d7lWqcuxlqc+JCpbN*QrJ_ZoP!J&fM za#d`N4W@U8-s9xg)Kf@Alk6T6_=X3WYeaNj36ig&@mJ|?+O=o#ij2@!ipHRVStI(z zqe^U9+70I!X#9?BN`P1cS;0C%tZWch7SF!i*V0Di?J?3FcS|lKJ(7^_-oLJr2oJ-f zF2IY}W{o;||EaSp`NX!T^POX*pGMv7b>sYmk9^!Pq(AlqugcR7a*pS21O3V6Y+ca} zCVH)K@?~L(4gT+CtM5Y8-2A<^9%YHhn#LnO>(y?HUgr*+0<61HGncq^$Avql7Up%Q z=+<+otd5f#jnZ$cutqC_G;F%p;V#I0Mi1|(RdAH~C9bKdG_>YY5@9`-A4wT(eA1U< zI4y-tADV+b(Q^)3N&+SG%Q#^2-sLkiSzwpUs5pGLZLb$W{DY-bLv3u4#AU4~RUgRf zUg=@6fADYx0U8d1WNKHFJq-- zuxTFH^rISlGBc@bYGFd>WbTF_KS2s73wlv862ucKOE>uHM=8u2fD z3Z`FTIBy-i$T4H9HX-S_uTZYRbTDP~&3k^me)^HvPm+;sapVD|5Ypfo;P=V4%ywRl ziKae|35-0eQxbdrxBx*|&W>>2FNY|BQ{YpFBE|_YMeeIv#9Z60;mpo|&uer)cSiRN zKMQn4Uh$@ruRUMt2HbQt4TI-JNMs{KrJ~u#HRz)9(9WRAx7dVV;s0H0-CD@`o?)P% za1sAa8)B*O;rKC;upuQ>Bmf>KLeMzZtY@9#g}Mk?X61A=O`LtO2%2{);A!j^%R5RjqiTTKat1fyojieWac|I^RgB z!K*KFAQ~dHKq(Hbt+41688kh-*s#1^4_XTs@$kYVzh`Skk|oPfQJ|aA#QCXzOPfc` z%7}KQNs>cN$ZKVGu~G^CZzb*J7a(qgx%YqJTsd=LW*myBZ?+W~+>X8Sdic&s7rvJJ z^hoi|Oe&mz@D^&VaQ!>u}5AeOXm21!93FpDrUEr(ZCG&VKLdT!avwJQmx`mYr2|d5_282f>Gj^8<2KFKB2}KU*wRRS@x4mBIek(+OuZnx^ElfwqoQ?fVk(CWiTT`?c$fu;`_!t|uHx zLu=FA2RBBRpalQC7fKsYIWy&YWSrF+s~My7;il?txxeZ@A|R)@^8G?HA#09F9*TU76O0gfJiaGOMiGJNc*n$J9S$mg6c zE)UMucUT_Zw`IhI4S*qlBXn%bMcc;5w-nb%`EPS|`EfeJ{h~#(IYIese<-Rhl>@G zVV&lQGaKeSdH;Z^$yn0w0&$eFmTG>B>pKCo|DnROm30GMZbe3N?n~Nfk}q2sZ>%;r1@}O($85SqNiCWm zjf;v=(^U#Q@+eFhn^Lj&q3~Y`ROUltn=4DWQQNDh+TuhGRIy~-#5;6VlW*KfNAFi3 zcS(mVb39fS1v_b*t=)dFPO-|0n|VnBFxye@r5Ta8G^;Ro{bZ3jjssdQH%{{Z4Br;c zni^*%iOu90ynqfwzwC&7+}f{OxdyTcnL2R{t5;^U&HPy6FgW*EL*;qhkBio2deU?h zXbBT1Nd^5XZnx<_Zm$xDY;m9qTWv~=mTE*o5!I70WEg(}Rfg65Q))uxrMr z6&hi6!7*a+bD`m#dXAyV;O&B`B&X&3!?;&V&GZ#O3PiJW4XHb{pr3X4JPZv++l7J=s}l7aiaEvi0ac zC{se{)_m~4*P_{)Tq2ANpvQcedSFU+(2hRx$9dcQzzHqP6A&0_`az6iO_s)hx2KFd zYdXX!F_vx76Xi>xA*(Rz6F0zeVg1Twd9NiQ2AQjPme#>k-CTMOQ&$1vek-8`DLC9t zIaj%b{F<)n#TW8$wK);_C4Sb&H+uK9>_Cqywuud$jDD+tsfR%=K=O67gWUP9ZzC(z zPiT2Zy!xBDN^Xl}G~Lt6Utn%XdiwTtxo6D|^=S=(flGe2?PtrPVsaYijcZN3YZZ~h z&JuwcTN9xj!E4+nLzObYZU$M-6hyarc|WFzP055~@+l@g+s~q|Izcp{{T^#EpXU-3 z=o*_*wwQwQ@bREW0AycXW^tRh?GsUm9Wg!-KhBi2jRXbN{ugEA{crr38QL2gsXN-6 z+nD}4j7qhZY)C%hcmV<=2!zeL7Dl0Vh5OubZ)oU!$yY4oJ;a;DVQAvH(X4yp zF$$juQ80Non`Dl1G5sn2HRv@dB-NY9HY|o_VRSHn3;YX5i-^+=C{+aQzCIk;jO#PS zf5mUo;4-RRJnNSlH!_MHa9?U=P@}z?II&qmI~3P@NiDD9wkMFwA&`E!Uf!(*A0s`# zL`B0)V~*a~WnC+JDb(6&9;;@cm5o+2KsvrWAD@1)fW4V^?6{LB!NwmST()NJAyDl1 zN(d9m2=u8XVIYeKhK8ZaJ&`lwus`sMv5=4e1G2Eo1ajqvUdIpg=GtfHqDuQ8>pe+Y zS_$#m68OZ|EXtnAVExNqX{_L2wFNzT>1;f1=V@*`^xubf(v>SQvE01NJ4s zY5j{6s(N3Hb60V7>pH$#R>q&z2UWPmnd6LStMx4oFF_`OHZ|nMLVL*I7gUd#hQr;+ zuG908=)8>t(!~B`UF*@0ly`qJ@!XT^UI;3d$PC_RIm&S{YurFy%EMA9w+~Aq3>9p= z^8L%#{I43lh}wU+yQl{;^!#8dMpIKqY0e$$6e8ets+Ema31C;C4ISR6(fEd(5?y@B z;~}^u!c2h#AnG`*{-Wh!I&m>z5P0(A%8dIuv=7an6~mnCozvX1sM)%9#MLoDy%-hX zg-n&J2aeTcfgXm^F(}kaQ7C)%_#lqLZ>th9O+{s1#n!hb{LG1zduKFin>|1+d}!zE z@NwVH&8N;z&p?_2UPD*)#ZbAI5U@^3ierUYREkvqe4S$mKP~$q_rDGuN%ZPGwacH} zhE#-TEn4Yj2JTUbVTk5(0tN;Jr^fN`u=DZ0ZwE0+XO~H+#_Tb11|x9Fgz6i8AWS`; zF&*^1@uBP;CYy0V;NKDV>CSA_W4`s-W6;TCl8ls`)A@Cg(igzMtyk+@7j3fC06l(W zp-hhis9@zpoY_U&8VIt_Vx><fji z2BLjLpr6lTGVj?Ci~r;q3Q{Bs*o^zRTeP@$SUokiHFTZw3kOa|B~ObM>Bi^o3dGN< z%1}nvv=dIGlc%4rr=^QMBjbu%Ekk8wQ)KpLhg(N2yuDuOzx7Yv1ZSPt<$d^N?@ykW9YSl&cdXcAMXM8XOuOyMbsu%o_2t_fD|>ZkEn_pQ196EO&8(+6 zlc%uOiwNu}m%sjU*k(HV=XoRrwZd;6IcFeP z%}*e$?3C;&eoiuPmhB+IfvQix%kDrGV6Hov*>N`gG#;0NphmCBUBfBXSB^shpcegM zWPFBQm5b)Cvnz5JWqg?zE4=H*INgurPX@`&E>mp0#tg`6#6p~Q*!P`MH?&=hjlayP zJOik>hquj{)f_vFFy#z`SZtz980!`4jnU+6`xKV?4*i63_Bj;618t;h%xr=KkpWyS zC!FM_owd;Hk`^$$%M=NX6en&5aIrwwu<^`qJ}|&qSg`v@gKF>QeL~xQx-I(^v-zluiN$I0{KPDw&sgQdyZvhv3_~p>c`tzI+$FdYTvj)}b1e$pAnP?Zvp* zNO2{fs;Y3cl4BYJ+K$@g?aQjM-`>#mZbCeTdMPPAa+p>Df89O?m%3sW-jh=&fb;8R zi3?!^1J@=?V)LT0n+V#)p-=Y`TP2tRZ5St7Ln?vh^70IdLrA~5yR_K&+Q^U~9)X8- z;;>AAqaQ6qZfF>2G_)3%R9=g3aCxRrDAR#CF4@SW3kWWcuqFn7vKncpG_F{fO|81dNPV1f65|$hopL6MU+FD?+q(7YIJCPuxF%T zc3df7B}({{!}gvpx(=9&&v~yXI4zU%vH26>#3nNuo_jtX36hju8FpSKZkyBAZs-SZ zpyQt8U&wkvc~Pj;;S+m$%Z z%Hx^RQkk$%a)s$2M=bnudiNnD_~|L#7PDdvg~CAG})a$A_rTw(}U~OD;3& zoQspLcxXoTuh=9Wn|Ossi}szB6eN1jG-|Ar^Jj7rrrM*9TIzhLo#NHzEmQlLKGQ - - - - - - + diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/app.properties b/DeFi-Data-Engine/DeFi Data Engine/config/app.properties index 09935a87..88474e56 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/config/app.properties +++ b/DeFi-Data-Engine/DeFi Data Engine/config/app.properties @@ -1,4 +1,10 @@ # === GENERAL PROPERTIES === # delimiter used for internal processing -general.internal.delim=,., \ No newline at end of file +general.internal.delim=,., + +# enable all packet logging +general.logging.packets=true + +# enable all response logging +general.logging.responses=true \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/output.properties b/DeFi-Data-Engine/DeFi Data Engine/config/output.properties index 9aa49d3b..8c2b5ca9 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/config/output.properties +++ b/DeFi-Data-Engine/DeFi Data Engine/config/output.properties @@ -1,7 +1,7 @@ # === GENERAL PROPERTIES === -consumer.types=socket_consumer -producer.types=socket_producer +consumer.types=null +producer.types=null # === REST SOCKET PROPERTIES === @@ -21,3 +21,8 @@ output.socket.address=localhost # Output socket port output.socket.port=61200 + +# === LOCAL STREAM PROPERTIES === + +# local stream type used for database configuration +local.stream.type=mongo_db \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/pom.xml b/DeFi-Data-Engine/DeFi Data Engine/pom.xml index bed7d8a0..3df3471f 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/pom.xml +++ b/DeFi-Data-Engine/DeFi Data Engine/pom.xml @@ -41,21 +41,6 @@ json 20220320 - - org.apache.kafka - kafka-clients - 3.2.1 - - - org.apache.kafka - kafka-clients - 3.2.1 - - - org.apache.kafka - kafka-streams - 3.2.1 - @@ -70,8 +55,6 @@ - - diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java index 67b11a1c..eb550b57 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/core/Core.java @@ -2,6 +2,7 @@ import org.core.engine.Engine; import org.core.logger.Logger; +import org.framework.router.Response; import org.framework.router.Router; import org.out.controller.Controller; import org.out.handler.OutputHandler; @@ -14,12 +15,13 @@ public Core() { OutputHandler out = new OutputHandler(); Controller crl = new Controller(); - Logger log = new Logger(); Engine eng = new Engine(); StreamManager str = new StreamManager(); - this.connect(out, crl, log, eng, str); + this.connect(out, crl, eng, str); - this.send("ENG", "STRT", ""); + Response response = this.send("ENG", "STRT", ""); + if(response.code() != 200) + Logger.terminate(response); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java index 668ab259..33c006a8 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java @@ -2,7 +2,9 @@ import org.framework.router.Packet; import org.framework.router.Response; +import org.framework.router.ResponseFactory; import org.framework.router.Router; +import org.properties.Config; public class Engine extends Router { @@ -11,8 +13,16 @@ public Engine() { } public Response processSTRT(Packet packet) { - // start output processes - // TODO: check all engine processes outside of OUT prior to returning response. - return send("OUT", "STRT", ""); + // start output processes: + Response out_response = send("OUT", "STRT", ""); + if(out_response.code() != 200) + return out_response; + + // start local stream handler processes: + Response lsh_response = send("LSH", "INIT", Config.getProperty("output", "local.stream.type")); + if(lsh_response.code() != 200) + return lsh_response; + + return ResponseFactory.response200(); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java index ce5099c2..528488b9 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java @@ -1,10 +1,45 @@ package org.core.logger; -import org.framework.router.Router; +import java.time.Instant; -public class Logger extends Router { +import org.framework.router.Packet; +import org.framework.router.Response; - public Logger() { - super("logger", "LOG"); +public class Logger { + + public static final void log(Packet packet) { + System.out.println(packetFormat(packet)); + } + + public static final void log(Response response) { + System.out.println(responseFormat(response)); + } + + public static final void terminate(Packet packet) { + System.err.println(packetFormat(packet)); + System.exit(1); + } + + public static final void terminate(Response response) { + System.err.println(responseFormat(response)); + System.exit(1); + } + + private static final String packetFormat(Packet packet) { + return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%s] [%s]", + Instant.now().toString(), + Thread.currentThread().getName(), + packet.getSender(), + packet.getTag(), + packet.getData(), + packet.getSubData()); + } + + private static final String responseFormat(Response response) { + return String.format("[%s] [%-10s] RESPONSE - [%3d] [%s]", + Instant.now().toString(), + Thread.currentThread().getName(), + response.code(), + response.data()); } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java index 59bde8d1..be8bafab 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Packet.java @@ -1,5 +1,8 @@ package org.framework.router; +import org.core.logger.Logger; +import org.properties.Config; + /** * The {@link Packet} class represents a standardized data transfer object * used throughout the engine. It contains a series of values which help to @@ -10,6 +13,8 @@ * */ public class Packet { + + private final static boolean log = Config.getProperty("app", "general.logging.packets").equals("true"); private final String sender; private final String tag; @@ -89,7 +94,10 @@ public final String getSubData() { * @return New {@link Packet} object. */ public static Packet packet(Router router, String tag, String sub_tag, String data) { - return new Packet(router, tag, sub_tag, data, ""); + Packet packet = new Packet(router, tag, sub_tag, data, ""); + if(log) + Logger.log(packet); + return packet; } /** @@ -103,6 +111,9 @@ public static Packet packet(Router router, String tag, String sub_tag, String da * @return New {@link Packet} object. */ public static Packet packet(Router router, String tag, String sub_tag, String data, String sub_data) { - return new Packet(router, tag, sub_tag, data, sub_data); + Packet packet = new Packet(router, tag, sub_tag, data, sub_data); + if(log) + Logger.log(packet); + return packet; } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java index 6165a6aa..ffa3ecfd 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java @@ -1,5 +1,8 @@ package org.framework.router; +import org.core.logger.Logger; +import org.properties.Config; + /** * The {@link Response} class is used to relay information from a * given {@link Packet} sent through a {@link Router}. {@link Response} @@ -18,6 +21,8 @@ * */ public final class Response { + + private final static boolean log = Config.getProperty("app", "general.logging.responses").equals("true"); private final int code; private final String message; @@ -91,7 +96,10 @@ public String data() { * @return New {@link Response} object formatted based on the passed parameters. */ public static Response create(int code, String message) { - return new Response(code, message); + Response response = new Response(code, message); + if(log) + Logger.log(response); + return response; } /** @@ -104,6 +112,9 @@ public static Response create(int code, String message) { * @return New {@link Response} object formatted based on the passed parameters. */ public static Response create(int code, String message, String data) { - return new Response(code, message, data); + Response response = new Response(code, message, data); + if(log) + Logger.log(response); + return response; } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java index 79bd17bc..39228d78 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java @@ -16,8 +16,12 @@ public static Response response0() { return Response.create(0, ""); } + public static Response response200() { + return Response.create(200, "Successful Response."); + } + public static Response response200(String data) { - return Response.create(200, "Successful Response.", data); + return Response.create(200, "", data); } public static Response response220(String hash) { @@ -84,6 +88,30 @@ public static Response response429(String hash, String request, String response) return Response.create(429, String.format("Stream with hash <%s> returned an irregular response when attempting to subscribe to <%s>. Response returned is: <%s>", hash, request, response)); } + public static Response response440(String source) { + return Response.create(440, String.format("Requested data source <%s> does not exist in cache.", source)); + } + + public static Response response441(String hash) { + return Response.create(441, String.format("Requested data stream with given hash <%s> does not exist in cache.", hash)); + } + + public static Response response442(String source) { + return Response.create(442, String.format("Failed to add local data source <%s>.", source)); + } + + public static Response response443(String source) { + return Response.create(443, String.format("Local data stream with given source <%s> already exists.", source)); + } + + public static Response response444(String source) { + return Response.create(444, String.format("Failure to authorize the local data source <%s> with the given properties.", source)); + } + + public static Response response445(String source, String query) { + return Response.create(445, String.format("Local data stream with given source <%s> could not validate passed query <%s>", query)); + } + public static Response response460(String consumer) { return Response.create(460, String.format("Output consumer <%s> failed to listen to consumption channel.", consumer)); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java index 3afe2b11..5d7cea71 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputHandler.java @@ -26,7 +26,7 @@ public Response processSTRT(Packet packet) { if(!(boolean)producer_response[0]) return ResponseFactory.response470(producer_response[1].toString()); - return ResponseFactory.response200(""); + return ResponseFactory.response200(); } catch(Exception e) { e.printStackTrace(); System.exit(1); @@ -42,6 +42,6 @@ public Response processEDAT(Packet packet) { if(!manager.send(packet.getSubData(), packet)) return ResponseFactory.response472(packet.getSubData()); - return ResponseFactory.response200(""); + return ResponseFactory.response200(); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/ProtocolDirectory.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/ProtocolDirectory.java index fee81cba..c5558db0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/ProtocolDirectory.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/ProtocolDirectory.java @@ -18,6 +18,7 @@ public class ProtocolDirectory { protocols.put("kill", new String[]{"SRC", "KILL"}); protocols.put("subscribe", new String[]{"SRC", "SUBS"}); protocols.put("request", new String[]{"SRC", "RQST"}); + protocols.put("scan", new String[] {"SRC", "SCAN"}); } public static String[] getProtocol(String protocol) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index b192dd87..e6c798e5 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -5,7 +5,6 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.HashSet; import java.util.UUID; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java index a72ccec2..c6f2c1d4 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/properties/Config.java @@ -39,6 +39,11 @@ public static final String getProperty(String name, String property) { return properties.get(name).getProperty(property); } + public static final void setProperty(String name, String property, String value) { + validate(name, property); + properties.get(name).setProperty(property, value); + } + public static final void validate(String name, String... keys) { if(!properties.containsKey(name)) { System.err.println(String.format("Property file <%s> does not exist. Program terminating.", name)); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java index 3190d837..84772b9a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/AmberDataConnection.java @@ -24,7 +24,7 @@ public AmberDataConnection(ExternalStreamManager manager, String data) { public String getUUID() { return "amber_data"; } - + public void init() { } @@ -42,7 +42,7 @@ public void defineRequestTypes() { public String getHash(String data) { try { MessageDigest md = MessageDigest.getInstance("SHA-512"); - byte[] bytes = md.digest(("salt" + data).getBytes()); + byte[] bytes = md.digest(("salt" + System.currentTimeMillis() + data).getBytes()); BigInteger signum = new BigInteger(1, bytes); String hashed = signum.toString(16); while(hashed.length() < 32) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java similarity index 92% rename from DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateConnection.java rename to DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java index ce1f0fe2..1b989d42 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/connected/connections/TemplateExternalConnection.java @@ -3,13 +3,13 @@ import org.stream.external.handler.ExternalStreamConnection; import org.stream.external.handler.ExternalStreamManager; -public class TemplateConnection extends ExternalStreamConnection { +public class TemplateExternalConnection extends ExternalStreamConnection { private boolean active = false; private boolean authorized = false; private boolean override = false; - public TemplateConnection(ExternalStreamManager manager, String data) { + public TemplateExternalConnection(ExternalStreamManager manager, String data) { super(manager, data.split(",")[0]); if(data.contains(",")) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java index af654361..f5de1bbb 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/external/handler/ExternalStreamHandler.java @@ -19,14 +19,14 @@ public Response processEXSR(Packet packet) { if(packet.getData().equals("")) return ResponseFactory.response500("ExternalStreamHandler", "source"); - return Response.create(200, "", String.format("%s", manager.containsTemplate(packet.getData().toString()))); + return ResponseFactory.response200(String.format("%s", manager.containsTemplate(packet.getData().toString()))); } public Response processEXST(Packet packet) { if(packet.getData().equals("")) return ResponseFactory.response500("ExternalStreamHandler", "streamHash"); - return Response.create(200, "", String.format("%s", manager.containsStream(packet.getData().toString()))); + return ResponseFactory.response200(String.format("%s", manager.containsStream(packet.getData().toString()))); } @SuppressWarnings("deprecation") @@ -63,7 +63,7 @@ public Response processINIT(Packet packet) { // if successful authorize if(authorized) - return Response.create(200, "", String.format("true, %s", hash)); + return ResponseFactory.response200(String.format("true, %s", hash)); manager.removeStream(hash); return ResponseFactory.response422(template); @@ -76,7 +76,7 @@ public Response processIATH(Packet packet) { if(!manager.containsStream(packet.getData())) return ResponseFactory.response421(packet.getData()); - return Response.create(200, "", String.format("%s", manager.isStreamAuthorized(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.isStreamAuthorized(packet.getData()))); } public Response processIATV(Packet packet) { @@ -86,7 +86,7 @@ public Response processIATV(Packet packet) { if(!manager.containsStream(packet.getData())) return ResponseFactory.response421(packet.getData()); - return Response.create(200, "", String.format("%s", manager.isStreamActive(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.isStreamActive(packet.getData()))); } public Response processEXEC(Packet packet) { @@ -102,7 +102,7 @@ public Response processEXEC(Packet packet) { if(manager.isStreamActive(packet.getData())) return ResponseFactory.response424(packet.getData()); - return Response.create(200, "", String.format("%s", manager.executeStream(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.executeStream(packet.getData()))); } public Response processKILL(Packet packet) { @@ -115,7 +115,7 @@ public Response processKILL(Packet packet) { if(!manager.isStreamActive(packet.getData())) return ResponseFactory.response425(packet.getData()); - return Response.create(200, "", String.format("%s", manager.killStream(packet.getData()))); + return ResponseFactory.response200(String.format("%s", manager.killStream(packet.getData()))); } public Response processSUBS(Packet packet) { @@ -145,7 +145,7 @@ public Response processSUBS(Packet packet) { Object[] subscription = manager.subscribe(hash, type); if((Boolean)subscription[0]) - return Response.create(200, "", String.format("%s", "true")); + return ResponseFactory.response200(String.format("%s", "true")); return ResponseFactory.response427(hash, type, (String)subscription[1]); } @@ -180,7 +180,7 @@ public Response processRQST(Packet packet) { Object[] response = manager.request(hash, packet.getSubData(), request); if((Boolean)response[0]) - return Response.create(200, "", String.format("%s", (String)response[1])); + return ResponseFactory.response200(String.format("%s", (String)response[1])); return ResponseFactory.response429(hash, request, (String)response[1]); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java new file mode 100644 index 00000000..57b73488 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java @@ -0,0 +1,5 @@ +package org.stream.local.connected.connections; + +public class TemplateLocalConnection { + +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/DataState.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/DataState.java new file mode 100644 index 00000000..da3b46fa --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/DataState.java @@ -0,0 +1,10 @@ +package org.stream.local.handler; + +public enum DataState { + DOES_NOT_EXIST, + PARTIAL, + EXISTS, + MODIFIED, + CORRUPTED, + INVALID; +} \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java new file mode 100644 index 00000000..20251152 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java @@ -0,0 +1,22 @@ +package org.stream.local.handler; + +import org.framework.interfaces.Hash; +import org.framework.interfaces.UUID; + +public abstract class LocalStreamConnection implements UUID { + + private final LocalStreamManager manager; + + public LocalStreamConnection(LocalStreamManager manager) { + this.manager = manager; + } + + public abstract boolean authorize(); + public abstract boolean isAuthorized(); + public abstract boolean isReady(); + public abstract boolean validate(String... query); + public abstract boolean contains(String... query); + public abstract DataState state(String... query); + public abstract String get(String... query); + public abstract boolean modify(String data, String... query); +} \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index bb88996d..01a34967 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -1,10 +1,53 @@ package org.stream.local.handler; +import org.framework.router.Packet; +import org.framework.router.Response; +import org.framework.router.ResponseFactory; import org.framework.router.Router; public class LocalStreamHandler extends Router { + private final LocalStreamManager manager; + public LocalStreamHandler() { super("local_stream_handler", "LSH"); + this.manager = new LocalStreamManager(this); } + + public Response processINIT(Packet packet) { + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "source"); + + String source = packet.getData(); + + if(!manager.containsTemplate(source)) + return ResponseFactory.response440(source); + + if(manager.isStreamDefined()) + return ResponseFactory.response443(source); + + if(!manager.setStream(source)) + return ResponseFactory.response442(source); + + if(!manager.authorize() || !manager.isAuthorized()) + return ResponseFactory.response444(source); + + return ResponseFactory.response200(); + } + + public Response processSCAN(Packet packet) { + // data: query + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "query"); + + if(!manager.validate(packet.getData())) + return ResponseFactory.response445(manager.streamType(), packet.getData()); + + if(manager.scan(packet.getData())) + return ResponseFactory.response200("true"); + + return ResponseFactory.response200("false"); + } + + //public Response process } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java new file mode 100644 index 00000000..fe427cee --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java @@ -0,0 +1,117 @@ +package org.stream.local.handler; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Set; + +import org.reflections.Reflections; +import org.stream.external.handler.ExternalStreamConnection; +import org.stream.external.handler.ExternalStreamManager; + +public class LocalStreamManager { + + private final LocalStreamHandler handler; + private final HashMap> templates; + private LocalStreamConnection stream; + + protected LocalStreamManager(LocalStreamHandler handler) { + this.handler = handler; + + templates = new HashMap>(); + + try { + reflect(); + } catch (Exception e) { + e.printStackTrace(); + System.exit(1); + } + } + + private void reflect() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { + Reflections reflection = new Reflections("org.stream.local.connected.connections"); + Set> types = reflection.getSubTypesOf(LocalStreamConnection.class); + for(Class c : types) + templates.put(c.getDeclaredConstructor(LocalStreamManager.class, String.class).newInstance(this, "").getUUID(), c); + } + + protected boolean containsTemplate(String type) { + return templates.containsKey(type); + } + + protected boolean isStreamDefined() { + return stream != null; + } + + protected boolean setStream(String type) { + if(!templates.containsKey(type)) + return false; + + try { + this.stream = templates.get(type).getDeclaredConstructor(LocalStreamManager.class).newInstance(this); + return true; + } catch(Exception e) { + e.printStackTrace(); + System.exit(1); + } + + return false; + } + + protected String streamType() { + if(stream == null) + return null; + + return stream.getUUID(); + } + + protected boolean authorize() { + return stream.authorize(); + } + + protected boolean isAuthorized() { + return stream.isAuthorized(); + } + + protected boolean isReady() { + return stream.isReady(); + } + + protected boolean validate(String... query) { + return stream.validate(query); + } + + protected boolean scan(String... query) { + if(!stream.validate(query)) + return false; + + return stream.contains(query); + } + + protected boolean contains(String... query) { + if(!validate(query)) + return false; + + return stream.contains(query); + } + + public DataState state(String... query) { + if(!validate(query)) + return DataState.INVALID; + + return stream.state(query); + } + + protected String get(String... query) { + if(!validate(query)) + return null; + + return stream.get(query); + } + + protected boolean modify(String data, String... query) { + if(!validate(query)) + return false; + + return stream.modify(data, query); + } +} \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java new file mode 100644 index 00000000..17ca01d2 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java @@ -0,0 +1,5 @@ +package test.protocols; + +public class TestLSH { + +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java index eaf18792..5eca334b 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java @@ -113,7 +113,7 @@ public static void testSpeed2() { } public static void main(String[] args) { - //testSpeed1(); - testSpeed2(); + testSpeed1(); + //testSpeed2(); } } diff --git a/DeFi-Data-Engine/Rest Application/.classpath b/DeFi-Data-Engine/Rest Application/.classpath index 4f20f1ad..d118e897 100644 --- a/DeFi-Data-Engine/Rest Application/.classpath +++ b/DeFi-Data-Engine/Rest Application/.classpath @@ -24,7 +24,7 @@ - + diff --git a/DeFi-Data-Engine/Rest Application/pom.xml b/DeFi-Data-Engine/Rest Application/pom.xml index 9975fbd2..9644e635 100644 --- a/DeFi-Data-Engine/Rest Application/pom.xml +++ b/DeFi-Data-Engine/Rest Application/pom.xml @@ -47,51 +47,9 @@ json 20220320 - - - - io.vertx - vertx-core - ${project.version} - - - io.vertx - vertx-web - ${project.version} - - - io.vertx - vertx-kafka-client - ${project.version} - - - - - io.debezium - debezium-core - ${debezium.version} - - - io.debezium - debezium-core - ${debezium.version} - test-jar - - - org.apache.kafka - kafka-clients - 3.2.1 - - - org.springframework.integration - spring-integration-ip + org.springframework.integration + spring-integration-ip diff --git a/DeFi-Data-Engine/Testing Environment/.classpath b/DeFi-Data-Engine/Testing Environment/.classpath index a3622af7..cddbf79a 100644 --- a/DeFi-Data-Engine/Testing Environment/.classpath +++ b/DeFi-Data-Engine/Testing Environment/.classpath @@ -6,7 +6,7 @@ - +