From c390be483040c86af3091698745b16de7e63567d Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Thu, 22 Sep 2022 00:42:06 -0400 Subject: [PATCH] Integrate MongoDB classes. --- .../Internal Manual/Packet Spreadsheet.xlsx | Bin 17283 -> 18868 bytes .../DeFi Data Engine/config/app.properties | 3 + .../DeFi Data Engine/config/output.properties | 28 --- .../DeFi Data Engine/config/stream.properties | 48 ++++ .../config/testing.properties | 3 + DeFi-Data-Engine/DeFi Data Engine/pom.xml | 2 +- .../src/main/java/org/core/engine/Engine.java | 9 +- .../src/main/java/org/core/logger/Logger.java | 15 +- .../java/org/framework/router/Response.java | 4 +- .../org/framework/router/ResponseFactory.java | 26 ++- .../org/out/consumers/SocketConsumer.java | 4 +- .../java/org/out/handler/OutputManager.java | 4 +- .../org/out/producers/SocketProducer.java | 2 +- .../connections/MongoDatabaseConnection.java | 111 +++++++++ .../connections/TemplateConnection.java | 5 - .../connections/TemplateLocalConnection.java | 83 ++++++- .../mongodb/MongoDatabaseRequestHandler.java | 214 ++++++++++++++++++ .../local/handler/LocalStreamConnection.java | 7 +- .../local/handler/LocalStreamHandler.java | 97 +++++++- .../local/handler/LocalStreamManager.java | 28 ++- .../test/framework/router/TestRouter.java | 12 - .../test/lsh/mongodb/TestMongoDatabase.java | 97 ++++++++ .../src/test/java/test/protocols/TestESH.java | 4 + .../src/test/java/test/protocols/TestLSH.java | 106 +++++++++ .../src/test/java/test/protocols/TestSRC.java | 19 +- .../java/test/speed/TestRouterSendSpeed.java | 4 - 26 files changed, 845 insertions(+), 90 deletions(-) delete mode 100644 DeFi-Data-Engine/DeFi Data Engine/config/output.properties create mode 100644 DeFi-Data-Engine/DeFi Data Engine/config/stream.properties create mode 100644 DeFi-Data-Engine/DeFi Data Engine/config/testing.properties create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/MongoDatabaseConnection.java delete mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateConnection.java create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java create mode 100644 DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/lsh/mongodb/TestMongoDatabase.java diff --git a/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx b/Data Engine/Documents/Internal Manual/Packet Spreadsheet.xlsx index 86dc1b834d4751ff595f76407a6430dfeca230c3..fed61c8937b76b90426801a8ebdbbb0a8dce75f5 100644 GIT binary patch delta 11272 zcmZX)WmFwY*Dbv9;O=h0-92b*IfiHF)v*I5Nk?(YZ=hB^#TbcJ{^B73HzK{tDpb%Q%Wc0?=d|9N?yla1 zN9)kIT$WW1YxA7cLdzQ$vxBnyVhm?|QjO505bTh5ZQ+N)f=8p@JdXgF$J}WR^w9U+ zd^p&%kQ^u7Q;Jr8Opt3&@E*W|v?%yXgHo44L>tJogfOXK-cD3g|h0~tYSHJNEDw?2=F|Z3p74GP+3&h=T5g1qqD<9RJAH5#1|DQh4jvdbyM>aJAL2Qf z69{u~No2+xRBBH)UW}^cT9$!|hGqJJg#6$ft5Uz#hwJ!j8fI?PEC)0I=bAi2u`N{Z zVSUnkk-!d zLk0+xf{H!9&M{(k?1TzJS(SHxvW9kA(#F)rlT&=`Fkq<-U0qc5?1$55_bLXWIo1iH?W$(ZCM4SR|Foq^^@ zcv~5yK6)!(iH{4%XNZ^#4!TJJ>IEfy_56jO!_%bKgc*XakrWjsO>iw0gmUlyTD=c@ z`fE4O(N+BYC_T@y?u+3jLn9U9khq6}X{5LEcq6Zt-$nN;cJ1c(lqq{kFT}!%0-LtB zd}3+}2F++wlH)qE&{em?fj?VpO8n=Hmoy1T5+;IsvMi2BFo`KrqPKt-$r~ zo6aj-6-XXtD&W(3$Esgf>}OtvAmzyg6|x5dsvikphh21bB>s_m7$XR}zB@lqBw(Y< zh{c)M;`kW88FMXNUR)?@TrJ*DL}9G2MMdNZV&W6a5bu#$D{g#Hjui^+-r5ZIQXIn~ zl6NYpRY%&hi2kiZhSOO%&{->GSXU~yt#E7>&F(@AbWQVZt1tH#fj7zE<`%ftbcZ(9}kIVE(0wC+X~wI6Ws7 zq+6EOZf_Mo7qn3Gw<5YIFO>M9X#_3ON=ED&RfgjJrT`OX&KzHBL}3!!vzBJyy_z8D z_!PFoykMkaTv#6ISa!?)hG%m=#^Jo%My;)<$)>@12QT$jRvr>{4<6KyMEc;Uf;Mtt zRa+fZHj(Bht3il1+!{2%8G>?Dw&||Gq(VzRA7~07@i2S;8 z{@u|fC(_2(5tZcFwee7n*-rM;Jt2&(WPjTb&+{0pZE#16{t8~ug(NMRbi*zNOi!=hh<&wAq{Um%r#BJ zdD>>FQ^aDT2Quj`E$T$JTwkvRM+YQlyrGN#MbdU9p&L{NnU-fPY(D(E(c zu`h_V+P;%dTcYkUc>N}+uf5|+z-Ty2iwx~RY0$>l2R|Yb05ZP#wu5qb_-ez;v;SXowkW$lVM+?F=<0>s95E z=O5=Z2q34xB4qv`OPGKG03Pu{3HbOx_J6Wy7Dzw3kQFQJ0{2cNWOb!%E-q!+xJ1(3 zNITU`q#Ep5nIGu{T0~8Ky>Mx4&tf> z{T4gZ%$rS!5hfvOV>9O|`km(-@eK~4Zxg%6X|b2@l`lPi^gWJWJca^zTJcx-H*IR+ z&n{gR(Q%(WU|KJ3!pgu7x@_ITA4V@XonXvEmE@d0lcOG5iA|7YidI9?eu7K{HMc(% z{fZi9!zsi`_npTM`2KrVmbG75J)otJQdvBaNTt2~w;prL0r;$OqqFN(r&FYA+}rKd z$-8sLFn*v$vny64o3!i2>$l$IkA)R*n@{blcbC9mf379ht84MQ$lZ#4%j4n&_W=@# ztmE?fALMLTk%@}9!a3ip#6cJc9ry|ALjt|5sA%0R*j%V3pi!7m0@~)~1!6vf=4zLr z^dgE?LW)4_FcdNb%eKH0Q+!*{naFNbmel!Y7yC(U> zLo!WZE840n+TzmqV#C$<903?DAcBI!F|wwDF*y74dxB;5{v-Sq{ z?RDjsJYq+e29BYfIVzg?$t8Qq((^a5aVxW>Wk6%8v2sz5nB}Ob=9TmgL5|(h5Kosw zA9-R~8qR59c`~y{d&4;HMX5o38dg5%u1dRUyNT$eX$v}b7(!{^rq`reMKde}G3(?E z1)PvC)15bspuzJjC*r&DaG0}yDu9l_6T zOs}*{+XxIuLb{U);49vRE(G&r-j0iL!9dEglOZ*PJWHK4wOz%gmO_UiAy1}{@3EYNTiQQ8-#T=nuXI3z>&-Hk(9}RD^V+~Xkq_sTlqe^sXL~am=?pv z;<(<*vZxDDf^m|JqbbRl#TUO#+J~8;Q)`WMcRcr>EeZ_5gP`gmh)UIx30hQ2*}T-x znz!7ABEZUEHtLCJe%zFD@hQR@Qvl^L<%5>EZ-fUsnbDaAd68H5jO+Y`b7or zXDDz`v)!z^j_hx>v?gO*0XkT0u92qbjwFl@f!f$?*FgNtCn8kV^K>Jn!U;Dc^?tLl zw%rd?x{y%9R;~D@HCUy_vxkvLtv#@&?7@WTY73Zx-PHq28WLMwdv{TPWUz<-+HSy{6mG_e zs&S=?MRkRV`uW-nLt@w(4|S*i!ILAY$A?{p3 zzQgLb*w&A4>iGJOvO3}RF-uM9lVwEYv!XEdw+QIJi1H>4P{I!XiexJi7Q(hib;JT$ zf=(vpHW-KV(VjI($`m{})fs zdhlYt-aY{LKPkIXN8bUs!;SXB5D4t>3{EO3)vSasgtxE!IOt+A5N00R-hMdfT_}7$8L-E} zeNX$lb+z9DY*88_#T`X;Z~E2lce6WErw^WfeLu4|v1b8poZGXIu8H8<#%PU2 z9WI(kG_1#vKVMk~GFM4tV(H7;GI+~YDzHVgV!m?QEV-~u+eEt4=>R;@^rLwzhxGXv zJmQF5Jdp9rVA>eTJm_>P$-L-(!PcfY$t}6jC?B@tTGj`6Hwi4=%Ss6+zg9HJc}^(h z`=vBSCs&&n&(;A!$phFVsfAzYis@wOu=UxB*_j#JvL)DxA&Zkoe*gOXI^f^FlN@$u zD_cWW{C26OC-~;(>-YAz_j>dP+z+t@R=?y2G9jj69@~KoJ_gU1FuG)KGb*Uj<~d&0 zS;Z}_hz3pGqDXEW<~z)<3pA2)h=kh)1qt8^*{#aH6YX~3K!dUzucP%~q%3${Mj3+@ zW+9|ix%OSozPvxX+dZ$9?#A0-BR}x!@1z}Oc zt2iRVsWEIPL63|fXc75@iep$hZNj52;KPdv$e}GaVEpp7lycf#qJ^lOUv`p$KV*~> zRrzw$r;q>)3d8#LS;NSz(2=_~ou3KCfG#rzGCv!ex2ZR(tO65&p~)Qj_HE4CJoTDh zN9pSv8DCR8U!4oId=EU9gFk+H@(C%d`PUgMs%0T-Dzt}9>8DT*0OQ7|?qKz~fL*tY zBSAr8x1fH_%-P(jI}F;<S8dJwuxjN}fNvBkS zs(P%1z%}PZ%qd-E1dB1h%^)3g>lWLpSi-BZvpVJFOWYt2lp-UpXO2TzZ2XJ|kX=;a_s67OuN?qDKG*y#2UqLlIc)3HBGM@`i#-Q?-``$#>p0^E$nLc4;k83kz z@edeCE;-q?qL#Bf9y5vqL;bEJ0tn*MBK4}+!omJ96t#lnoW`2`6T>{;S42Ui;)g4sk~$$iSfozYV-swxaFF4z0GnMNFej0S9dJ8 zd=J-pNp-YlcE+lgP^XAb_s~0h2j}(qQ-T|5LWkotM-KIB@}5o17a88IPvtrp%^|{^ z$6?JKsxpVI#B?r{`-~e?l)1f@PZJ|s$yOhBbm4Te)jBt3st3j+2x5` zzWmA*Wu`cx&>|n-&CSarz4BQ87tLQNFe7}Y2aj=`$7&ckO>spBj(}gE2_Y&5q(Uk$ z0(P%Sgyyupl$cOcM`E#Q_w==x9=a*o=&dvvQQDR#2J>!X&FU!4W^%4Y#p7p^Pze%z zoeNj^7+T|m>&jqchZU6X3N*TRC53=5Ik;zP7**Dpat?rGbw5bnPMQ$gl1zld>qrRUY9=UNa%PO;>nhY+gHkCH&pVvOW6UzflJfA^qSQ|H zu{&779j1gZIkjd}aTFkKP3ZU%o|Q~~%Z zt_c-KV++NL6T(&qsuAc2eScG*`kLk3C=KJCmsrFC z1p%742dJJ=UY<&78r{Ar%*m7P38nvP)-Y8;aP(ELMl#o?w&Uy=Ym^3hauZ3ZNEL&> z7&mO+yHB?>YEv&|AFCar8bVh{ZF;3lOYx@Yzp`C3d7@jtlp8v6A$(O%htl>9pR-QX zGH1e|Dpx(gYH4~QH`ZB0y;#-*ugZr~sK0QdMz;s?$(C?pU_n>mL>ipXQ|&FPgvt3I zQ*xxF4qa94e%P^l&XB-ik14YkCme#KaNt>{vc*)`i;*<9Q5jiwlnmdY{;gPW2=ZRj zpa6gy#D8HV=l=&ISK>1NDO?zCi5WNGOj2thLMypt3ahnOu~#{OsoZH)cx*J zc54|VD`rvv@n=`}ldL;ey#@=Wi1T#d-DEg&>M3c}GV>Jh0crG~2#fewQN8LVKm4O`xofnJ z!5$^JUcc6ou5Wpecv#Oid%9(yt}5c1NEZIWE->Jm?LVUOc{kKG`&|K#gac7`4*L>_ zjoryYCEQV4pt-mvU_1;n|H_yAa!MZcEQ`lRNwkfezz& zJ*!e+S=z%n(F<&E(B+Q^n1Fw;R55$M*hKY!6PYh^!bWw&}3SX);#t1AI# ztp7Z6RVroR7fJw@qjY8RN6S!~(67r-orM!W@5qS|5#BFYAo~qrw_h=)(HL=>Sa;J7 zoGL4_(%6sU*$bOaAhY>WLlm+%=R^$Cg#FrMTlF2vA~c!}YQkcZ+Yd{F%T{}Lk}I`7 z!HXXXul-s?^^^Pi;!hpQFik=>)C&DR$7^yI`-rVMDI`NF|L(&C@ruqi4;PjJ(iSRG z6$B5%#ia9jWY!E#!aYZxWK4ds)%pMX1pn-Gz7VFPr$7O+&o}RbBFKi*4uF3m z3*qWFmbL`Jeh7yAe@qYv^M`NQ2G|}?>g|zM8})!I=Uj4eY(h0r$rqM)3-R{j-Yzm+La&^uP>pSc&@<#h{&~1? zzVCky0v-oJ57mexV2VWK^sk?2QuO~WQ5s7{trkMsz<*0z(uomTAkjCSBbo!svxFf|9{;j^q|PcvUyw z;T4aR*td^2w?noPicG5}&o|6ky9XxrAY2u3A4tt=7vSXmUg$TqizPLDOX?ccu|OQPml{s*}2`wh8AcZ{`Hx&qUnm(B(G-vXGUW12W(D4 z0+N9tj2K29JF^)`Gc6YVf_IgZdpd#}dqe7P_({n(cAUBinQ?$|+(wf|OvqiNIJe^= z?@)%6GtDoiks^1?*jngP*8mzNgW#rzNUf_Tt!k2Qf!gW0*k1Iw^`O#j;9~zD!x+sAY#(*O$^CouxS_Z>Wtfk{E^4H zulZ8q>o~&8qrFS!#req4Xpq8wKVpWbSN)QgOvl=(yBUeZpSw|8;&{(|-E?JL?_E5_Si&Ggg1gfXyC$qtVP5dfH? z0|2o8Z-`kMJDZuRx;k6gTl^bol{$6~E4&zQ{@t%|6r?!X-Btz(%`I!3hzl~hhF~fi zfoRC+P*J%UCbiTKiWKhWlMDQ$dNY*0qHaHH@x)%QT#GQ_=d)*aT-1sgZS9UV0e`1P z8nwh^D~+)asI{iBPWob@WJwvVwr`fdelA4cdB5`5_3l>EgwCsb*y4j|qTzso>Q?_v zuG}!0L49-7Pn)UuPww2yw;FnC)y{7tjMu%O!a~h(xpZtF3c8b?jx`@I-+G#3isyC> zs21nHPnH)i@)idc`@R$h7UBCZrwMPaay#DcXxeW;agFq>J=ySQmKqUO4A3-{-ZN)M z8^Xd<@GvR5vqMkQ&rH<1CdZY-d$x9WL3}@yd(oqm^a}NK#`TC?=WB)=k#{S5e`KBt z3cm^N2sZQYzFQr7$WQVxq=TgCmRcqSbc;%a>Y=2tlv2)r&x7QUXtWh$vjhU0$~QyV z62FcBXVuYjx`a=|+JH!J)hS)5b4%8;cj zjpVd8(1=&En;3n>`6Kck#m4G2sFs^5t2B2nPw3H)awjvj%s4{-{Iyu=e4?1!NMzE( zrluZ8`6#*1P7eQvkstx9;qIF&zEeA5F^FClh+p(Gp$E<^qyd>dwMpTZNm=6RzIOhL zFwUEH&NJ~=DO;$?U`yh)kLdhK1pL!}D}sk!x9c0I7AMB>Y!{ud{cO=BoA<>Zh64e_ zr0xQiwXRYT-H&3IAl*LYoEeBauCKEpT*1nwo!>J;Cq6}*BB39>O_c_Z~x?hnX1g(^|OfEQGxC-X#gr(O>1GAB3L}7 zhcTJfrm#UR62?fZx&oKcT?$G=*a)Q=xZlV#?yIp})t%<%PIwTc=S>Z&8Nwv^Eq2ON z+5S^}0PYn6j0iJ*uE(1I5L-^CX6S>I!5_xqn`Is^(%2Frg+TD!&Lb$56q}t%kp1=i zcT}>kbC5EX=4o}NR~U|$l;tVQz|?LRu@Wz@JBpz~L_YEXqgsI-)F4^p4#G|Xs1{;oGIJvYlHL=VTJda9gHMpTV7aa?3g8C$%q8$GT4Z z!Wc7j{!u|wMJT7;niOZv34V4SFa(m&&NV(b8E!eOF32(0-BEtkHzj^+@Gb1Pf*#ZC z1XPXrwBeV1a0&>ioU&1Sq|^)WMowyW#<|AoQ-L@tV%f}b%h6FLRzW{6vjkA=xMq%Z zj{R-hCdpL4gxhSpd^nn({&O_V!d6j65^I=%!BG{M?qR6(mH&<_B2yUv+&!0+&2hA8e3uH3^B1Mk@C|)r&4#`lLE8^Qa{=})ZiYE$PFmbUI8T=yZy8$~e z8tBr|{mr&o#*^Qly%O=p5Zb!afuo$`1sY|9M=9lK)V1tln)||Y{C+706R)=?PurD_ z?u`g;$936A@a5;oQe83R>pMj4cS*2o^wvnG8j(TmzwVV*>5Y zocOZ7s1PCZW%`AfxWYJeG%z`Qad@aU;GxI&AYqe`BG9SPC3)8cj66jb)1yN44Sm<-h|e?zNj@p z3rUnbpcPL)j%Fg(Dh`i-L|rEP%Hd!8oN;w=?cP>r%IGusQFetj=$^QVko-l&+iU!Td)> ze6ZpVhiU{s0`~+9PSZ$W$EDr9oi-b*x04h8%<%?_c65<6@uw`Ms3t?!Fg^f+KsWHT zq-d<4tJX+*Hois=vtM}miyNgkiblX5TPt)LYP=ot zi3w>T*x-VH%ij>G*Q~-cX03#R?dN)J`V0=h>G~FUspd3PGPW~3&cq;6qQ@tXLDS&w zbUdg(F~;Vsa!N;r6gmEO_eJzqZ*t2wa&*CJ*3U!jq2KAf;?*J+0xQI1888tsydFK^ z*@WfWdWxM>t4cp3Kj(I)++F^Ra9WOdU8!R$~U^LVA~{38|Qzezzzob7kRhjVBBJ zVLWDs_pGpOOrgpm=P-yfwK(jm=dTFh*mwvY)ag1InFf?pDN`cj=@&-NVz9?B-bnDE z)oo``I*Wd7b2IVm-^1nR$@>x6HZilP+lTD|47kbz606*fe-;u=nLV%{L4IZ7uCh0V zj5DS2wU7B>TX+ZWg=X;WbSwkPr)fgIzKop&v0))}k_Wev*LMIF@0_Fi z=(hV0_~R4=o~OPO&4b}(<9GKKX+6}mk4T59H%B;_O%byson(Z`z&+vXstW82KP-KjlZT~z ztT}TOy6W3EHqL$TyO7|(8eyJ^SA1c`YpZWJ_RcIM0!?V<%p#29^l8sumC5q(Yy?S= zZ^pMba!7&8Tih2fFZn7?&|CJZ+PzPK&ADNIohKrdJ?L^y`pl+Lq~~Q94EL-vY_#!* zenRaIZA8ft`Z|`U$A5#}AOoq4?NYFBFsaw?Th|LB-8z&W9^H~mpFI8unRAlc29m;u1|BY&D-{k z$Hhe?7uK6k?tUXM6ac8hec5Vmm{3n^UYPWVlaS z3}@%H`SEtC@3WpqTYnTq`{~1M7}=JqhiymX0g|Wc)mD%?aeiK+z|1>Qdlh4wmO(}1 zJnl)-6MSwAv?wHGt%fG_OsbdX#9!uHV2!C-6dkwQB_4?;w6i{c0nXVv_4dNTe*fKrKqU-C`221p0y2N5JU@Wy?AIdL_@%TW46LX~cVDqWX?B!7_6{muBuBX7p>+AjG0Y{ezXGYUz zaQ)}lYFwGVZfRXw)#7QFx4mQWK1c-BPj<*f%5daulk2~?2pxz$)zI*?(-Ne(3UmjY z61X@i7|U$SRCG2z6&2S2zw4&T_AHP1LBZ~1Qnf~J1sEaFR^vqp6i?Ms>{Md$1LBq7 z-v9*;%TlzY?Cab0Fp={OdH$*^_7rz=(1%VlW!MJr!69>_1N~?~8{8=ik*bYQ0svIJ z4D07`$Lj3CV?TF+Pqe(acd(S7uqV)6LJ4~Aohbzk$bM&Iq9RdVM2N3RGE|xKaVjDf z8DxDcJ%od{PyjU7!(^?QC;4)HN}&l3A{Jsn%)Q%e2Zaggs4m@>MjzN$twPL{D>U!` zbF+z!4LsaDX^$s?BECoS;eSq~|J&#SH2pEjfB4`3?nkEt`B4ynPDSbQHa`E4 zVAI9Y%*@q=<=+Pvh*^vol7${*AVy8}-yGL}eyExalq*I`^xp{mKL&(cpk=YoME@qB zk9e_w1OU+g)BDj1`2r#mrzHBHfcl`)(EgVJv^YptoB#y{fCBi3%!ZZvr}zH=^^PF7 delta 9800 zcmZ8{1ymf(*6rXrKnQMwy9al73GS{F+?~PQXK+byLU0cW7Cd-x27*IyClKuAyYGMZ zzW;Ww)z#f+_v*E}>+GttceOwfB4ZJV->M=Z697;F=l}qK2B6~gW61>p0AOw;rbC3l z%$&Y)6U3aMJ>W95mnL`e3^fEcrjDuSJRlf(y^93ndzQ>XdS>tXNc3Js2h2|I75okf zRQmbsu61OKLzT%dd}L3V6~Z}<7a#kZQgLl~0SCgeyL-qyh>OxvW)0O# zKYD{oc(^3O*wUVoWQC=+UjBoD%q+OtWptKhFZvlZfJk)6SkS>*&NRFm!Y+x`3xP_94AJ zb<#FTOnpBFwFs3-UBHuW0Aorei@SQK>N*H|-!m`jB8?Gif4$u5W;&r)_sDl)YhfmC zvj;mq%CdQjU94-bP)b-|y3vNhh@oE+Gu)0a3Bz?i5oEgBo}WZ#~D~4O}EGPF9~tm3 z+Du_$VrCQF)u(V{$D{l~v?!MUn7VL6w-r6t=*c?uIG;+iYJq_DUH_G9dw~EqtLoRj zPbjWD*5JlGMUM~TB;W7PNjIaIo8{Bk4PI?2B^&`iUc6JQ_T@*b?CsJHl=6YZexJ|% zF=J=^gjQRsi~Kek9mgDJ{e3CMBrjy{ip~K)c&aq0WA^M9p(}|uj1se#qVBDr#Ubqe z&)65Ijuax4c>E!cD#OOtY<&X}^>WSq1Fg^zFG$3j@e7><5p#+Gx(}62C@P-~1En53 zm>+83bvbe@O!fZp3*2u#ZxiDn7t zBv3jMNMO%C;fRVmJc?PpKl2@rN2DG951dh8dsSWiNqI#Q zJ53EK*^A%^d+^XsIIyUMH=38a^QW}tBDUsnZ&2v^Wp^w#J)YItkBIm^qXeS-HtWwt z#T{8-<~MA-)WwHJ+oF!Cag6xkqq_KdN+%OEI~FmUJre zm*Z#GhnVC&_jt9nbad7%0+PK`d6nmgN3TYqfQGX69$wNF#)hHqMi7!R zJ14;0S1jTBzqH%F6g)p_`Li%AT(s>ZyV6uTG+cY!T=ZX7P5fK{q)->P3fC4M@eb8g zNqdOSQno;Zw85nv{hG}-ZEt$Vc$E3sub+fR4GUOXcf7_`DvF>ZknE z-g1RW)$!z#+Wm1Et4T<3%o2WUfv0d~pc(#3z&e&GjLLc~gYE$%Eu{_jsj?r`$a~#}*q^q%; zz#xhB$=Hjhlf}I!uv1(9{qVg{g=XrIn_y3}qy<&aV?g(0_R-M^7%sNt8$`l&an2;7 zE;(`iwB6t~v^H$DYKHGW-e(0t^Yfm468Y4bqH*P;U`~we$RM=);nUrgZg&TS!7)xa z4S3st!i2o^BuWGGlz)EOigK9-HhKEloNvSc`i5}nZ5M;uJDJZxjDEOAkX(OZw z`_gCuO~_5(UI|r20PdAHW55vagdg;ZVL3D_RvI=CRoD}(?$_MV(mY}096hxc%10J0 z*Q?AtM1G(?k>#l*X`BXw(r~HNL&tsR6Y|a^feZU4nS=Ka`;0c49wda%Z-kf->{UjA3YBgG2l>&in13;0+-mr)Y^95b z3xXou_#`A`C6s$=SQ7#W<9e`MElxjWSB~*fORvxTW{oS#9p6Yl!B2nP<&jp&#`y}p zq4Y%EAai)zV=+<&20;R~KaNe^-`+l5-M|-jhgVR$w+W3aKaAaKU?#}mq2H}@14v@S zqeM+95b9bcBOqI!w?!-~cneG07a5;!gs{;^3j3Z&EKlXqNh@b9L?GQ+7{A9HW5;&f=TSPE zjztQ0W-bPE>->`m^7Oy<4LTV{p$5l7-O;u}5j(VPE8#o`clat6Q5kOWgZYO9^_wk% z@_#exIEnSamIYe__JzClstqYr0B! zeTlcK)}pXEJxM^UYs=63ai+H^fOqm79ltTHxhdq{KRE!}+99Hpph(ou311<}ToFf; z^Xu5Ga-+Ac4|S;)QLFkmDQ_{m0-J3I0~pxRvLn1nDvQ4R0MY$v8J;|lvKw^-sq8kZ zDTevVl=-5ZTV5^HmNQuo{9XXxe3>fdDl0;GnB%bwwt7p(=pjO;qZv?{Av4=0hMc`} z@ujuQ>HlZg8e<6Dbz6YoF|!xcz{4V(Lc9#fIxY!=cLrXanxvtwNgP)w{b+f5$jD;% zu*V@}`_+$VGs^$AcIBzHtmNY@r#6z8IoWPSQ!{aw8H?>KY1l`==^4TMgjejDj8o*BO3 z;*Ngmn7Uz;SU))B6H(%`BUBOFN!kbMMCCr7QBB(-P&ThwuCpZtGpBxWl+vRL|G@78ztR70G=1$<9(O$irQm?q4i+n(V5d|H z1x3;cHx>;hJ`FTpK}*Ujffp!RKfJp%Hk}i)p}r7xTQAOo-j9e5BQ3{;1ewSc8yT<_ z8BA(|U%FcL#Y-PVy?y|Z|&_Vo5xyw|=MxZt(ZN83FVl!qzIzd@g88u3_ zx0DM_U0LtnhZ1ev-04yDDe^WqMW7LFW;)Yt4mx;;#TbKHwqBTyt@`1!l~R*U1TbtV zO~_AVmS6NpL%o=F&e)R)0y;K>n{0NOqf+oXQlZw_1jBkwK)F99eNwEofP&v%#{v)-aplXNVl9 z(q$ff8+dQKO2BPg$r%`^t+vqM>*1i`*_BD&T*qI+R{;EQ z*#vkiNPM8s0(`vzim|3G#|Cptn1oBX%-n7cQUUX4bYNc~dra%CR@~<1(@3 zxO~?D46Yd#H7XM}Aqi|B=6Lu`g?IFm=6(QJbWt;qY}CDiz#5OF$UOg}gCVI{*v)y2 z96C@Wn#L<=J$dOPeo2N?amEfDRx>#P5CU~{P5H0&bcn5H(p+zc=o`T}kg}v0;n{_o zk$1n)aD7u^(u*W5UfBT=%+5uSb@|0ou7E-FIrI`vNTUl4NG*&<2p@+69cCW0C zaEYcW1VRrT-L5J$!>^7}koC)wWaSLK6R*p*u*Pc5uIYV65fkA;=S?5`{ zwpZuD8L_D)AsHO-JN{4~<6uKn)fwaOAkg#O)6(5gW!3j#rnK@M2vtg= z$qBez;AOb>9_B*qK4yr4?t~jp%Op|Yku%R-gW#sP)T6xOZMKv1P?Z0*Yx{IjuaMFniivP+_3MBka&7A>t^UZYe3lU4~G4pS-jS$W8O=hWy? z4zpplOA}k~i~bwP_iLdMjkKlkT-d(T@1_Wa+=HOujHw8G&99g~VM1u0e)V(xmAlga z&O(6ySMKui4sf>f`lp^N(>F=Y7a|C?`1sOyDcaA_D@CBqKT554sLwW0GiQLNHdM_u zBGhba@UR}_jBaILEl_wF5hC1I|JqW{uKXP%F3R&m@ilLf zh^(!8M@CC8(+bMw?;}45RGOCpVnJ4p!qYI1*5ky_;NyUcajGbt_9ucn9xbZ8st)0m z97Md$yj)Hd8~$c;eALQ{zIt~!QaA5dxML-j=i^&Yg@{a-1i8l#17GA%Er+@`u7E`Y z^?uerD{HG$A=+kXfO{7ut1Yyd=B1z~g|el^>-gpC&711K)L*5o2&T@3Y*dw7-s@FE zY{8O7n;%-ueO^kKzDeU-L%OO749G~8D7p9PGtfmGoiQAOCTfgjk+&=%w_s#e%Xdq#_u0LD9Tev=$%F4pflF^7h9+b^IEjDkzr7^S z#~Ui$pLoUg6B!N2Bs_T?E=+T+Tr8(r&E9sKY}!{n}iQ{;YM;w9^qZjmkF2d zFKl#NSOU4kT#j42qxSm@7rxE#%m6$T_>05uVYgMP)6Pg%+-JaIj8-kG+Y@GN+Kg}3p@Kho!$C3qA$EnD zSuq&jbH#{>HDZH%EsCHSpO#?cEtNskUbPvHPr#(!H8J zuR``{#F28-sm(jDT0YJyDIqQ#5b`y{$b&kgiE|tt+5Fz!&CC`uUwx*p@$zuEv1RH& zyuWA7O1wY3xAmCw?5zq73?(pnhM4K+7VN9UKEy79Na7Ft$?8HV%>F9QUC-CQ(LW1r zuFOxwE*W>YJi?>QL~~OlqaQw8e;=RgRLgQ3!2NuW^2g$_fx9P)TrXY1fqWo`n@7US zyzY;9Ma0-B!)u_E8Xw97xU~9Q7Ba4+u@3AQaLeQ;*)Com10yNViwPJ-4h*uQDfwHzSMsNt{c&(0OrzS_1(H@Lg9(DYniLo@ToBkyNmuYRv5mfbFuZdJIQ5Hq&q$C$ zxjr;`hGJAt$M`FMpLLZd?BJ2fii2cxWt6VX7v4N9;&JzcX-~jt@^P=CbC&Bi+E3wCZW^*LPZ!Kzp zqQOUW!Y!#7l^*0I#jb&VI$2sV>$=+H9Tf!L%G=@ctTjiD>3zj2R#_^r))}Mf0eo&6 zgLzdV(QOGf5I35M7i1;hPJ->kz;ayVA^8n;w)xl&)@4*oJ>CQG==3n{BD8+$T1L7> zlK7S(EX~(`6thuITcJrFr&ZPKqn`g1gA{@$h3+L(wSEe^}jIj&y3nn_GX6SJG;Jqyk zF~#xS8~VOIDZat)g*9i$?&C;^gHY=)`p2q8rSl>NYilq51Xx*iT))>i20c{Z9W+6_&{efd~#nx^`-8oCLGldkNh~L8B}}?%Kaf(C8Ia6 zHx7Y&}vGcM1}XTl3_=-f{qCOT_o;Tc{pcu5nDKD%^S> zOF6rJ84jou(K&pXFr~KjsqkJ9pI8{rc1|f}FM&hUagPxd{_NRE2{XDjDP@-F4E&~h z9?6r3KU-hD<2L{&L`hLa6@4q=ApbZM$~g`#6nzbW;f~PdBTr3Y-G|A^EHjqz$zXFF zn(BYa_T8-Y{qxRVnHD~6SX_eWlvuAS6@(Z#G6DNsTnp5^#nltH#juHS;N-jHFjFf& zsw=n^cfhS}?R2^=5J@8*fAr&v4`)7FSujQWloo0#c@o2SEgrL9^8^>ZpUz@$QF193 zNGyZwyFUA~(A`aUfn<503=`1<*65fB<5FnROd7+p#|Hcq-ehg*%}o;164*|+ zK+B?yhPYp9eL|dMl@j(XIg%vas#8IMX8rRmTn9==mE{Ry_BZdiwkOSJ^ zsxh3UV_t~7)l&l7SxeJ$y>QYD#VQ++sQHZNI_JrkORFcg1m-iZCeZK@8IcR-a)3)n zt1oX;dMaC7<6EHJ`8^qfB^Aa;CXlGKb97k-_-$#{=VX&QZXgRW$|}hp)7oht>vI_? zF2XCtcS&Z)H$H^xei)n z-vfX#Ecg9ON>q-|an$wL5-_#BhDFoebPg`6%Wwirs(!UE6?}p(sRPdxWpDL0M8K#>a9@y1e3<}Ei$~D! zQ%n2xNq4gAN9d^i%F5s?24YD?H%8QF$}wN+hou#oZs_E1u~b(~_90Hkx|KSrpV2Q? zRe3yi^rY<}cW=cS19X?#l`hCKeg28o^9@){L{5CHyEEhD5+a0Nfk~^Y|FSSdLH1WO~!j?7B1svM*LCZSw)QgCO697@#?2&8fFHiO&M|7I>A| zFZje(P0Uc?+@H{cwK#uN0@Ch3WP`Z>GG9enB1yXbny>lI&DVe2r8C&Hv61wHt)D-ezqoBiNTFA?@lFFs)y77Xv+R&E* ziz(j8Ua$pe`{=#R{mJ1Q2aL>5up1oD(6E8CgjCQD>mR@?AiV++=k8Z&Y%H1#*AIGN z1v|yem0^7v%54S6U8exqY1hxzjkl||%e7(R5uZMEVwTc?D2OS$7h5hn_f<7_Y4F@> zdZH|o^_9$sB$HDl?#Y%HyHME%TbScrfq}$8ssy!!1FgE&rUv@$5`BSig8isqz&x%? zxtdz^vHMK80?3FL0(KTG5dVZMt}!V}nCz)!*ySulVM7kVAKFSyh0g&jGa`^>FG)07 zXP3qk@3o}*${Pepq^jKG4ar*4Lg(2CFMwP3u)7iNa~vT)v+ETTeHro^nY=+#pQKP( z%%2RFsuq3nf~7lOWS8W4tCmKuTsBvZIH-kNPbRx+G73Y_1GR@Yb*5f?l;5N6;^>l; zf`(jXZI}h0#I*{Df^uO;3i4IIIFk}VuP?)W$z0r;oMCV&pC4a2uNTk3{!W88C+rWf z8DRcSnNUyl76|$xPmJ;JRZUQkf;MKwU&X<~)6Ut8D?^cx{J)pn{H><`XB&flQlO>y zw+D|50FeE^(7)O(Oz44v0s;q=Tak_8KaMm2K=Qv}-(Ev`6bYe8ii8yZaa{fz+^P_Mt2jSSFKB?gNBmQ4QhDsB$wv=WHoze~ga01~*N zSjvnP|DuY&{n`N$0C@GE=-;yn5vaN{9mT&p{@?F}_rCzl;?N{zG92Jvu}1rEkX-7Y G=>Gw;+;zPG diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/app.properties b/DeFi-Data-Engine/DeFi Data Engine/config/app.properties index 88474e56..1b081086 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/config/app.properties +++ b/DeFi-Data-Engine/DeFi Data Engine/config/app.properties @@ -3,6 +3,9 @@ # delimiter used for internal processing general.internal.delim=,., +# data delimiter +general.data.delim=, + # enable all packet logging general.logging.packets=true diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/output.properties b/DeFi-Data-Engine/DeFi Data Engine/config/output.properties deleted file mode 100644 index 8c2b5ca9..00000000 --- a/DeFi-Data-Engine/DeFi Data Engine/config/output.properties +++ /dev/null @@ -1,28 +0,0 @@ -# === GENERAL PROPERTIES === - -consumer.types=null -producer.types=null - -# === REST SOCKET PROPERTIES === - -# Rest socket address -rest.socket.address=localhost - -# Rest socket port -rest.socket.port=61100 - -# Rest socket key -rest.socket.key=rest-key-reserved - -# === OUTPUT SOCKET PROPERTIES === - -# Output socket address -output.socket.address=localhost - -# Output socket port -output.socket.port=61200 - -# === LOCAL STREAM PROPERTIES === - -# local stream type used for database configuration -local.stream.type=mongo_db \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/stream.properties b/DeFi-Data-Engine/DeFi Data Engine/config/stream.properties new file mode 100644 index 00000000..ec810c17 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/config/stream.properties @@ -0,0 +1,48 @@ +# === GENERAL PROPERTIES === + +# consumer types for accepting input +general.consumer.types=null + +# producer types for writing output +general.producer.types=null + +# === REST SOCKET PROPERTIES === + +# Rest socket address +rest.socket.address=localhost + +# Rest socket port +rest.socket.port=61100 + +# Rest socket key +rest.socket.key=rest-key-reserved + +# === OUTPUT SOCKET PROPERTIES === + +# Output socket address +output.socket.address=localhost + +# Output socket port +output.socket.port=61200 + +# === LOCAL STREAM PROPERTIES === + +# local stream type used for database configuration +local.stream.type=mongo_db + +# === MONGO DB PROPERTIES === + +# local MongoDB client URI +mongodb.properties.uri=mongodb://localhost:27017 + +# local MongoDB state database name +mongodb.database.state=main-state-db + +# local MongoDB main database name +mongodb.database.main=main-db + +# authorization collection +mongodb.auth.collection=auth-collection + +# query delim +mongodb.query.delim=, \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/config/testing.properties b/DeFi-Data-Engine/DeFi Data Engine/config/testing.properties new file mode 100644 index 00000000..ff2bbc03 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/config/testing.properties @@ -0,0 +1,3 @@ +# === TESTING PROPERTIES === +lsh.authorized=true +lsh.ready=true \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/pom.xml b/DeFi-Data-Engine/DeFi Data Engine/pom.xml index 3df3471f..a1b5e418 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/pom.xml +++ b/DeFi-Data-Engine/DeFi Data Engine/pom.xml @@ -29,7 +29,7 @@ org.mongodb mongo-java-driver - 2.12.3 + 3.12.11 com.squareup.okhttp3 diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java index 33c006a8..34196b7b 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/engine/Engine.java @@ -19,9 +19,12 @@ public Response processSTRT(Packet packet) { return out_response; // start local stream handler processes: - Response lsh_response = send("LSH", "INIT", Config.getProperty("output", "local.stream.type")); - if(lsh_response.code() != 200) - return lsh_response; + String lsh_type = Config.getProperty("stream", "local.stream.type"); + if(!lsh_type.equals("null")) { + Response lsh_response = send("LSH", "INIT", lsh_type); + if(lsh_response.code() != 200) + return lsh_response; + } return ResponseFactory.response200(); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java index 528488b9..e9e326a0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/core/logger/Logger.java @@ -1,6 +1,7 @@ package org.core.logger; -import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import org.framework.router.Packet; import org.framework.router.Response; @@ -26,20 +27,26 @@ public static final void terminate(Response response) { } private static final String packetFormat(Packet packet) { - return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%s] [%s]", - Instant.now().toString(), + return String.format("[%s] [%-10s] PACKET - [%3s -> %3s] [%4s] [%s] [%s]", + time(), Thread.currentThread().getName(), packet.getSender(), packet.getTag(), + packet.getSubTag(), packet.getData(), packet.getSubData()); } private static final String responseFormat(Response response) { return String.format("[%s] [%-10s] RESPONSE - [%3d] [%s]", - Instant.now().toString(), + time(), Thread.currentThread().getName(), response.code(), response.data()); } + + private static final String time() { + return LocalDateTime.now() + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn")); + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java index ffa3ecfd..cc71631a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Response.java @@ -95,7 +95,7 @@ public String data() { * @param message Response message of the {@link Response} object. Uses {@link String#format(String, Object...)} for formatting with {@code args} parameter. * @return New {@link Response} object formatted based on the passed parameters. */ - public static Response create(int code, String message) { + protected static Response create(int code, String message) { Response response = new Response(code, message); if(log) Logger.log(response); @@ -111,7 +111,7 @@ public static Response create(int code, String message) { * @param data {@link String} of data to be returned in the response. * @return New {@link Response} object formatted based on the passed parameters. */ - public static Response create(int code, String message, String data) { + protected static Response create(int code, String message, String data) { Response response = new Response(code, message, data); if(log) Logger.log(response); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java index 39228d78..7cdb11db 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/ResponseFactory.java @@ -1,5 +1,7 @@ package org.framework.router; +import java.util.Arrays; + public class ResponseFactory { public static void responseNotHandled(String message) { @@ -92,8 +94,8 @@ public static Response response440(String source) { return Response.create(440, String.format("Requested data source <%s> does not exist in cache.", source)); } - public static Response response441(String hash) { - return Response.create(441, String.format("Requested data stream with given hash <%s> does not exist in cache.", hash)); + public static Response response441(String source) { + return Response.create(441, String.format("Local data stream with source <%s> is not ready to handle queries.", source)); } public static Response response442(String source) { @@ -101,7 +103,7 @@ public static Response response442(String source) { } public static Response response443(String source) { - return Response.create(443, String.format("Local data stream with given source <%s> already exists.", source)); + return Response.create(443, String.format("Local data stream with source <%s> already exists.", source)); } public static Response response444(String source) { @@ -109,7 +111,23 @@ public static Response response444(String source) { } public static Response response445(String source, String query) { - return Response.create(445, String.format("Local data stream with given source <%s> could not validate passed query <%s>", query)); + return Response.create(445, String.format("Local data stream with source <%s> could not validate passed query <%s>.", source, query)); + } + + public static Response response446(String source, String query) { + return Response.create(446, String.format("Local data stream with source <%s> does not contain data from requested query <%s>.", source, query)); + } + + public static Response response447(String source, String query) { + return Response.create(447, String.format("Local data stream with source <%s> failed to process the query <%s>.", source, query)); + } + + public static Response response448(String source, String query) { + return Response.create(448, String.format("Local data stream with source <%s> failed to retrieve state with the query <%s>.", source, query)); + } + + public static Response response449(String source, String data, String... location) { + return Response.create(449, String.format("Local data stream with source <%s> failed to push data point <%s> to given location <%s>", source, data, Arrays.toString(location))); } public static Response response460(String consumer) { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java index 95e00451..eee258fc 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/consumers/SocketConsumer.java @@ -26,14 +26,14 @@ public String getUUID() { @Override protected boolean init() { - int port = Integer.parseInt(Config.getProperty("output", "rest.socket.port")); + int port = Integer.parseInt(Config.getProperty("stream", "rest.socket.port")); // create server if(!SocketManager.createServer(port)) return false; // accept inflow from REST - final String key = Config.getProperty("output", "rest.socket.key"); + final String key = Config.getProperty("stream", "rest.socket.key"); if(!SocketManager.accept(port, key)) return false; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java index d9818b63..d274dca0 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java @@ -61,7 +61,7 @@ private void reflect() throws InstantiationException, IllegalAccessException, Il private void load() { // load consumers: - String[] consumer_types = Config.getProperty("output", "consumer.types").replaceAll(" ", "").split(","); + String[] consumer_types = Config.getProperty("stream", "general.consumer.types").replaceAll(" ", "").split(","); for(String type : consumer_types) { if(type.equals("null")) continue; @@ -77,7 +77,7 @@ private void load() { } // load producers: - String[] producer_types = Config.getProperty("output", "producer.types").replaceAll(" ", "").split(","); + String[] producer_types = Config.getProperty("stream", "general.producer.types").replaceAll(" ", "").split(","); for(String type : producer_types) { if(type.equals("null")) continue; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java index c59aea56..d2fd265f 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java @@ -24,7 +24,7 @@ protected boolean init() { listener = new Thread() { public void run() { while(true) { - String key = SocketManager.accept(Integer.parseInt(Config.getProperty("output", "output.socket.port"))); + String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port"))); if(key == null) { System.err.println("SocketProducer: Could not create connection to socket port."); System.exit(1); diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/MongoDatabaseConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/MongoDatabaseConnection.java new file mode 100644 index 00000000..545b6805 --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/MongoDatabaseConnection.java @@ -0,0 +1,111 @@ +package org.stream.local.connected.connections; + +import java.util.Set; + +import org.bson.Document; +import org.bson.conversions.Bson; +import org.properties.Config; +import org.stream.local.connected.mongodb.MongoDatabaseRequestHandler; +import org.stream.local.handler.DataState; +import org.stream.local.handler.LocalStreamConnection; +import org.stream.local.handler.LocalStreamManager; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; + +// https://www.mongodb.com/docs/drivers/java/sync/current/fundamentals/builders/filters/ +public class MongoDatabaseConnection extends LocalStreamConnection { + + private MongoClient client; + private MongoDatabase state_db; + private MongoDatabase main_db; + + private boolean authorized = false; + + public MongoDatabaseConnection(LocalStreamManager manager) { + super(manager); + } + + @Override + public String getUUID() { + return "mongo_db"; + } + + public boolean init() { + client = new MongoClient(new MongoClientURI(Config.getProperty("stream", "mongodb.properties.uri"))); + state_db = client.getDatabase(Config.getProperty("stream", "mongodb.database.state")); + main_db = client.getDatabase(Config.getProperty("stream", "mongodb.database.main")); + return true; + } + + @Override + public boolean authorize() { + if(client == null || state_db == null || main_db == null) + return false; + + Bson filter = Filters.eq("title", "test"); + MongoCollection collection = main_db.getCollection(Config.getProperty("stream", "mongodb.auth.collection")); + collection.deleteOne(filter); + + Document document = new Document("title", "test"); + collection.insertOne(document); + + Document resolved = collection.find().filter(filter).first(); + + authorized = resolved != null && resolved.get("title").equals("test"); + + return authorized; + } + + @Override + public boolean isAuthorized() { + return authorized; + } + + @Override + public boolean isReady() { + return authorized; + } + + @Override + public boolean validate(String... query) { + // parse query + return MongoDatabaseRequestHandler.validate(query); + } + + @Override + public boolean contains(String... query) { + if(!validate(query)) + return false; + + return MongoDatabaseRequestHandler.contains(main_db, query); + } + + @Override + public DataState state(String... query) { + // TODO Integrate state handler + return null; + } + + @Override + public Set get(String... query) { + if(!validate(query)) + return null; + + return MongoDatabaseRequestHandler.get(main_db, query); + } + + @Override + public boolean push(String data, String collection) { + return MongoDatabaseRequestHandler.push(main_db, data, collection); + } + + @Override + public boolean modify(String data, String... query) { + // TODO Auto-generated method stub + return false; + } +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateConnection.java deleted file mode 100644 index 05a9c049..00000000 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateConnection.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.stream.local.connected.connections; - -public class TemplateConnection { - -} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java index 57b73488..03df5ffc 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/connections/TemplateLocalConnection.java @@ -1,5 +1,86 @@ package org.stream.local.connected.connections; -public class TemplateLocalConnection { +import java.util.HashSet; +import java.util.Set; +import org.properties.Config; +import org.stream.local.handler.DataState; +import org.stream.local.handler.LocalStreamConnection; +import org.stream.local.handler.LocalStreamManager; + +public class TemplateLocalConnection extends LocalStreamConnection { + + public TemplateLocalConnection(LocalStreamManager manager) { + super(manager); + } + + @Override + public String getUUID() { + return "local_template"; + } + + public boolean init() { + return true; + } + + @Override + public boolean authorize() { + Config.setProperty("testing", "lsh.authorized", "true"); + return true; + } + + @Override + public boolean isAuthorized() { + return Config.getProperty("testing", "lsh.authorized").equals("true"); + } + + @Override + public boolean isReady() { + return Config.getProperty("testing", "lsh.ready").equals("true"); + } + + @Override + public boolean validate(String... query) { + return query[0].equals("valid") || + query[0].equals("dne") || + query[0].equals("irregular"); + } + + @Override + public boolean contains(String... query) { + return query[0].equals("valid") || query[0].equals("irregular"); + } + + @Override + public DataState state(String... query) { + if(query[0].equals("inv")) + return DataState.INVALID; + else if(query[0].equals("dne")) + return DataState.DOES_NOT_EXIST; + else if(query[0].equals("partial")) + return DataState.PARTIAL; + else if(query[0].equals("valid")) + return DataState.EXISTS; + else if(query[0].equals("modified")) + return DataState.MODIFIED; + else if(query[0].equals("corrupted")) + return DataState.CORRUPTED; + return DataState.INVALID; + } + + @Override + public Set get(String... query) { + if(query[0].equals("irregular")) + return null; + return new HashSet(); + } + + public boolean push(String data, String collection) { + return collection.equals("valid"); + } + + @Override + public boolean modify(String data, String... query) { + return true; + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java new file mode 100644 index 00000000..feb11bad --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/connected/mongodb/MongoDatabaseRequestHandler.java @@ -0,0 +1,214 @@ +package org.stream.local.connected.mongodb; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + +import org.bson.Document; +import org.properties.Config; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; + +public class MongoDatabaseRequestHandler { + + private final static HashMap translations; + private final static HashMap queries; + private final static HashMap requests; + +// define translation +// note translation maps all get, modify, and set queries to contains queries +// note alerts the engine to skip contains protocol +static { + translations = new HashMap(); + + translations.put("get_all", "contains_collection"); + translations.put("get_item", "contains_item"); +} + +// define queries +static { + queries = new HashMap(); + + // contains queries: + queries.put("contains_collection", 1); + queries.put("contains_type", 2); + queries.put("contains_item", 3); + + // get queries: + queries.put("get_all", 1); + queries.put("get_item", 3); +} + +// define requests +static { + requests = new HashMap(); + + Class classobj = MongoDatabaseRequestHandler.class; + + try { + // contains requests: + requests.put("contains_collection", classobj.getMethod("containsCollection", MongoDatabase.class, String[].class)); + requests.put("contains_type", classobj.getMethod("containsType", MongoDatabase.class, String[].class)); + requests.put("contains_item", classobj.getMethod("containsItem", MongoDatabase.class, String[].class)); + + // get requests: + requests.put("get_all", classobj.getMethod("getAll", MongoDatabase.class, String[].class)); + requests.put("get_item", classobj.getMethod("getItem", MongoDatabase.class, String[].class)); + } catch(Exception e) { + e.printStackTrace(); + System.exit(1); + } +} + + public final static boolean validate(String... query) { + // validate not empty + if(query.length == 0) + return false; + + // validate contains query name + if(!queries.containsKey(query[0]) || !requests.containsKey(query[0])) + return false; + + // validate parameter length + return query.length == queries.get(query[0]) + 1; + } + + // contains functions: + public final static boolean contains(MongoDatabase db, String... query) { + if(!validate(query)) + return false; + + for(int i = 0; i < query.length; i++) + query[i] = query[i].trim(); + + String[] copy = Arrays.copyOfRange(query, 0, query.length); + if(translations.containsKey(copy[0])) { + if(translations.get(copy[0]).equals("skip")) + return true; + else + copy[0] = translations.get(copy[0]); + } + + try { + return (boolean)requests.get(copy[0]).invoke(null, db, copy); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + e.printStackTrace(); + System.exit(1); + } + + return false; + } + + public final static boolean containsCollection(MongoDatabase db, String[] query) { + if(!query[0].equals("contains_collection")) + return false; + + MongoCursor itr = db.listCollectionNames().iterator(); + while(itr.hasNext()) { + if(itr.next().equals(query[1])) + return true; + } + + return false; + } + + public final static boolean containsType(MongoDatabase db, String[] query) { + if(!query[0].equals("contains_type")) + return false; + + if(!containsCollection(db, new String[] {"contains_collection", query[1]})) + return false; + + Document document = db.getCollection(query[1]).find().first(); + for(Entry type : document.entrySet()) + if(type.getKey().equals(query[2])) + return true; + + return false; + } + + public final static boolean containsItem(MongoDatabase db, String[] query) { + if(!query[0].equals("contains_item")) + return false; + + if(!containsType(db, new String[] {"contains_type", query[1], query[2]})) + return false; + + return db.getCollection(query[1]).find(Filters.eq(query[2], query[3])).first() != null; + } + + // get functions: + @SuppressWarnings("unchecked") + public final static Set get(MongoDatabase db, String[] query) { + if(!validate(query)) + return null; + + try { + return (Set)requests.get(query[0]).invoke(null, db, query); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + e.printStackTrace(); + System.exit(1); + } + + return null; + } + + public final static Set getAll(MongoDatabase db, String[] query) { + if(!query[0].equals("get_all")) + return null; + + if(!containsCollection(db, new String[] {"contains_collection", query[1]})) + return null; + + MongoCollection collection = db.getCollection(query[1]); + Set out = new HashSet(); + MongoCursor itr = collection.find().iterator(); + while(itr.hasNext()) + out.add(itr.next().toJson()); + + return out; + } + + public final static Set getItem(MongoDatabase db, String... query) { + if(!query[0].equals("get_item")) + return null; + + if(!containsItem(db, new String[] {"contains_item", query[1], query[2], query[3]})) + return null; + + for(int i = 0; i < query.length; i++) + query[i] = query[i].trim(); + + MongoCollection collection = db.getCollection(query[1]); + Set out = new HashSet(); + MongoCursor itr = collection.find(Filters.eq(query[2], query[3])).iterator(); + while(itr.hasNext()) + out.add(itr.next().toJson()); + + return out; + } + + public final static boolean push(MongoDatabase db, String data, String collection_name) { + MongoCollection collection = db.getCollection(collection_name); + + String[] split_data = data.split(Config.getProperty("app", "general.data.delim")); + // validate that every type has an id associated. + if(split_data.length % 2 != 0) + return false; + + Document document = new Document(); + for(int i = 0; i < split_data.length; i+=2) + document.append(split_data[i], split_data[i + 1]); + + document.append("_timestamp", System.nanoTime()); + collection.insertOne(document); + return true; + } +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java index 20251152..37b711f1 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamConnection.java @@ -1,6 +1,7 @@ package org.stream.local.handler; -import org.framework.interfaces.Hash; +import java.util.Set; + import org.framework.interfaces.UUID; public abstract class LocalStreamConnection implements UUID { @@ -11,12 +12,14 @@ public LocalStreamConnection(LocalStreamManager manager) { this.manager = manager; } + public abstract boolean init(); public abstract boolean authorize(); public abstract boolean isAuthorized(); public abstract boolean isReady(); public abstract boolean validate(String... query); public abstract boolean contains(String... query); public abstract DataState state(String... query); - public abstract String get(String... query); + public abstract Set get(String... query); + public abstract boolean push(String data, String collection); public abstract boolean modify(String data, String... query); } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java index 01a34967..0f4a6879 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamHandler.java @@ -1,9 +1,12 @@ package org.stream.local.handler; +import java.util.Set; + import org.framework.router.Packet; import org.framework.router.Response; import org.framework.router.ResponseFactory; import org.framework.router.Router; +import org.properties.Config; public class LocalStreamHandler extends Router { @@ -32,22 +35,110 @@ public Response processINIT(Packet packet) { if(!manager.authorize() || !manager.isAuthorized()) return ResponseFactory.response444(source); + if(!manager.isReady()) + return ResponseFactory.response441(source); + return ResponseFactory.response200(); } public Response processSCAN(Packet packet) { - // data: query if(packet.getData().isEmpty()) return ResponseFactory.response500("LocalStreamHandler", "query"); - if(!manager.validate(packet.getData())) + if(!manager.isReady()) + return ResponseFactory.response441(manager.streamType()); + + String[] query = packet.getData().split(Config.getProperty("stream", "mongodb.query.delim")); + + if(!manager.validate(query)) return ResponseFactory.response445(manager.streamType(), packet.getData()); - if(manager.scan(packet.getData())) + if(manager.scan(query)) return ResponseFactory.response200("true"); return ResponseFactory.response200("false"); } //public Response process + public Response processRQST(Packet packet) { + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "query"); + + if(!manager.isReady()) + return ResponseFactory.response441(manager.streamType()); + + String[] query = packet.getData().split(Config.getProperty("stream", "mongodb.query.delim")); + + if(!manager.validate(query)) + return ResponseFactory.response445(manager.streamType(), packet.getData()); + + if(!manager.scan(query)) + return ResponseFactory.response446(manager.streamType(), packet.getData()); + + Set out = manager.get(query); + + if(out == null) + return ResponseFactory.response447(manager.streamType(), packet.getData()); + + // TODO: push to output + + return ResponseFactory.response200(); + } + + public Response processSTAT(Packet packet) { + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "query"); + + if(!manager.isReady()) + return ResponseFactory.response441(manager.streamType()); + + String[] query = packet.getData().split(Config.getProperty("stream", "mongodb.query.delim")); + + if(!manager.validate(query)) + return ResponseFactory.response445(manager.streamType(), packet.getData()); + + if(!manager.scan(query)) + return ResponseFactory.response446(manager.streamType(), packet.getData()); + + DataState state = manager.state(query); + if(state == DataState.INVALID) + return ResponseFactory.response448(manager.streamType(), packet.getData()); + + return ResponseFactory.response200(state.toString()); + } + + public Response processMODI(Packet packet) { + return ResponseFactory.response501(); +// if(packet.getData().isEmpty()) +// return ResponseFactory.response500("LocalStreamHandler", "query"); +// +// if(!manager.isReady()) +// return ResponseFactory.response441(manager.streamType()); +// +// if(!manager.validate(packet.getData())) +// return ResponseFactory.response445(manager.streamType(), packet.getData()); +// +// if(!manager.scan(packet.getData())) +// return ResponseFactory.response446(manager.streamType(), packet.getData()); +// + } + + public Response processPUSH(Packet packet) { + // data format: data, location... + if(packet.getData().isEmpty()) + return ResponseFactory.response500("LocalStreamHandler", "data, collection"); + + String[] data = packet.getData().split(Config.getProperty("app", "general.internal.delim")); + if(data.length != 2) + return ResponseFactory.response500("LocalStreamHandler", "collection"); + String collection = data[1]; + + if(!manager.isReady()) + return ResponseFactory.response441(manager.streamType()); + + if(!manager.push(data[0], collection)) + return ResponseFactory.response449(manager.streamType(), data[0], collection); + + return ResponseFactory.response200(); + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java index fe427cee..4f4c07b6 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/stream/local/handler/LocalStreamManager.java @@ -5,8 +5,6 @@ import java.util.Set; import org.reflections.Reflections; -import org.stream.external.handler.ExternalStreamConnection; -import org.stream.external.handler.ExternalStreamManager; public class LocalStreamManager { @@ -30,8 +28,14 @@ protected LocalStreamManager(LocalStreamHandler handler) { private void reflect() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { Reflections reflection = new Reflections("org.stream.local.connected.connections"); Set> types = reflection.getSubTypesOf(LocalStreamConnection.class); - for(Class c : types) - templates.put(c.getDeclaredConstructor(LocalStreamManager.class, String.class).newInstance(this, "").getUUID(), c); + for(Class c : types) { + String uuid = c.getDeclaredConstructor(LocalStreamManager.class).newInstance(this).getUUID(); + if(templates.containsKey(uuid)) { + System.err.println(String.format("Local stream UUID <%s> is not unique.", uuid)); + System.exit(1); + } + templates.put(uuid, c); + } } protected boolean containsTemplate(String type) { @@ -47,14 +51,13 @@ protected boolean setStream(String type) { return false; try { - this.stream = templates.get(type).getDeclaredConstructor(LocalStreamManager.class).newInstance(this); - return true; + stream = templates.get(type).getDeclaredConstructor(LocalStreamManager.class).newInstance(this); } catch(Exception e) { e.printStackTrace(); System.exit(1); } - return false; + return stream.init(); } protected String streamType() { @@ -101,13 +104,20 @@ public DataState state(String... query) { return stream.state(query); } - protected String get(String... query) { - if(!validate(query)) + protected Set get(String... query) { + if(!isReady() || !validate(query)) return null; return stream.get(query); } + protected boolean push(String data, String collection) { + if(!isReady()) + return false; + + return stream.push(data, collection); + } + protected boolean modify(String data, String... query) { if(!validate(query)) return false; diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/framework/router/TestRouter.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/framework/router/TestRouter.java index bad2c908..c45f751d 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/framework/router/TestRouter.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/framework/router/TestRouter.java @@ -20,10 +20,6 @@ public void defineProcesses() throws NoSuchMethodException, SecurityException { p1.setAccessible(true); addProcess("", p1); } - - public Response process1(Packet packet) { - return Response.create(1, ""); - } } class Router2 extends Router { @@ -31,10 +27,6 @@ class Router2 extends Router { public Router2() { super("Router2", "RT2"); } - - public Response process1(Packet packet) { - return Response.create(2, ""); - } } class RouterTemplate extends Router { @@ -49,10 +41,6 @@ public RouterTemplate(int num, String uuid, String tag) { super(uuid, tag); this.num = num; } - - public Response process1(Packet packet) { - return Response.create(num, ""); - } } public class TestRouter { diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/lsh/mongodb/TestMongoDatabase.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/lsh/mongodb/TestMongoDatabase.java new file mode 100644 index 00000000..42d45a2b --- /dev/null +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/lsh/mongodb/TestMongoDatabase.java @@ -0,0 +1,97 @@ +package test.lsh.mongodb; + +import static org.junit.Assert.assertEquals; + +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.core.core.Core; +import org.junit.Test; +import org.properties.Config; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; + +public class TestMongoDatabase { + +static { + // disable loggers + LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("stream", "general.consumer.types", "null"); + Config.setProperty("stream", "general.producer.types", "null"); + Config.setProperty("stream", "local.stream.type", "mongo_db"); + + // add testing database to system + Bson filter = Filters.eq("element1", "e1"); + MongoClient client = new MongoClient(new MongoClientURI(Config.getProperty("stream", "mongodb.properties.uri"))); + MongoDatabase db = client.getDatabase("testing"); + MongoCollection collection = db.getCollection("test-mongo-database"); + collection.deleteMany(filter); + collection.insertOne(new Document().append("_timestamp", System.nanoTime()).append("element1", "e1")); +} + + @Test + public void TestINIT() { + Config.setProperty("stream", "mongodb.database.main", "testing"); + new Core(); + } + + @Test + public void TestSCAN() { + Config.setProperty("stream", "mongodb.database.main", "testing"); + Core core = new Core(); + + // test contains_collection + assertEquals(200, core.send("LSH", "SCAN", "contains_collection, test-mongo-database").code()); + assertEquals("true", core.send("LSH", "SCAN", "contains_collection, test-mongo-database").data()); + assertEquals("false", core.send("LSH", "SCAN", "contains_collection, dne").data()); + assertEquals(445, core.send("LSH", "SCAN", "contains_collection, test-mongo-database, invalid").code()); + + // test contains_type + assertEquals(200, core.send("LSH", "SCAN", "contains_type, test-mongo-database, element1").code()); + assertEquals("true", core.send("LSH", "SCAN", "contains_type, test-mongo-database, element1").data()); + assertEquals("false", core.send("LSH", "SCAN", "contains_type, test-mongo-database, dne").data()); + assertEquals(445, core.send("LSH", "SCAN", "contains_type, test-mongo-database, element1, invalid").code()); + + // test contains_item + assertEquals(200, core.send("LSH", "SCAN", "contains_item, test-mongo-database, element1, e1").code()); + assertEquals("true", core.send("LSH", "SCAN", "contains_item, test-mongo-database, element1, e1").data()); + assertEquals("false", core.send("LSH", "SCAN", "contains_item, test-mongo-database, element1, dne").data()); + assertEquals(445, core.send("LSH", "SCAN", "contains_item, test-mongo-database, element1, e1, invalid").code()); + + assertEquals(500, core.send("LSH", "SCAN", "").code()); + } + + @Test + public void TestRQST() { + Config.setProperty("stream", "mongodb.database.main", "testing"); + Core core = new Core(); + + // test get_all + assertEquals(200, core.send("LSH", "RQST", "get_all, test-mongo-database").code()); + assertEquals(445, core.send("LSH", "RQST", "get_all, test-mongo-database, invalid").code()); + assertEquals(446, core.send("LSH", "RQST", "get_all, dne").code()); + + // test get_item + assertEquals(200, core.send("LSH", "RQST", "get_item, test-mongo-database, element1, e1").code()); + assertEquals(446, core.send("LSH", "RQST", "get_item, test-mongo-database, element1, e2").code()); + + assertEquals(500, core.send("LSH", "RQST", "").code()); + } + + @Test + public void TestPUSH() { + Config.setProperty("stream", "mongodb.database.main", "testing"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "PUSH", "element1, e1,.,test-mongo-database").code()); + assertEquals(200, core.send("LSH", "PUSH", "element1, e1, element2, e2,.,test-mongo-database").code()); + assertEquals(449, core.send("LSH", "PUSH", "element1, e1, invalid,.,test-mongo-database").code()); + + assertEquals(500, core.send("LSH", "PUSH", "invalid").code()); + } +} diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestESH.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestESH.java index 5d6011dc..a7282476 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestESH.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestESH.java @@ -7,12 +7,16 @@ import org.core.core.Core; import org.framework.router.Response; import org.junit.Test; +import org.properties.Config; public class TestESH { static { // disable loggers LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("output", "consumer.types", "null"); + Config.setProperty("output", "producer.types", "null"); + Config.setProperty("output", "local.stream.type", "null"); } @Test diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java index 17ca01d2..af60af21 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestLSH.java @@ -1,5 +1,111 @@ package test.protocols; +import static org.junit.Assert.assertEquals; + +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.core.core.Core; +import org.junit.Test; +import org.properties.Config; + public class TestLSH { +static { + // disable loggers + LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("stream", "general.consumer.types", "null"); + Config.setProperty("stream", "general.producer.types", "null"); + Config.setProperty("stream", "local.stream.type", "null"); +} + + @Test + public void TestINIT() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Config.setProperty("testing", "lsh.ready", "true"); + Core core = new Core(); + + assertEquals(440, core.send("LSH", "INIT", "null").code()); + assertEquals(443, core.send("LSH", "INIT", "local_template").code()); + assertEquals(500, core.send("LSH", "INIT", "").code()); + + // enable to check engine catch invalid property: local.stream.type + // Config.setProperty("stream", "local.stream.type", "invalid"); + // new Core(); + } + + @Test + public void TestSCAN() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "SCAN", "valid").code()); + + Config.setProperty("testing", "lsh.ready", "false"); + assertEquals(441, core.send("LSH", "SCAN", "valid").code()); + Config.setProperty("testing", "lsh.ready", "true"); + + assertEquals(445, core.send("LSH", "SCAN", "invalid").code()); + assertEquals("true", core.send("LSH", "SCAN", "valid").data()); + assertEquals("false", core.send("LSH", "SCAN", "dne").data()); + + assertEquals(500, core.send("LSH", "SCAN", "").code()); + } + + @Test + public void TestRQST() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "RQST", "valid").code()); + + Config.setProperty("testing", "lsh.ready", "false"); + assertEquals(441, core.send("LSH", "RQST", "valid").code()); + Config.setProperty("testing", "lsh.ready", "true"); + + assertEquals(445, core.send("LSH", "RQST", "invalid").code()); + assertEquals(446, core.send("LSH", "RQST", "dne").code()); + assertEquals(447, core.send("LSH", "RQST", "irregular").code()); + + assertEquals(500, core.send("LSH", "RQST", "").code()); + } + + @Test + public void TestSTAT() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "STAT", "valid").code()); + assertEquals("EXISTS", core.send("LSH", "STAT", "valid").data()); + + Config.setProperty("testing", "lsh.ready", "false"); + assertEquals(441, core.send("LSH", "STAT", "valid").code()); + Config.setProperty("testing", "lsh.ready", "true"); + + assertEquals(445, core.send("LSH", "STAT", "invalid").code()); + assertEquals(446, core.send("LSH", "STAT", "dne").code()); + assertEquals(448, core.send("LSH", "STAT", "irregular").code()); + + assertEquals(500, core.send("LSH", "STAT", "").code()); + } + + @Test + public void TestPUSH() { + Config.setProperty("stream", "local.stream.type", "local_template"); + Core core = new Core(); + + assertEquals(200, core.send("LSH", "PUSH", format("1", "valid")).code()); + + Config.setProperty("testing", "lsh.ready", "false"); + assertEquals(441, core.send("LSH", "PUSH", format("1", "valid")).code()); + Config.setProperty("testing", "lsh.ready", "true"); + + assertEquals(449, core.send("LSH", "PUSH", format("1", "invalid")).code()); + + assertEquals(500, core.send("LSH", "PUSH", "").code()); + assertEquals(500, core.send("LSH", "PUSH", "1").code()); + } + + private String format(String s1, String s2) { + return s1 + Config.getProperty("app", "general.internal.delim") + s2; + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java index 157bed0e..957d7c71 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/protocols/TestSRC.java @@ -7,12 +7,16 @@ import org.core.core.Core; import org.framework.router.Response; import org.junit.Test; +import org.properties.Config; public class TestSRC { static { // disable loggers LogManager.getRootLogger().setLevel(Level.OFF); + Config.setProperty("output", "consumer.types", "null"); + Config.setProperty("output", "producer.types", "null"); + Config.setProperty("output", "local.stream.type", "null"); } @Test @@ -137,13 +141,14 @@ public void TestRQST() { assertEquals(200, core.send("SRC", "INIT", "external_template, key").code()); assertEquals(200, core.send("SRC", "INIT", "external_template, not_ready, true").code()); - assertEquals(200, core.send("SRC", "RQST", "key, correct").code()); + assertEquals(200, core.send("SRC", "RQST", "key, correct", "destination").code()); - assertEquals(421, core.send("SRC", "RQST", "does_not_exist, correct").code()); - assertEquals(423, core.send("SRC", "RQST", "not_ready, correct").code()); - assertEquals(428, core.send("SRC", "RQST", "key, does_not_exist").code()); - assertEquals(429, core.send("SRC", "RQST", "key, irregular").code()); - assertEquals(500, core.send("SRC", "RQST", "").code()); - assertEquals(500, core.send("SRC", "RQST", "key").code()); + assertEquals(421, core.send("SRC", "RQST", "does_not_exist, correct", "destination").code()); + assertEquals(423, core.send("SRC", "RQST", "not_ready, correct", "destination").code()); + assertEquals(428, core.send("SRC", "RQST", "key, does_not_exist", "destination").code()); + assertEquals(429, core.send("SRC", "RQST", "key, irregular", "destination").code()); + assertEquals(500, core.send("SRC", "RQST", "", "destination").code()); + assertEquals(500, core.send("SRC", "RQST", "key", "destination").code()); + assertEquals(500, core.send("SRC", "RQST", "key, correct", "").code()); } } \ No newline at end of file diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java index 5eca334b..8adecfce 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/test/java/test/speed/TestRouterSendSpeed.java @@ -32,10 +32,6 @@ public void defineProcesses() throws NoSuchMethodException, SecurityException { p1.setAccessible(true); addProcess("", p1); } - - public Response process1(Packet packet) { - return Response.create(num, ""); - } } public class TestRouterSendSpeed {