From 145e1b7517e5085327b77236892e1e2f209abc44 Mon Sep 17 00:00:00 2001 From: Conor Flynn Date: Tue, 13 Dec 2022 17:36:34 -0500 Subject: [PATCH] Update so Socket connection can accept requests --- .../java/org/framework/router/Router.java | 6 +- .../out/destinations/SocketDestination.java | 1 + .../java/org/out/handler/OutputManager.java | 2 +- .../org/out/producers/SocketProducer.java | 8 +- .../java/org/out/socket/SocketManager.java | 80 +++++++++++++++++- .../Internal-Documentation/Documentation.xlsx | Bin 19580 -> 19599 bytes .../java/org/rest/application/Endpoint.java | 1 - 7 files changed, 91 insertions(+), 7 deletions(-) diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java index f503c2e1..f839548e 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/framework/router/Router.java @@ -223,11 +223,11 @@ public final Response receive(Packet packet) { * All {@link Method} objects must contain a single parameter, a {@link Packet} object, * and return a {@link Response} object. * - * @param subtag Subtag of the process to handle the incoming {@link Packet} object. + * @param sub_tag Subtag of the process to handle the incoming {@link Packet} object. * @param method {@link Method} to pass the {@link Packet} object to. */ - public final void addProcess(String subtag, Method method) { - processes.put(subtag, method); + public final void addProcess(String sub_tag, Method method) { + processes.put(sub_tag, method); } /** diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java index 239bdd4a..2b23eab4 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/destinations/SocketDestination.java @@ -21,6 +21,7 @@ public final synchronized boolean send(Packet packet) { try { out.write(packet.getData("data").getBytes()); out.write(10); + out.flush(); } catch (JSONException | IOException e) { return false; } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java index 65ca05de..fc834178 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/handler/OutputManager.java @@ -111,7 +111,7 @@ public Object[] producerListen() { return new Object[] {true, ""}; } - protected final Response send(String tag, String sub_tag, String... data) { + public final Response send(String tag, String sub_tag, String... data) { return handler.send(tag, sub_tag, data); } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java index d2fd265f..e29be73a 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/producers/SocketProducer.java @@ -1,5 +1,6 @@ package org.out.producers; +import org.framework.router.Response; import org.out.destinations.SocketDestination; import org.out.handler.OutputManager; import org.out.handler.OutputProducer; @@ -9,6 +10,7 @@ public class SocketProducer extends OutputProducer { private Thread listener; + public final SocketProducer producer = this; public SocketProducer(OutputManager manager) { super(manager); @@ -24,7 +26,7 @@ protected boolean init() { listener = new Thread() { public void run() { while(true) { - String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port"))); + String key = SocketManager.accept(Integer.parseInt(Config.getProperty("stream", "output.socket.port")), producer); if(key == null) { System.err.println("SocketProducer: Could not create connection to socket port."); System.exit(1); @@ -55,4 +57,8 @@ protected boolean kill() { return true; } + + public Response send(String tag, String sub_tag, String... data) { + return manager.send(tag, sub_tag, data); + } } diff --git a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java index 7148a737..0d923738 100644 --- a/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java +++ b/DeFi-Data-Engine/DeFi Data Engine/src/main/java/org/out/socket/SocketManager.java @@ -5,11 +5,16 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.UUID; import org.core.logger.Logger; +import org.framework.router.Response; +import org.json.JSONObject; +import org.out.producers.SocketProducer; +import org.properties.Config; public class SocketManager { @@ -38,8 +43,69 @@ public synchronized static boolean exists(String key) { return connections.containsKey(key); } + public synchronized static void createThread(String key, SocketProducer producer) { + Thread thread = new Thread() { + public void run() { + // perform logger verifications + Logger.log(String.format("Starting thread for Socket with key <%s>", key)); + if(!inflow.containsKey(key)) { + Logger.terminate(String.format("Key <%s> not found within inflow thread configuration, manual review recommended.", key)); + return; + } + + if(!outflow.containsKey(key)) { + Logger.terminate(String.format("Key <%s> not found within outflow thread configuration, manual review recommended.", key)); + return; + } + + // retrieve inflow stream and listen + DataInputStream in = inflow.get(key); + DataOutputStream out = outflow.get(key); + String str; + while(true) + try { + str = readLine(in); + + // parse input + String[] input = str.split(Config.getProperty("app", "general.transfer.delim")); + + // validate input + if(input.length <= 2) { + out.writeUTF(new JSONObject() + .put("response", "502") + .put("message", "Packet processed from REST API does not contain a TAG or SUB_TAG. Review REST API endpoint code.") + .toString()); + } + + // extract non-essential data + String[] data = Arrays.copyOfRange(input, 2, input.length); + String tag = input[0]; + String sub_tag = input[1]; + + // execute valid response to engine + Response response = producer.send(tag, sub_tag, data); + out.writeUTF(new JSONObject() + .put("response", "200") + .put("code", response.code()) + .put("message", response.message()) + .put("data", response.data()) + .toString()); + out.flush(); + System.out.println("<<>>"); + + } catch(Exception e) { + break; + } + + Logger.log(String.format("Terminating thread for Socket with key <%s>", key)); + } + }; + + thread.start(); + } + // used for generic channel accepting - public synchronized static String accept(int port) { + public synchronized static String accept(int port, SocketProducer producer) { if(!servers.containsKey(port)) if(!createServer(port)) return null; @@ -68,6 +134,9 @@ public synchronized static String accept(int port) { if(!synced(key)) throw new Exception("Connection inflow and outflow not synchronized"); + // start internal thread for socket information parsing + createThread(key, producer); + return key; } catch(Exception e) { e.printStackTrace(); @@ -132,4 +201,13 @@ public static DataInputStream read(String key) { private static boolean synced(String key) { return connections.containsKey(key) && inflow.containsKey(key) && outflow.containsKey(key); } + + private static final String readLine(DataInputStream in) throws IOException { + StringBuilder out = new StringBuilder(); + char c = 0; + while((c = (char)in.read()) != 10) + out.append(c); + + return out.toString(); + } } diff --git a/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx b/DeFi-Data-Engine/Documentation/Internal-Documentation/Documentation.xlsx index 801e6381bf2a8e2f2aa23b44e27535a33e05e971..a5b44e771127e6046b30acc0e692ca8c443c1cd2 100644 GIT binary patch delta 4749 zcmZ9QXD}S>*T#2cSBn<0q6X1>uMyp{IzcQ^m*_0f^RP-n5OuA(qLZxNB|`Mxdv76n z4T1#klYDreng9Liocmn&%sq4F%>1tNI~5q02CP`bM|}#`j>ZE40H25e0CE5T;O8Rf z>+b1nS(;OFX6Vr=HVCPfuw>3FN0^i#~E2b?>mUuK%`Ghvr@4*n<-sa(S0G37yQ zBz_whcL!){%vvZ)h`sQXLv3P*yPu*Dl=Qb(|EGMBR8aS^xI$oTsE@0hfIjz|Q7@xmn=?`H4@Lu>qTzXM_Zq;qH3N+9K+98z>LV;7se zZp~>9mK=QcyjKf@Ggve3poOqy-~_y8!@tn4nBVuRc?}=;(N>#uC-ZFH)PQ;}U{E$d zrH3o4wo*eKH5(cf;g-a0&V1@UPC%K3UuVVXyOr&sA#E7x%lbTQODkSrNcqUxM0@f1 z!)T>#KWQUn_sdf7&y<$XV>^{O`&POw3_BdXfvo}ks4*GJotajOeOdV`MqQ~EdBAzM zB4nEIZb|1^`jo9Sw&X^E-+qPh0tI(EjL=Pr8;+aRbzmwI$(B*^+B`eBMtHUQACO)( zKX@MU6Yh0a`i9`ELYLyR*=FVe<^qRP!=eRcUH7t$j%GV8v3|)MT7u^9^E&zLmCjvq zQLNLRU&BNfjt#;e>+f~4JQ;x~Y%pT&>$kjR4#?9g^ii3Vh zio-08MW#=$hCrI|k9_Kr3MhUKq+QSAFPGElh+r{@SU=$e=Rxn{C7-egb&*u|41S-K zzEX0l_)8FQj)3<^9)@Z=y}c|=ghINOF>@wXZtiCB-c5km(*u*H7y8Jm+CUq(YpY#~ zl#T7WsYr86iBlF&Zh@yCL{3Ht970K6K1%2{UZ~4OwE>IbS!`*Wol#nRgKyO&2>ooF z5YDzvIgqr<{Vsh5h466P75za@^N?TrqyU^cH5%N#E&7`!6)oGspI5H!dXhhu#uWW3Xy-#SnP3dnJub=SDswcoHyGU@)6?)y0+ z@qNo!cju<=k1z?dc|57m*GtJY-fmnp)w$r#eJ%2lZ(4aVYh;##sPNfVKzUJ?VYkw< zZrWN^k-d`jf%R5l0-mwy9@XgqiSUR=#M!(t*}2lwwTIZX7mf6?Ig`O}YxnXWYwKww zaal+-N&K^Dz1+?GJ(Pjef@Nt`^~F_CUVgsN+)meE{J@a-a$Qz6*29B+@f`Jl<84|< z6NBUtDI};QpZQZ&8%omTRM8~hO-#RCRA!0hRe6A)g`Gc@f6(uwS00!G3+gM)a@?5g zk1#s})#6((;d1taqZx?sNYW`cS97PdUG$}G_A({exw#hhVYAY=oo|X$W&Dnm!)rqRJ$M70Hujmu zs3Fl-!%2`IxOx|210vR{_cGfj1OT=K(LgF%G?ESs`GMI`+{lD8MuJ_Z5qJVX3=N(j&A@OhoQo{A_yFC6`D7M72l6A0zAE(Pq`-0E*wj!1EO(UFte3Zum zw7V`xHn#pAgzn@aUGvIhhgUJgC8JS(a=ajH#V)a<#dkl&!f4$v615*`z9YJ=y5Ei7 z@h!@oq>J|F5Vvl{8Jr~O`}!e}-tyOL3~*n;%*c9;ea1cDn9T=g*^bGAfhbm#>BC&Qz}hfcq=Gv!`Z7W2LBw|b1tD(oCjr^knek6Eu} zAaM8~*#xSCI1uYtHIr^`g;DH%%0#h;ZB8{CU2d;gZ92%hcrJ_0zMY+sNNEfQAEzm* zr*h0XXfbTWXyMUMV@kYb8QBTGor(Wm@gO8CIIg>bzHTO- zPovq*m0{Pbl??$`LI1qEJh|LB>(ArOX%q@fx?p4E$4R~`w;dubv=n}$QNGaI(p8p* zm_`;uzqOn!^ZtCKU30APzCAK;_kP}|H>AhR`l*iks1BsT+F*(Kx7iQuSX99`P=VuN z2Nw}dn8~0FzW*T?Yqd~SQya@*jjUuvhD7AzgypcSpM?*3eZ)Bu>~iESYI`@&9>!CIQbuO2cq?|zYY;&%allSTdOC40oLb&%MQ)k3&9JHc8v`ARoU!8TMyeIU6&V?B*~s^%QfWc7u<7>VCd=(UcP~5L zXF)~0r9Q91&ZiP6c}@Wtb1aX#(;A?8SU&a6FlJnah{uIgWrY;&6pJE_Plkz!Suvg7u zIOY6H8cvMtqff}H!O0AyPMo+md)3K^GT>uEQoP5UW~4)>CQ!$M7JiEYGh2>*IO*veSFf|fqL7%3$6j5gX5RPx%oNsRLQY~NPJn{}5; z&g&WJ?m^}to_dY1es?D49N9eG=py&NLf0sdQ}-wXTTrRo9wV8ar|#xQqPcH}whMHC zQbG27!0r?s$jZ$0n<$;;+KL$?lnVs;xPLKn?BqmbM16utq<77%r!Zq?Hb}2ug|t61 z$pl|mOGf_Px2L1SKvcB`0Id<+J{V1jbdmwTH>O=|iI6~amCCq6-{gHLIsI*x8c=Fmb)jE4>ZWF2 zl}xl;`)5eLkH`O73>Q36^?Hx^VL~X&k6xo#DDanG;jV7h^+6TwSGDQeuBrGCiI!uu zrm}m--MU2_>+k9cUYd0^3_e_H`lR`icT%!eYi%>+;xWhVyk@;wM>*(nW=wrW$cY{8 z>E@VbC0%P;I5E6e)i8XHNe}AZ*|wf-d0zsGTE6tbTWra84w)05Qzwcj%uBhJ`K@p+ zk6xv9sn%tU0uc|mTwrPM006@Te>edp{>LL?$6;~v%v6AtR|Pi&9gBA@D7&LX`*1_0 zxX)OkF5xI)fp|N^v#&qXI?@t69bV*1k-IY;w*ENX=A-vopCg>4PQ74(ezC(*?U%L4 zH@kl!VMREajX`U_vnRkFb;*>FSz75MnK!GI4?X#*bR~n3O!x z_HhYZIF3y|V4nr))5PMzRHFrz%9w-=jflFQMKQ9u?}JCb z_bjjy5>p{a)~R0g+kcuzyPk9+kb}r>t7P>->pk-EhFxD#SZj?4u1c<_m3{Nq31=%? z_8!OL^eu&QqQZ~7Ey*JcDoQSy66S7v8r8|v0hKJV6v$*!fv?E#cFH`8=KGOcEOnAN z_|Jj%n)Y{9I71wax;!O%!%{XQ8PBJo{9j6d`01P&g@(;eoDKhBgh86K{RShaHc zMUDVg$LND{ZB3yAO$vg@>8TEhhi-bCKb7y09@ap zczY-Dl8tB?QcWFSCCW_JHKUvBg-y6EcUY<0WU8j&(WV9C=Us(CtDTfZfhdV_?wtLW z-uDcII#kO_p7oFndA8BA<)xZMr*067;HS+6%b_MVO9orUz%Xx&F}N?0N;SKI8Z!sk z@~)0L)L)l<8aipJ7-EWa(6_MMSkNoumo;;3xgvf}rQWCSMqKlAN$(xwa+s|E5gDYT zv@Y=yS^L<9=GOLc3!C<)2`bb+)Ia>Ol?(Kd%p)*U!uS5ECkjXsado|=RleS%^64-)4Bupnu4W!>sG%`*NTzZ@QtiwAVd2c4Tp|Br8Gs2up5lyCyO%kb}J@KekVuuk$ zjyEDDxO1wL5--($UcpfhUUKyy!f)Tg3HBpjFD+@k?-Fgi)9#KnSGKb2MG@(yLzuuT7~D%cOI2{v=AC@ZksXz3Ai zL5!Ne=IlDz-uxy%A@NpTPUs>QP_U1IaVS5Ao8}u8_ukX}RwowW*aFc^Zo5<;LVH!L zC|I*hMf}$0z*=SwG2}-{)^!cVk{ZTxqQ%tGNThNkT!gx~j9JCpBcbcD{`E2DqroL4=11*qOv?>=3h z6zdP94&T3}4JH|#hrTWpj=_Jqde6>#8Me*TOLCtvMUp&Uo(2fN)DXq=d^Hf5&@@G_ z#+6pZd5dP-^w|yfTZvek&f?HwC&5D9GvQ#R@^zPHxKdkll;X zP=frpfrFQ1TOweuaz_+=HbDH@Ik8|O_=nV40TXy8MWJ+*a)OEV3LmTH=S(YTd z6WL{8MP0VRnc}r>#`*xa@xSej%Gf>mBXGO3dBg+jyARq&}h*9p@! zk`VtQzzvH|`oe;~rQ$@q%N8P!KIA=T zd_6<=Wr65cPL=TS5BPEXah|39(4hp(l*lE==?-Uf&-~QwR`LtoW20%ehX9+xNR!1c zHrQ6W$G5QfX|P=r|&j7KU77 zW?vbQQv6at(tc8elD>hKA>yVk4i3vj1|WSkJRVyKwqhMxH=P_}qQPZ%WIk6U-ZMcm zirg1c?@P*`^Lk=g#80E4jOBe0i<6uyF~yCre-NeV zZcc}#XmvL~mHX&sf`x}c78)yqEIx|1j(uELevR#6m5zqJx`nK*YK7s@0nt~%|`VKC% zSrXlo7t3MSJ~4WQ4mY2w6RER_PUVi3)x8@62Ldl@Y&tLjDNU$(sQwkXgMa2}$c>%2 zqvAisw|C68fqi@o2AyHG26y$`$A7-YTOsQI!LRQHxgavMFL}Q0te@NFkvyK_Gajmh zsBcn}4K=DzDvvfWUv6E)q0A3OGa5CXQ7cukQLIHGc@v+{Zc??mEO77jJbgWGiup$p zpLSjhir~dhdcHuqYDN&alGcS)jO`H7ZT|6fT~R}=jkn%cb|1uFWphYAvbDNC{E4`t zo^3g6vrGQv%<}b{rYB_rrBaMn8BG5P(x}j|v$L*z;G;_t&!vN^9n%Rrcw@mHd!p8m zSC94buerA;AP-D{h^L7Z_R8akQ=z!qehQDY09EhR<&2}n*K{lNUDD}(gm?G=z{A7e zSO4Lm%GAPVMV$POm+3yt6!4Mk4LX^D6{q)ru@~LKpooz0^?g8ecRvkSIJ^v4cz%$i zDema2;b`x>_q50aV>fmXRHBgave59Y07nZ|3T34oiMKebJMoTXVdnVC5WFRA<5{Gm zO-G^sb=9(@zd1)|q{3M5?~%hE-hA=;;^C$Fbf}EU zv{>paGgCrk+2L?&hG4#xWw%TRWe0M%Xn<=MuG?i3%Hi|U^Ojn_{A?_Z#C0!A(jS!J zoGL>@Mi8Zx1wTc)`9i&XLu^G`>z**N6N&J}BWQJCpE8?WWayhCPiuX13PksFX*qy) z%fM$qESbPS&V3}=1z&GBk9#^KWQ&CW2I()E?b#u5amRp*;SNNAhN z9@`6Yosq&b!E$h5BeqzHg-ouDU?dN8cmFKbsuhS4uvML%mXfAWG7^5xNR7DKuPfA~H)J!H!! z$MRiuma6TCCatzF=B1G@WD5xVu#C&8upV;ysLVd%pIJ~z)D9VO>_sRugX%I!OnUo7K25*!f zCzI(Wz7Q2injz>Pwna%oeOQ{mrFchjT%d9+vSAbI-9F*HP?Fg&lBR4kB#lHx(3G1L zM&Z#`%wcJ%DUES;w(a6$_{z+)xI4`>QpZGgqgATnNb_sl+}RPIichr6EFb4CD!$UH zeYPO&0^`^0sr6CSIc4zx8ClQAGX4Xn|`uxcG&CT~~RuzGZ_jRtZ?gRGXr5_6VzS>4w3$pCq@Kjl=o<2}()1 zBf3}H(x9JP!k7-J2jco#7;ESph_rAGA+2~K8?Wa$slo+LD`sR+8X+T9Z#WRA?bG~q zu)7;!R%jf6ES#fa$>_@dNmsd0!S3tzu2^slXze3}Wse}mW4Bn{*c9M#cWv1D2*-sD z=8D5v7pn^J-DH9%3YvKcjhfjw#o)y6Si!qe|4IW@2&V+ZjukGOs0(=ZRhzI)gIM^v6-!xtj8YNhza` z12t*fID(C5MJ*iw9E)a@U{C^KR)_8B|ah#{@Gh=bH3#P9HY3Kd@+K|i9NV`)1= z=x5&w!0=}Q1K*SCsKJ^WyUMT?lLMX9^o$IoYN1ZD2F<#n_*!D_K7O;-Y+a6eoCA4( z@o7IaDKyQKm@0-V-{_wJvTmXqB*rjfd;Apkoff6R_T9xgs-NehQN&E0p+@<%`m`Y> zLZ}i)=pzo){1XINR;s_@lr{wlhKo3^R?IjWK;y??-@2pBYSXl60g0sB4SB_LZ$4>aTG7%%o9P}8%>9a1e^Mrrj+Fu4N3-B_iaO&7+cw(ASqhQh3 z%iG4B``+JoIS^lHTbKGfB_u>!^+NQuc|)0EyX{W8yL>xU?qJDD{E0ZEMK6W9G3n`q z>H3upu={a=gj-jukGri^c}(e`y-`@fRjvouBuJ}+rre?^G1U_;6|_{_PiFssjozSh zX>@0c$00n+g@V|z0f2Qh5W<2$YbGrVkq;lH-TY<1xUEkF6gGqUYi4tpB`6->=KJNJ zB^B!-T%>ipUXV^bROZ0eI$}tKPA=CVk+jeFV>aas%YeG5xlw~j#uX+>MR0gTYTlq~ zJ`r+kVtglhbLSK|>*yk(2*rsUN)Oyxtzj8BNqjMm7sLfNHgPI?AAvAx*piqIxhJ!M zE53z}v>fYkQ_Cjl9d!M(8h&gyS}e*vGF0 z$F|y@p9ae1YCU|PszXhFZ{e{Ys2H#_9jDY%Rg#zQE`}~k$a%Y7Tg-+_1Hq4q{ADss zJyv>NT|L5@x(Q1kX+z^brS`Mf)WD+pGwc#ZjP;}RXui9G{|=#m5G$wTZK++82)SeZ z5wxWs9wodY$z5@(WlV}q)b}j4#h}JTp@)il>5r3;_1h&Ru0T zk{(B~ZdNvTs5u)or(2+rhjp=7J#3O(=7ie5ALgE9d=7l`L3yd4yJz?MtF>S@H|R!` zVc>8C;C)VBX9QMfnq&|DW}+~b2fP^D!PiK=@VR-Q;`l5p*vQADO^2IRINqSv7^)-! zL^NgVE3P2XvXbOEkk3&++V+2FT~5+2!#Ga_+cB$F6J6|9Ob(18NWZj~xG<`4 z))(dpuzljQ28-bUh<6@$uXA;Q0zb5gueR3P9kM`^S0!#8f-6A8z73JUl?wEL483Dk z5YE7fA==b28x3w-mKA@CsFU4zn~a-|*vPcb`%X%mof4~~-m6x8-Q-6`&Y%RMObvi) z>)ri6w=TJD{9weXV{$zonW^Jd=Gu$z<;!d#2F@zrbvsjHmD=%sJ zIBwZ0*6DIOZ*^1RykDikJFav07%`tvs`;);h1FKfyr$%f+b4LUv5MLx*|Ey}zz8X3>4@3X}nt>RM-q7FyK1UP7c