From c9323b3c12c14936a48052477bafbdf16ef4258a Mon Sep 17 00:00:00 2001 From: Owen Nelson Date: Thu, 26 Oct 2023 16:06:06 -0700 Subject: [PATCH] add support for message delays for memory, rabbitmq, redis SQS already had support for delays. This diff adds support for all the remaining backends _except for GCP_ which does not provide a direct way to emulate the delay behavior. The redis impl is by far the most complicated of the bunch. This version was based heavily on what was built for the OSS svix-server, with adjustments needed to account for "raw payloads" which omniqueue uses for persistence rather than encoding things as JSON. A single e2e test for each backend was added to demonstrate the delay behavior. --- Cargo.toml | 2 + _rabbit/enabled_plugins | 1 + ...abbitmq_delayed_message_exchange-3.11.1.ez | Bin 0 -> 43830 bytes omniqueue/Cargo.toml | 3 +- omniqueue/src/backends/memory_queue.rs | 66 +++- omniqueue/src/backends/rabbitmq.rs | 38 ++- omniqueue/src/backends/redis/mod.rs | 302 +++++++++++++++++- omniqueue/src/backends/sqs.rs | 8 +- omniqueue/src/scheduled/mod.rs | 3 +- omniqueue/tests/rabbitmq.rs | 60 +++- omniqueue/tests/redis.rs | 24 ++ omniqueue/tests/redis_cluster.rs | 24 ++ omniqueue/tests/sqs.rs | 21 ++ testing-docker-compose.yml | 7 +- 14 files changed, 528 insertions(+), 31 deletions(-) create mode 100644 _rabbit/enabled_plugins create mode 100644 _rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez diff --git a/Cargo.toml b/Cargo.toml index e33f55b..aa403e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,6 @@ [workspace] +resolver = "2" + members = [ "omniqueue", ] diff --git a/_rabbit/enabled_plugins b/_rabbit/enabled_plugins new file mode 100644 index 0000000..0dfabd2 --- /dev/null +++ b/_rabbit/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_management, rabbitmq_delayed_message_exchange]. \ No newline at end of file diff --git a/_rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez b/_rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez new file mode 100644 index 0000000000000000000000000000000000000000..fb7530f361d85e40c4a30497a2469d8e442fd4a2 GIT binary patch literal 43830 zcmb@tbBr%c6eZYpzvg>w+qP}nw(b74ZQHhO+qP}H_uJV`CfQ_nGRam_m8yU4$w?|F zb?e?EF9iw)1N486Ef+bJ|FQYM7YZP3AV&j3Lknjc2R&mGYXc7xV?7%aCnp0l6Fn1m zBXa{=GZPvnS_TGM209gGXduwsSV^P2|E7yOED$i*GcXVk80vq)%9|D;W8XkuXV|3x*}6Pi0|`|~xbehRM8 z;7Vhphp7Q&BxS58^h(GGBLhP#1yw;&Awi-+7)pd(A{eo_C@v|CJi(Yk!gH3Mr#EN! z_q+G@E$g#q#---_BgZNBld1Cxdl?^*CoH_ZKAK@!1QNu61Js|t%-fg`03ci@i$gp? z6N7q!CX*-k?FSFnui6|HQ%TY{H`SM*RPje zkez-|kW@Hi*d~%NIDybmoY#23pB&_v?k`MoG-^G?V<=<=a=$1^2nu0AnKeE(AvwMy z6>6LdK2b$=7NRl;N0vAU?K&ubxp&LL9?WaxBbl|yDT**LC_qaxli4s5Pclp>7-euE z3|}Bl8b3H2Su7uUz~`(ce*&M9QXTj{kRFQKKo}+&1ok!Zh$z8NEs&reB?vEIWjK*#;v7cdO}#BF>t87gm7R~R2@Gs&n^mr z%CFxlczaA#5k*y{6JvElYU|A^A9=9#P-G;LKlpV_DTDn;eZnQ=KY;2VGLmw>CRygS2-C5~ea!Q7%0g>?gNMq}YZ~afiF#z|^IlPs+u&}u@@d;j~FfzLtw0|EX zm?2CHA6Nh?gc%cN!?OjD5gvMiJTv_C5Dl=u;k|T%yDeOf1dbiB|9oAGI~1zeAcI3D zf1Z06G{h`?;l541c^S`iPWUDs`;p(MkXI{;l2SuUlh_H*N)Zcbd2808fMia8w= zzWhXtiO4>)V|+?I>E8zirt~y}GH@1d3v{NmpGi1en?FVY=}d$A_DwvP86uS?2cnJwc_P?|P(q#0cHpc{ zbb;Sd48ueW^tMnCno?Mh72M|LR{Qu>f5#(12o@e${PPX)Mf>~--hWQ)+8nc-((Ra*-4?P=jrF(Dob``^|Ml-;l!AriV4jA2Fo(~?CQnDC5k!?$R)CYu=Ixt zFi$MpkEZ3r{!Rj{+DPP%6j6Ags74_Uh5mTBBB*7-h>&1wpoD@`=1VmIWAx>q$AWuy z2uLdE6L01>SHvv#e=w(N!q9!v>~rd#9{1?_f@!uOZAJYe{Gy7%2F}QlNhf`V=Nbm? zKTV=hJD4b@h@ebBTGhys=p#X)26x8SyQ9|QC@qc&0kaTLE5u^_6wEamuS@r*m zSkh0@x`23`U)tZ84)OAv6EfDjfM~PlKQL@=PZ1Y_Ee%29x^n3K2Z{gY`Pg{}4=WA&*MJt%y-IS)V^D@Pdx1%^MHi>F3|ik4&we z1ml;+j2pgb*#ZY&1B6`$JlhaVj?wXSoMl6fAk5t@gkZ$rQigDEP9BhF-Sp#7KpW=&Ck)JlBeuRf-`OP>m)&G5Vn z^BE^5e^gCjn4+`jLCkxAOtHZ2b}lW$TveDGL`8iT_lk z?+1utu{oa}k$_HKr0<=+IDuaqv#0`gwEtZoTE)ViC(j57Gw87Ifb}(r7y-Vf_c$vZ z9;79LXPbsSBh5p9OMOo~8e$hZDax-92OE(Z-nPyofPs|(H-vd73c+~+F)SU3p)PD& zTRC9fp;RIe%m1%ZT|uyrh0)pkKnp7qE~=nCmDRtGb;w{Vt3qZ5t@pXTk<7^{tx$Yz zsF)Bt$i3zKei=U=SpEw}d+ET;0NE>Me}OZ|e~pBa2!UYwEK>UZ#7u*8>L25={aUEN zsV992cdE%2cRGFkP_UTCFxKb!^@vWU9qKfQjebV^1^Bo=nBHvmrtn~{1^7S%h@3$L zD(2{XbFE+>h8^TYX}IX>9bP_g6Hv;t|4hP~gfRH`A@a~NV}pA&E!sR-8bHh;_3J=R z`VXOOwZjH}*U8j7P&#l+U|>-V@KbqO`u?KY3I=L{f7k ztbq-#K~&A{A?qYa^6X(T!RlTE46h(IA%uycgSBk${mTA!C;CMPO9||I`MLDHg6Xub z2(0&=iBX1(qKopi;|Q!5^#|Ib6g_>ppUDsR-HF@56BhB)z@)O*R z(c@m5hWr2K*`oo`C=W{mq0Ayafwk3cLjgxzua=)O+FidARJ~^J3^ZTzv1*e`EMS2NJ~(MEIfe z17kS*)<^uU@L&&&0XqZ|B87uv3Grb>M}^3hm0Em1sdxV3&%oFE-knr$HJn2ft$Bdk zMAjc;xH9OEGoIV*M>L!J(uZO?Vu)htkfEgyLT5eiaYy(5GqRDJgOARPqg4gRRTZVl zkM2eaG2KKJAsoA#L4ek;{)@sDH7)A_wIzh^WELU^6?i*aXENHxH2=OLOc@u*=|{ov zVt9a&Ip>w& z;0}KI^ZaGED!fw}H_M*TULgUarLZVnJD({VvSM3|msw z@yC$$-l@`xdUw{1GWIQ}S`Z%3;>3l=Sm$XRi78oel2GzQ=1SwtWAZ(0B%%Y`M!JB5 z$)eiwyuzR=aWjVMoLx_`x?4J&)$_jW9Wi?P!UMB;b|BpDd-(1AIALmtq5FM>;eA$` zt~J=S$BjD*?~t$g<@pBv);>q{zYp8((^g3z|5~Tm?L*4efRQsKo|id^^9@;(9TfTS5Fg@90E#bpA>U zFqqxLVmdm*`_M4#&vU_Wbmn?u8V?e%@-qN?pVctvtApl+^9l|(j`)74sOz*N5(|f3;3Y08N>XvD(vfR^spo~ZVUtLbPLbv zJu5)R|0cZxJldu7%<^j+A(6>}jSY#vv(?FkKW3?rsD*GR<3??}tA}lS*04&cyKe?_gkob2}>7ik>C*d8XVctvGA{=kDa}#tj@WDIP2qAU7H%p zVUFtp+=ukG?QHuqvt$w%Mw~|2PAG|jIF7hSbmTd>G6QEn-8hP3$Fxmc72}Xe#t7y= zh;)^U#6bRSRFUe0Gr;|_<%+J_U43-~sqqC+=h44b|yWGp0& zUxBV+A80Hy`PZ==D^auQGbvX)soZ(5ylx8Yj#!gj_iq>+JSix>Ih<~aJeIG;Doe}< zVn^1VDoYLHTsWv}_T$=F!|%w-X%c0ykWP~f(#H%pYbh{jEopmL$Q!p#{}8BV)6+>e z4>}=d-cyf~vr@Q=l2sga4b#VuC}%a=!6{D8P=b2Shp=q;GHuz%*|1`3lR4xjbIO(? zKlg{ekHj<-rQ<7iuLX-sQ^9$qk7I>9yPySV$?)O5St*@~2`{2KL5IscY7rl^oHmkP zk6(5t2>1#MqNY-D6yZSwbV=6CYjnkB?a{mEtp>5UQRbA)Ny#J;SxPmYZq!zN52OeY zDt6|1JWAQ5zNZS^FxBLcBV`+4HVxmdoz|o}VRpJH5`^|QKr%K*re=2Be7%l^zbE!z zeYlzGZ%f?|4)h^X2cBW*28v9!_y_p9pl@w)qNG z_aK+9eI7hU?-|FiQp-?Ey(I-yg}S*Ekxk}4)21oNLi8e)lgafqRL$nK`D`dS}3 z$I^^u?M~-y)c2v;$T=-G4uuRD#8&6jgfW^4Z50>a^SEZ(oU@xd}Cm9NnN{JYRY?MF!2;P$??iD%g8x~nPiW` zyYrCTyl8<-JUq-3_dSy7ZPBZhpaiA)vvgIdr9?m$mzLNOBFg)L|YS&OyMbOKb z1wYl6Rf-_>EZZ~I4x%g_P&62up`h6w-FG7_4pK2OjuA^Y;2eEmpl-Ym56FD}Gr`#* zfx1F>X1Qk9*#2tC>)!c@xuEBm2TKL;6gxVgQQD0ECzpSpHR97P0(-Fr_Hwy6 zTapU(obIYS?P1C3v~CtknQl{yo&t;8W|-DNyGyRf{Uj@vFk63Ib|=)oJwEHQ^C8j` z-;!5kkf59G-BA+|5%l2BUkq~Q5dx5OMRD{Z;9P{7(VASmMH#F|=_JpwZ-0IFrkh`m zbw+X7@&OG!8ywjImQR^=+)^&6kLd<(;tADE7dK zqs`iVvs`C7U21)fl%hR5alguwiXUNaiW2Z8d#%^axV__M-I7EzS>cVH2A6H^wJx>X zwb~*L8hOVeB#tchIoW=a%(0`DI&xqGx(d3J%nEMgRO4&b?vf;LGCt3%2vkB z$;GF)V~*J8rFO)CdpVnt8kdJ~`iBalZ>)p=_UyoOTFYpWi#!SeA35d^_O~jJxmo4< zp{41U+0eEdZ2B(pFJ*Q;v5kG6f#fqt^*Jb2`nRL_$0?h=Q**cB2v&J_!vYbvaD&ms z=fb-cNvF*V16T5C0TXk>u{7~g4kctJ#s{|kLWaVO{XRh(d*+8O0f%yGbBL@>Vezc5 zO+1$8b^Qa_Ws=PmdvAfP-%fbGk6EQMK*M$`Ifln;@h&u^U(EqDv}5KRgZDXS6vKVR z3)bL3A(kgXuN8kVg?%f4gJPj%Y%-tS%dp%eJh}MMpfRN7{9ZoCs|r?b zg-et=+DNiW0`bvw%zXd81#;1J-G*-V+Na5DMY`_I(X>+AzkRR}@^yiQFOKozsogGi zd8s4oAxW>+KPvp9cOcpK-5 z@iV{a)1_Fx#?iXm)+b@?QBI)1R2{A5bFQ`H%5C#h0k)c5(tdDV4WXd&b&p2}Zj(tA zgBs0^uL1zDdIwGej26irE=E9d4&d3J3D03X3GHBysVZ((_e zH!ml#bXHhY_z&t;J*seutzXv*=covceJS}2FdQ<{RTf4{U?>w^jVHU*B949thI6zm zePDHk?zMz-Og6zyHUmYoj#&l0#^)!rF~{JbbkBKNmY)6D-x}K4g^d zwA$!p!tp8XeiE#r9f83Q43s9ja!~McZqqSf=;N7h59S)%c$aY&%IW?@W^V;-SE;{y z#*E)WEcE2s{}U=vH5n*IKsY`nRd9Vyjo5Ql*Em=R0gBw8yBOeRIHYoMEH9rUXsk=~ z2!5QmC|w$#T|b0HmbV_IxjR$0cRBKDU$U1YzU zb;A2xpTXn^uj)P0@P%%t=c>72_QkDW+Bi}Ay6V_+^5}6vorHi#vvC=j-q(1)AJKzOLgoiF0c@^<1Cf*g90^->SrXv;;kiA3sRdYUMw|N64q*8Nz|pE#C!v$@cU+7Z!{vO=BX&=TEE97JR~YvHRH zQ{`-3)Y0?EcTcLLmbG5lp>{I^-SdTl`X_lTb~KU>6EdUB#bsLN0GF4QeVbl|Ip83S z7%eA;#G!rs#uo!?p2_|h6Ch^kJ<*e3xr;+l+R%c>uJQv@dwcCsy4)bV&f^SZ`Dp-ts1PrnF6tUq?U!SSPYGax5tD*UhKE2z(s5zSL} zq_$7{d6Rxt&6n_N9W&|Ck98g({tK8bC&EJMoz&+Pr+r((qQ9kr|30U1d=;VwH!10qO_jFB!KWs}@J74!+QBJikO;FIy1;$oUa=?>%Qj&V&Uq_a{_Kd?COVcF zQ=9`mkvO7=BIMIC?`Ea)vEKL^8C20ApBuw{({_f9iU02_oqqlXyCTET1ZqtD=)d2X zFP9(igLO-In?j>c?mtDo^lu{`P1lhD&HLE8Q!*h)iLd3ibP&=H9> zD?&)H9q*b&uG+$i8kp41r=N^gskO?bS>x`Sa$7W#m?u!`Y#HfRCS*pLZJM@_o%?lN z_v}lma+v3m8{98vTC?vhiSrUTKy6_yBw7&8sZA%Y3)$?_^fxMubn1uxuAOVq{h+Kv ze7C%Ak;%+Hgx*tgW3-LWe8%YIR}A`$#fo7LT{zF`%*on}9T&TqV^xnP8)kA#uOLAU z0WGKTMm4#|tP=yE-1#SURC}kFdhk8(hBSh@box%W$G6+J`u)6blH4peHSn9RP?Zax zP5E`Km)UmHf@l=AQ1B*LN&gPA!Jg?a7(ma*7Wkbr5ni<7*q-t| zYfc7OO0A|bHEKCPGqz`Y-@5X~5X>v#Q*)rFZ@|Fh*Epj`E!VwySB~yC^c{yQumfy7`Znm%(dU86)p_N=?>+_GjL*+lC&$IviG zW_R~Z>GCm>zpifLDvZzA+n!+NW!&RsXWXq^;D%1>{7aH}BjhEDhjYu}Ecn&R*kT`g zF^y&x%6B>~H;OrjJx7THSy34J3Rr58c0#S0mf>&Qc{`gE$hJ zAkmB+kLq?~b$#z|&8BJaq!}MF z?aT&Ze3PcFukSkkD{LxF!mpOJ*H!J>oXjpQJ?lQ*6Q&isp~U%xG`6W9(=NS&HdD<_ zFp8MEolh6(CdE@rq6MqA4l1XG-b6B0%vW=*M$r&xcFf-?XB?!P&jpS>d`&l^2K`lK zR_Dm&(sCe!TdRkPf!}kzrToq}cdnV_runjF^0#vh57V^Kby86`;dxCZGE-MF> z$d_u^dQiAXj&xMbPSm&m77OHb6P+Z9K15&oUtKHyyUuvHhn*yWT{%cJ)!uvlaZ=or z&q@u&QB8y&lXYKViVn^rttBx5fA0So_BbsMic3dbp?FU{#+98yGYS=|cnDk&fK+A7 z;;PFwXW9Vv9ApXD)_m4^D*EVZYmv2GPmv%ThTz6!hMB4GMVEyXqxv4&e3G z!eSDQEN;rz86L)Y*Q$i~d#WUreR1VH*X=bvYj948*ZyR2ycTNNP!+n+J>SS%RUi)3 zsuil;!QjXciY<(bEjr%bcDjE*5%>rNl{^r9Q++|Xo|5`RyOEZEti2^U*`28Nma;PD zgNl>O=w*UAapiO3DqU= z-C_Pcz*}=ZP)Hehjp5t^|4H+0NB3pgMpY^uNNc(79vzY~yA@?4t?03Fe=_}w4wuiZ zT|WyCV|GsW`% ztF)JguABJN^)x(gJ8^U|d(Tncx8Y`L^p7{h2|+u`ri_&hC6x*i9<8UB2I*}6@i8M- zf!Vg5qS1BtW;p+5ekdY(CvX;$`*Y$n(CH64#}#9`%mgDkaRe{b8cW1uFFQOBN1|Jn zP2Z608yTuwpW$-s?U4g(m8dkL?7q#s3~hb@i3#8QQJLsC8@xHs&y7&;<1)^1Lx?-m zr5n$U6en6I)pfI_+OKTId~Mx+5m-wf9mK98btmO*1IZIl!^izKn24>;t8o;$Hc_G9 z^nlX6=S9DPO3xBfZ=Fhg0@}(alv(MkgU|OgaOX?SIomDQ`Z#X-1QUDj(Om9)SUVRc z1)p-wwJvLL_^J;>xN&-mx6C$hdi?>4xOH&+{g0i&WYg~4y>T<0yxR^Wi-}tCYXj!N za-xBTd^8SwN#b?&z<#aP`aj+QjtVjnmMM6*Oso?L>J}G?u;h?%^~aw(3ni|uO7)YZ z6f^l4e=&d8-?n<$JRauf3`6kC>7?)lRx1y(Va{ zQ!=qfvptS%+vpFTtW|)C+MmPtjKx{ZUV8=J(NbM0ZXL~R_l^P`3pB>M>x(bhM3hq5 zPu3I+G-&f2L3881&5uBRvD1%S$rBNp=UY6VdS+7wbz1+MY_@~8?)=s2aN=(14l{H}e-+@_F`iOOQLF5xKsfEsne zVsFL;wK`r;Yg^*s>9AFg;$P2f>}CVjYy`se91aEekC+_Yuld;%SB}rP_4Pr+xip_f z0>P^48@^m(=$|XDPr@eIl3Qn*aJ7ANAv{ORz2~RFKoalpZaGj911Xo=5@RjUTh5+? z2`Uxx$4u`m8aD(VaWz4^)uf8y(InjQwf*t^5$|zo+G?osIR)A_$pJP>rq}!9*}_Es z`&}1ty(acaSajv#;;)3mPFuYtZx@FfY4FL+iBnVLRQp^1>tMP#sguEXCZVK4)u^#m>ixa_CB`G6qiCZB*qdtawC>x)Z*#57 zqcAj&kl=F6#Q1xwmB$~-)T1TXMxlUXHu3y*g5>Mcjq+(gDfP|YqEqi1xwXZXc|C;P zWu%ah>Lyk#O{%k7eyJZI^`$^y@ZT!1b@{W(GmJZO<`rFYPCu$cL;{1=?*(Bg%LkRB zYg$`egOQffr6BhC%1R6Kk{N1ezjVbcy&m!bS8`eN*15OR%~k6#`t-H)GXJr5w3Kk; zRI!5`EQW>fB1wPVTSJUxao4KOVFb-wruyFP)uGqZXlS_GJTY%$2 zqzf{ka<-9r{IAD~0xgmh23$5|&4iN`CN$Bcrr8s%dN}eml4mso+c;G(lJD!Dx%s1= z&$o1iR&!rtgVE$Ws=Llk_r%UOmKeC%kT*qE_EkbGQ?g!4rgr z&3i%L^2GaTv6%<$LMZJfl~CrRQQ2!K7IKS)>pj^kKe_$J+v>)69i`Uk**F5jhp0H_e3cRV?4Q5pXr@^roA<%>PX( zF2VJ);yNne(~ptqN2lmkE5_xUbKUH)dq2-ekWxlX$#pnl%tnC9s+lZM^+KoDz>Oa! z-aFt$zVvuWqxa~w7%7;F&DAyJf37OiH3l6CyE8}>m|MU%D@uVg#-_c{`C8cc z-VoVC>+dosVU%Cj%_7*VmSe8Q4knyx#o`ad|aIl!B=W zlDfC%B^`QcpsUtfr5vSoP@_=!i+#>s-kCqC=nkXdLe=_cL#5?B8~pNKQ@>F$6lP4& zj{r^25TtpQcr8M>&*Hn=Kos+I6~XzrT%yqgu8`{1Yo=1-z|8m3Vd=iQeRxkk3v=zJ zH|Q#Tk+soxS<~+r{0PGvX@axzj!o>gs{KPNH3OnoKb2SdxU996eE4Y$T4HCg1Zv8a z%iHtJaU{T}y5WVcx-#fG`}=7zPlU3HgoK>_r|I&!9u*o? z?sCZH_3xbC=JhJQ$z;PNLRPg!?)6=#k7f1iHjj^Pgt#Z6rUxIPGi|*I<;vFi&seKUyCVq)_YVFkLkhvE<{A^T8IXDF7^tOupuXJ7A&^QASGC^I73GeD?)mNJ{(7$4O3-+n= zw00b%rujKi>-=*W@aS4y{LpHPj-xBu11J}ijN5yxj(!u>^_H;4U2Y9uQ|tSv1ab-_ z)6;$Mp)8&E6VEv>u7Ra$$Jy*;*P^jhuc={W#*tDrXk6Wdf*$v%*XEF{@@;B$_#PJA zKF!2E6B^n!^bfyHhp54lt)As{(P6u~+QK3C(HC(ex2orMC8rcgIQI>m7R+VYoR#k0 z-y`4WaIw6;SWjt6(hJ5D5fp7W^#L^3f@w;xEvck;`f7f>JAK1lx&1W@Gr!vZBc3&J z%9UDq8x_kjdo`vJYo6i2h2m);qE)S*^q4v(CDV;oYahv=nGW58(7`8tNejsP`Dk{M zbdPRe`p&3K=l-~Kdv`kin=~LadZNLH{v&Dx_KXld$zHro<@uGs$LdagM&z}){gb#* z%LJS3BD(cJ((tD_7aBF+gv6UX>Uz^{tTk|`lOXqU?&s|=Ei`#Y-ZHkXys5xBNZ8WL zb}}MJD`|B&OasZzKn)5&PRDB_(6{y7@>yp$$vcWU+&_Bo?1g5!3F4R0rgh1PZIx7DA9qd<>c zlHBex3@v{vT4eR z^ECt9^=AT#sg1}}SWI@qzQ}w(8JHRGv45!VUq=^fiZllgZ2xN12wE=xpBzwb~p~!0{-EamxSRw*Xw_@2& z%1SD`p;v8OA)?!1)Y=MGEWT`>T7Vm8U`SO5sgL#*!S&SDwb4XDiO~R!TY!is#R<@8z6a+Jm$F2|FLV(fU397?WY)4( zzP2a4rylMd--^)IgB6eOIa}SjEbt<>Y6GzxfY-5hXNU2({TZfyH7A54S8A8Y=E2C> zGSTSB<7L4b=Li~FiG8&Bj9WorcnzlEH&>n$r{g$o*QVdp+wf?d#F9{ym8DU*M~;Wo zf2KRx1U{=6YV~Gpuh^OeX`EeQ(VCWSZU>#1hNKMo1;l$k;_tRL>;3qzu%E88kfoc6 z5^brzGk8gi7POZjS-^O)tOhqbkKGQPY;lC@pAm|r{J0ex|8AA?c)IAyZb-$3&MDyt zO#;1L1GT&9zBx@fSe{&W7E4?kVscLGaXy5;8C-wMt_i(JuYI`C+&d9hg&sgVZt1}< z#1+Sx&}2Qz5o36Y75+51%b8-TM#hYXFt=22FSM+y&?yz09;9D6E!DW6LA*E0mm9%z z=5{KOu6=P8{ z-7$Ar~|@X<#oIt!8>ji2?TsFF`=ql9fVkJ*?msxEy$%6|TR-pJuq z9VB*^kB<`6793!)yT3WkA?w02l+|Eog&N8bFZdO`;W*;M`?x*wJ_8%TGzhE{S|h?n zl7&(W!Ww|x4~60fBZLb34nq`1C&U&I7RDz;QWi!f{O5oeA0{qPL=;01LnKQmOE7~! zL+FUu9tIl*BFs<#<^Zk`QzoD-rY*E3$V`xd-lmp8WvxaPe^Ufc=p5 zp#89Uq!6YT1}{ug@W%mE0l}YrRAj-4$@q&Gn)KyUE}|C$kNK#VBeU1^gAMWl@INdl z|I-@chxL>m3RH`BLAM0V9zbhS6r&#~w;!0+obr(g7! zC=>Ua@6Qj{NMq191xlPF1|9)RlsAQR1I`ogmxdtC7lDg{|06i%3jz~`(&r640{&-e z0G|LH)D$9QFQEW2ZaAkM9Df5WOv5h?LoyJE3b9dOnBNwZGt3_;eELWec;ymEWfDC3 zG{6vX56DoqjD3vSl^n|$5(VzlIK^)Y8r1MNf@2&S8d?Gx8hHH_xoKfG=joNJkV9v3 za`ZAuwzT(DW0r-tC;QaL<@w9|%j?^P8C_MzKquYt;;TJK3EFdZHv9GFnXh_x*th&q z+Dw{Q=oJsz2m)evDM2`Iy@8=G38Qu$?x)^H7c@GOaPjBu0m_hJ!! z6AHMkKH=oV^epv+^xRM#aK!|Wj=ps!I(Ty0XnQg*4NCQi`6W z^kYMrpniY8RTxTHhg#iR{Qb@lj%5$0X6ayy?WVb3j}qK8mnXYF(Qc;yZ4RW6Td?Lznv^{4HHb$6K8Db@RI(Mp) zhVi8_F+4Je!GNGSEn+hnkiOamQl=7yl^_qHeq-QnQ#jf2kRc+cB z41E1QUds)Cu`kjL_zXVQ3<-446jNCH);7dkO}>Bc`6fL*ep7nVS+hPqGXpB8Gdg7z zCVfiY?=|afsOWk$99gDKa&1!XR*K6joStohcbB%{!?kVZRKI_0vz!9#AGoLCvRfjc zbq5-Eu-6;~9M2&?pG&;a-c&k%0hyX?k26h6C=Q6g&!bT^U+(k`Pf{dHMVAn1}rXBLoB^3 z*Xr(?5{m8>XF)++RF1PtQ2{?+o1Je3wJAm0p-*EyW}A%@*SlHQUu`d1A@4BrOxXzQ zxtMq6Jqth2MJ<=^D)0qI?I&IV6V>TzzSFgso zI_ZyRWIj!lFsm4nAMcdXalb2XEnRocoHKowjT{?|kq^^YPrdz?w(QZIa@=_(9 zHW~nb=p@E}w?nIiVhf}|7FMfpIQi-*eipI z2m};{^#7w4I=TF>wa`Z$ntjP?6ORd<$ryuJ5T(Ib4@(NeM1rBE;3}|}Au63AGA>LZ zEQ#8ITB8w7P#9gTNt23zAXK>}kUI1U-fZWUXVuMT*GunrmwVIa_vW+xca~@M`D@zd zibViFzmjZPx=P0UQ~@8+JQ*Ab8fIk@R_^f*7-~(vZ7c{2?C&2!VS(bqHP_N}W(vfAy2EL@wSNXr;$X0}G;aZ=0*0`fLubugsI zE+YbUpz>z^41qqx6?BXI`c-FsM0|rko6=hJhCoC6?NWUQzi{JEL4@%WtN1uq1Q-!Z z@+Cr?uFd161lVd!JQ}^NOal5~*uV#4LPYrJ0?cC1yC?zJ*XCh&H`ojRKpgWFX0B~V z7~{G&HS7DPL4v^PJA)hg25(^NUJ*=RgV%iax0(7uM)^MU8{>FnpKN2)`Ri8S*S>Z9 z8=asGKaY(s?HfGYz^#KzuOH&n#bL)^UY?$YJUurcp4@SVyFCm7J*u5w&oC;P=u{4D zsCU&>H8n%`)YFaV# z^7X=~Q%%UP@3?_pW)(SKgwcdS{WG^#5dt}enP;HETZ|sGUG~+-i!b08_Dop!C!fx* zc%A}ic^ANDL450Ul_o&Sj2T>v-E0QB-s}&i6z&BkED*P|&_5AE)CMfaR@^e>k_`2$ z1D=Mq7YaX;c$UP&f;Y!kX(}fa=Tp=zPQ21y0(=C3bbkA#6Ji~#p(LN?XHU;ihbGOXL=-=oeKhH!?b(u6gqD{{LG!!`1&`0GXRDDQ{QnUhR_hlp3n z9uCwW2%_&J=(d^%FpF;{9yJ9P?D|$0fpnh}9Pj%**;ZQ0n9=j(UteZ}6&Dp3pCHpc_wTBR!VT5}_%HayT{9YC$Cu&aKl?ZB zD?-BGqmYR`;-jb!ZxEm6C`FdX2D9h0E+E6%6qS4nL*7HCy^G37B+9qTqAJghY!nz2 zEd(n35uAJ>Nc}xm4gV|p$XYJ5T~qjQnr>TO4+p%w#cg`9r9>U8ryvH!3Mh{Z$ zqGOe$L$h43rYC@6txPq;9{1sHrutyhFo`F|H6~3Ch~W*uOo8M0q{>#F*TzN7V#!A5 zo*-GZo#0R=Ce><&Z>(Orw00CZWvQWqM%|hA{vDP^X(rO+0}%HJ*3k#Ky{3)dA#m@` z$|SnQWeO?6vHs(8IMTF$%hdJ-MdzxdY#+&c-OeqJ*s%aSYi-$AY-gVVc`{erBLz@f4iu*Uxo}je)zzqR9R>&dp7;f1+q;p*d@rsK)M9fxXuzLm6Rrf=xZa zj~5lj!@W*c#r?rAplEQBde@ND3D5n^IEj9T%8O8;&0C77#`b{Gk_fGnb~pG|K&5+I zEi7odxfkVvz%2|9$((WzLC)|AhUFxuWq-(v=h7Xoi9XwhBf2-DGy+vq=jxg>e(aKN zfO10g9ruO~Okitr1nxXsgM3!0*Yq>`%}R&Ns5W9&0w8Z0#E@-_wusZmsi?AuB?K2fJAZEx_ntaje zqt#9#I_L&l*Ynp%I_3_ZxKYJ!*wFEP_vz zxRNJ+xMqw?p>PESc%G^e2ZFtgz#mm=1mcAz8M|;Xh)oEt94VV{F}09%uN1Oe38g6e zHXPr$E2PRZs|@EcdNH5pYmOE@lv{sN74yd<>1(o<4tHBM5Ea>iz# zcOvS4Kp}rQF)HaO{_@75B^b^-|3VCF1vOQ%D;J$P*MPU=kbbc5TMq`6a~uU$TBpRM zcK7xK{G=8bwC&6}y=QK1-6AyU?Jn%@lpo_W>&|snkoz+)2^7!?OB=UwBoc6Ip7nLU zIwa^JmtuMtq9=ZTz2It0)gajyz-!c7KWjGC>Bn#r zec%v1c|)J-cyC4bjC~~7I|dz-qO=F#DN_()2~%M3mrR(DHW zD*@5yrHBIhWp45@?P&r~UdKGWGo6@xD?!rOo&O)Ky~C0)3=mj5wr$(CZQHi(nQv^{ zwr$(CZJYV;;#MlFR8o0}Ui7Kcr*|MP>jI$^yfCtISW|4}>bQ4?%Yu)oiy8K2(#Yl! zvq8Qv?>KlZv7;9-qdjjGi;(Y%Yp$$~$bMOFC|F*cx_j|M5}u%N-e->oSuB(BUZwp3 zP?z@N7nTq_mSj=m)tnYok7th!$4`FCk+OAVp<+nd*&HD(933LyZ^LP7CQoW6Ny~vU z;6;?Rj=AA&zFTIuNU`TVsZ}JbK!5iIW3H~EGFh1;LqjU#o6+g8c!B47h7F>aKCHGC z0+%B$VwOn{`f6AD#^<<-2yWx6PuuSqaxWgQwOE!WsC(GvRUKJ)SQQFA$gCRbCI%VK zFsv$}d2@0#-1#J7T835(mtfuc!3KLX9>g~n#_mL=Zg324~|6E9d;XnFPfy`YfHvZo5PX3jXBBm%$Sf8;L$gzChz*r9$ zwY~Uig2bpeB2r(asR}be z!MkQu#eW~SEfy|;KQ&=RPUXQbY;X2!PucCNEV(D0eGo5hwpgAu`w+kfwE`PSy>lh{ zeCrYI^qIhy6r8NlGh-Z$m)LA1vCC)yOWSJeYa}APhk^6Epw5&V2_bnHHC=x~>E3GU zs)@mDL=APMfIZE;~SDB8L4l`h{d1S(P|ZRP9X}4QeRA(#tky7O&fq%?gg)JqcFm$sl&u0E@rg1%keuM(^%i)Rob-9X zm1;=4Y{R1v`-47)kF_Q?+g@diG;-h9mYvL+?|czuaTn*QY!LZsaTDqImZIVD&?RlL zt9@|T+6;}Kk%uWV&?n_0!rWBqWSV|3syw^XZEp z)CJ*>psV`?=z1vMn;p1>j2fc@J`8sxAYy9NA2=0sq*VG}|=IKJGcn|WB*mMT=xtX&yqt7KSie7Irw6_6YE$zw5 z7&BulA2S{|jQK+{>|3J+i@&P3ZnmHMl7rI=u}{&-=}OP*f+@j`xp6YnWKy%!bI8KF zmU-XonZUVcyzZ1qWi|nOn%6Nb+gYuW zyqC`C9ILSMF4Nx5XXB=pU=BK8?ZfcW>3AkKLLAtZFLLK{`lnR@Cw4+_eZR!AB3~J3l7*4{9Q9UfoN+D^4|HG+CM$EjXE>7 zJ%Va;a&>oqcCwzWwdwuuscHGyiU4;OLv8!)O0?wR;JP_WGG5eO!&ugio&mR8b z$ISlT*ZT|hJhIZ>(D3Ho*cA+NP;bY^IS8`8ioF+{d-o6Urvm>$_FS9UI>@uIQ75;@ zx3}?jSLasW-U6Fn-`V=0AITHr?cV0?{?qsqL;T2pD&n2DIS1OQnQu94AK0va&qCJ$!aJmQDg2D}Z4VI#JI^hI24ZFPwIYd(Hj2abh(dvtmmY;z6X>fHX+()8xs zc6Dxh5usfei!*wW|hVZ4hh-XiSKp^WQx?iP+WpydQ)ZBG(n|3-vO!Ora!&7&;!z zXC7U6hJgT~3VLSlG20x67NPK&U`;-1q>vB9y~RyxSOlUjB(e-zIM?9G5jD?{mkYSl z7EJtue8{%|3WO81O(IXHFa+@jbo@400EGQNOu{&B5rzV7fYKl~q<`=41{g=OLKxH* zab!r*#4N(dXkZY5Tvk9v0T7A=Mlyxm!w4`4h$6}G2w;E$ErhH9Mot2-!HB#9pbNlx zJYE483BV{X3@<1rFUP2$C!na9pqNzw!xRV+WL%&SHSvWR?N@|0os!P*H$Ig1|!t!|mT-OJPLI$_7KMoDY|P?&h~|VNn8r6pw`P1m6k#~ zOci(_Kzbq=Ofk5)ckuZ>x2`RKAU2bJ{UPgr3ENqj^8wC1zyQF)5ye23gM$qN(tnMG zl$rK&ftpb>qXCTs>1j8ogY8mK;Q>VJTChNNKoEWyETuy{HT-WtEILXE1)(qcTrT}b7Jp-UR_^|!&A<7%N{#ySu!4UdnZHLe z{w@Rh{PO)r41ZS`Q+yErmi@Jfm-M4w>~R3ddHvyd6MkOEKe+#l$^X3NU)gW*xkvQ5 zNA`Aq8RU^ACRY6XV`T(%Xv<4o3*k~nC;Pn~pZBM(?B{WJm$?SeDzg|!1HU4s&hW}~q z42&tjct;4sw#U8P@QHNps@;TJf(P^?w$a;gQ+v0!#MPm1Z*5@0R&;Ise!+Zu!(RNo zLjP<%)#RN1RS)&8UG80d+_94XRv{OF{?X6zpIyv-B)&X6bo$QywW;Oju^-OC0s$Gc zP3n?7PyP`k9RT$ErLkRH)6@ml_wm~Qy^M+a4|eBlBhmrbFyaB?`I!d<#sndMV)Vo< zg8@*{0zER(xcx}-na1-EB|=}JRkZmU4YA*k%ZZ!_05$D$M3YoX&pNEFm23Lf(U`*n zI-54@-aI99BVg+a2da!JSZ2voS6g}tmd7M?7xXihP8HXX7GXv;k@u?g?y**K&u;nq z`My0ld#+|{$52=4*$oxmY7ighA3R+L#oG}jU&4I&xQ=F23Fn(T>h2htd#(m*|2i)< zk-l)jWq62YC6}NT*fzpuOrgvp%=5j-mr<&7wuN-%^cTb%4?>=E`2RtB`!JpdyHj6H~Va=3Lh2- zCB_YQPOD?x8+tz1R-~8B8VKW}^3L%<-?$q-f{6yI5r)F>RyvKA7JC&FHqhZUhyc!{ zfT7-+B~#*)r05}! z1mhRqzjfO)AHFfi`{vl-lH8{Daa8bwq@GA(_^@v@5fy~4i_y)5%6zQtUpoYCqWafJ zKUY>0c`07K}&=yHQjLWFMazkdy?bg zWE+1YRvDnJ(OIYuE_`J=aAoM5^>Hv(i0LW2r}z{boaPdKbX}zcvp%?aQj}ChO*?_L1Qs zK=jU*2ubI!Yt24+2Jj8lIOB^`lmMg~bCnFxwtgMA(8pqPwlI6Wr8?Dw5ou1)aL!-K)k#i42Xx!lxb81C8v7a%bDH2wYMT1>ys`lgo?rDK*5`!)mq6GsMRWJNeT_L=9+VH`Z*RrqtDe(DjP zIQtMQF{SEjK-DtL|1r!;MP=|f(p-6h4f##-u}T4qn%GTf8>BJr&Aox71|L_dL=S#3 zImmO#0+|*8$Wheo7Iu`S1A5g#q0YnD@-aZ`ludL;v!K;Z{uKE<)!fA|&n&h;8|)jM z0ayCQU9XlGdhe= zz1wC}?Q2Fl=(6~fJ>b69%k=;G1PUQoPaYS{hlk(dz5L)vm`c>s4b zbU$#QyFa!b8&f;lHZ<$x=ElHu^2{U@4O7e5*z1`^+ri()oLuCiAjdx%RVULQ%6g*` zJ>0-huFOQRus_`LP%unD5NhOu*E-ur6Xv5UMY2kiZ1T7`FQ$j39?uavyd@xmF*GlI zAj?${+odFFMSpPWEF@G}Y|NA;D0G{+l3)Xu(&19lare#3W(}M~G>t2=r{d?RRcCr; zS*87lpq}lXt}`E=WM}ir*Ch-vs95DkCX;Y7tuCy}(93A-H*K^Oi-l1-UERZF!QqKu z`BorliAJxQp8ldaopw&#r4ej(4K^|X6mp;^vL@uG9q{h-{&EF#72~zD45gX^+MARU zMve{dETh*JK{#TxI%L~Zi&_%yp)IM%t1;8_rZ>z3zpo$LJ!_O7WnbT|!Hf1vrcVz5v*XI|u{?aM^Pud7Q7wG2Z3oqYaaWcV zmu)I7i~g+m+eCK^DU>uCo$J#5Xm!XBQq>jh=w-JFYAcpBMt=oi2jLXFmp=P^=8Sdh zyVWNh3*KLI@KHHoTk&XwVX`zwButY%NmqTWNzU9VLu)X^I#o7RHl$OGsX?Cnpw0?a zukE~#Vc3#h6^%^nNqq82h9{;oc&Mxo=;)Uu3@o+^Kl`dpeJe@~|J9!YCD>e-$25ux zz-Q`pPgSJ_GeNFMAJqrTq$umcYPlNso+Hvwsq>IbQg9Hpbypt0ROX4h>Xr5Kd|lYG z!wLIlhaOlYt|gs%^q8QnT-t3Va}`~_wV{l3Nu`QcWfVAX!3Xs9&oWwoxUj7eG-UVAYp$(C1 zjO6Z)X7Txf?%x&Uxfs=8qQ{@HlXgAoQ3hcBrOY~?bK`%vzjYhiJISv|z)$W8h1>Q6 z$%tzKy;Zfb=im?YAb)~CUI{9QINnj!UimoGkX)li^T z=pesjF@C=c(|AYijXL(OW1)Q#3Z$r+hU-*1{lvSWO7fQYd-qM+X9L#TZ7QOF=y2MP zYnbfKopor<8?T5Cj0z&BuW3@hM`#6V_&g5dUaoR1g&yG17p-$EggwIY0m;kfO|$Im zWX!w>-ZI&NI^l%8+)cDqUO!*NW% znC>sPZ)TB;U|yHGzMyX37jkUd<8yQADA!9D9CvI8bGK#;il?<`3BzQ^-F>_==UwSj z&D8q5L8eG84t*}CH570x+|bJLC37=MP!IOXosQIJII!!e;!kpJ&9fG>vGt{`4U&bi zzO5`}iH}zjAI|kUVh-k-a1LD#svY45_DMAfYqMqg(5(nzJrN&TmbC8P85Ym4aoNp7 z4`re%>^^S}q-z$Tv7Yimc}uxrBwh@B4a+qKz=PpH<&E$7u7}*9)M*uH+(saLznMl3 zR|>d>EGd(uppU>O5dJ8c-Nf*d4m|ATnxuk6FTMMGM7E+JA?l7%$5_;0-BC*KzVf6nh50WM#S{h;^hp~xxTUh?dBgBNTS$DohC-m;Z(V5yPeLg& zOd`Tw&DvY^SV-ehm1u~=_hK5?ySv~ouovrp_T8&pt2-7d3>PszGToP&eMahe9yxpV zDQb#R^j{Cgdz$w@?(rSAKdJ!}l5P6z5hzoKOh2h_+_M;)X5@{A2kR{oEx%Nn411CT z-?#WwjFsFUPVisJv?mFL%bQEVEHHCpvZl$i9ChnS$@i^RJ;YFAHS1 zN0x_b30_f{XY8`Z9(Cs&gn5;bOd~7~U!cyiBdECWdXh{Rc>o`+Dq+6-`-VWw<6`p1 zALs{oOa@Yo!yxG$=<8-UBI#6UHc-tHna_xKtE=z9wytQlyey%iU?Rl4c^BmP?TxT{ za}qZ^q+c(INlE;ozX~Q{pk@|Rt(5#GPv}=0d#x1%+KQpa;xy>Qn$21Sdg|dM(yoi( z*JzvZL)ZMFcO%{toZN4$GE_lcoIi+fPd0+zPSm6KA^9-AltT|fpbe@fxcY5h2_Hb} zb=t6lqLND`(1V^hYl*)oU=v%XV6oj5hP_pQB7)Um7#tXf64jwnjJw{*^SQUiMy>v~NQ? zK8cyRnH}o(4K-;raNK^E&NYRygtTaZPpn(@6#LpbnMHe7cjCOHA;HutuQdlBdNQRw z>zkE?0M)|hngMlTtl^z|X)>tnO2##7AIm>28|l&;T$-{^9NLy56lHWwoDx$1YV~zh zI$KroBoEJF<>|_Y$-BT>$z)mz=atd0hgRwE6>1v_ z-3m5#%&--gOrHULDMoS$Sbf?mHsYB@adcFd*=}^WtV5ah+-<$_Y+OGNJhGE^INwQL zNKyxetrtc+dG92UQ{Kc>g`I!%ctCEy<(sLoItmp6j9wxpprax68{3y-MrnkexI0cY zam5PU`p2?#r`b})R%5T>I4hmbV-vtULnluWyDBsC?Cj~69VUA(w6{OyVmLH~p@tUr z8b0VrZF*^QY-S7jGD`|rQsL=}7qwO{OExTAQs$4VM;eu}Jdh0P(VvWOJ(b0J-HWu% zxH<9BZtENXmQ9bEHwuq^Yz*>g5v#o_zH}3m_O0MwGkEihQ=EZk2+Wp^WaXJ zxeX_9Fb4fW5P>AE$SLmt*3SdwB!SHIH^-A>kv0=#xs8!M|C7{&MvgKsbbNAp4LP9{ zV}*N?LoNRf2n{3o?VqK0JApJ%j7CkL-Yoo?F5S<^F?v zEN`+d!((lv?_=sNDtzJbzwGX`H)=0&LaLV8Na>Pq`l;j+Suzl_y#^+c^i*f-0VpF$ zDK0w2ZpokBUwWM@^K5&9?!N+1H)tzPJGT9Ap(|yUixbsB!|pyYWpWqs zL#>1ytzY-lS{AAhss8D>S%7Ara!k{4T1vA^70UJWLtTEv&x6I|zvAu{hvy&A%!^RN zr#;OW)+zpP!0-F=usuyR^__D+Ss6Na#hTF$$`0vQv?mt3%;ovKL*Hb)sNnI@(I0ep zVrEfbF`O0Bt}<2bly{pZKsANaW&?xO#aGd9;FkWYHEtSh=Gb78c3+)pjxf2mj&|pO zGaZ<0#ahLgc#4}!-*N43cPwx?P zIF@k#yBE9yk*_E79WYU56>ciu%=34C^}g&CnseS6f7d2 ztOE-<V(5# z^jlKQ60MqH%wUH&OJtz82XM>eV{|0-NyW-;ZZohP=?afeuO20mUG!+9{fR0)Rje{Q$!55KfD`EC6CIlL1l7Mr`kDRpu_b{~dkWp;0DCga!NO@fZm z%tSdoAJ@VYOBxfw_e!ZwsOrA(z+--0?Ph$)_iE$$K@etGPa zgwU@0_p$bRIZdv7QH2AMr>1JP2`=pkmd%V)b4!hrh5DMQ_LHv)CpvEt4@&tF)?|KG zw&ZoiLahH$f)G0oDSwBgn7ONVwa`iv{ArkB+p5xRuK!x^4+o2!^WB-h2yk_}%j`$; z0FXttCrHNn(8?7BtxnoT6r~kjluSt4kV2$7cE2v@W=#g2Q$wReWfUzmuACG1qdGq4 zSZTv8ocj;N+<2C01+d|AMGg7;4QtjYV%8{g*7*_$cq|{Av5hhXrTC1fBKow8_4~i( z0Zxp{?-%$?gv=vhEE*8-_dgXeWUo#j;Ol=tR=`FahgaAuqrxc1MPqrh$RZd`H?X2| zdnp)dmIip-e7c`R$aF?qM^xiTraxZ@)_{=zQBd3$3|icr0&6n<*%mc0uK{SlpvdN7 zYd~lT*y=ofOm~o=(ht)q%u9u{cxp;sJigEFrE_iq=w1xHL)Jb4UaoUWyBpyejkm=$ z@?@ui*LYorpZAwXYC{V@A^s@2UvTa%q3VlohYIm%uo?I2?)7=iOG0f4gi_9PI zAo$s-j|9g9a$CTAKC9<^T)KT5m40JM%|AhjxJJD5e5^wZ9lH6;NUG4PjWlQm%G9w; zS0)0Vgk3pPW}T)A*iWlj+Jii^l}}%6*p|bydsb1Vqa7HIYwFPPa{3%Bo*?kaIR#&# zC+$cqGqojItU0H^US3aEI@MTqp;4|2W}bMg+5w!74S~JyB&KAg5#KDS*-1BN0!7p? z?}pTF2a)ovq`H;38q@(IH`P=9?m`3F2}&}##5D86gOSCa&q8U~V$M-gGP{F=l4V=l zY{EU>?nI^ya?K9j=4zf(%O*M+Xj{kKnHKmbzAe*&ly~}X&3MN^nK}_f(dD3d=<~lkHzR0pO2lko(gJQF`FYg5`2%P zX$#^TXHR#*s<6bWSV(#GaGAMq86R+vMe-l0x1X%bPi`3?@!E9!k7(%}R8XbPH76NT z43&e~=w#rC{Yz%zixpIt&+ZP|49r|QG76^N>v3KVaUiSO!N`}Sf55Nd>8KqbY&9aD z;Eg6yo+jjJUBPKt2}!z}Di!sZ8;tZ0FKB*R>-ugd5}4ZoH1Zm^2tiHNtd_4gT=R93 zNS<`w<@5>Ivb3vDPNFT-DeLYg)}3!t+R@Pz(Zj@GtF{Uz z&7VH-R9QTWdvMO>WfS(81ol=Eevwieny>O!CgqWboGM=^YNL=)^pM?k%k`>bXRmVm zk7RfnIQo;DRksAup>g}tpPv)-wm5X>nHfReG+RCYB;}?=TRF3PzGSxhIP}g|%4Njh&R+%vs+ca-VOpxhZe?Sj0SUsUR9BSWuVg;Am>2Ycl90jWQ2^|^s zMp-0F>kWvhp2!f|NK^w!h`(}Q(hh~;zINQBry{w3JMi|9wG;BIbvkon$J_K6`KG~q z_pqvg(cjy46_fobkCM1VZZ($25SM+mqqJy%wf&=Y>4w~7x$WIC^3qqadDql_)l7Aw z54EmY^=Lh^Qpta57%swKO7<=e%d)C1T}=~z#)9cK^=TbBsU&br6Q4ylW&xrH1k2#%L{d1uLqSSF2pZbf2M20lz56SX&RYR6oOdpL#h zhgnUC;lDGD$!U{w$le&S#5g=oY>3iUsO^$JINXz5}- zL8Xovdpw9;21}J0gffkW`z+-tY3O!d)ft8XB-{OcRO*p)Rdoa-SZE=we!; z(xvDEnPYI-hyokq?q8zPqiPL5`o-mwRxb|!o$QaH)F#hqpv>Q|VQaU6ahstzVg=YBipLYbm`(YSbWUxXIkr=S1QGydfu7-DOtv8}1Zrug|wR zV@mOL!vIyf^=;>S(ju>HiUrJiFL{pEKGv(aoZ~gQL5fSCCDdGT7)w);@~D<$1;sVSw=#vpGL5w^A$wLQuFP*qjsM+v-gR?46@P^vWV=LJHf^AXw~EZv4(kDZo01=GG3C$hAgL=Ny)Gr9A9_orAx9 zd;{$K`uzsQLGXxyBhXMYz+dGL!`kWA4 zlYctiv-9)hNCo*Qxi0z`)FC-!*K?7hl+;Hahx`>}*Fss4t7Z!GCDxPro5ozB zr8A@#cRSvS!F_KjzpQXv+kt4z2``kX<&zPK7gZdTCTGHiQIxHff;^$jGl;flK@&xq z^uqMR*b{FE19iq#CjvTmNrmJ{n=*bMA64#1%utO!ZBWN7>Z7ngVHg;}M?l83LW$?vQ>-Jh`j0twW)_Ec)vBxiFHDBDx{b6o&>0M8U-S!f)B()55YMxzJ?qPbDxS#Ngw_L1g%52X4k631y zIFlwmmkSi;%PM;$*_>_+J>QqZCX%kCMl2apM058y4#nHpPiDF7UXP1hqI;jFyK#R% zKnw36peA4!f(Xh6A;ngp*v;3nF=$g(x)Og{gtE??uf+nr8>)=k2XPaJv(igHOoVtl zF4UEsxu~6-x!k#hg0-m2`Ts`|2+<)N`k@BM_=qOF@k4ih;CQ_8(#soA)#P~IZJcrJ zd4RPx+)oxKL;XVKLm$Sd^pW&{cqcJynId@ji9IA^)s?m>=`{UE_L?;yanxUEF& zmXNdUG*ho1prW8kP1*gOGD#2pQrz&)U5zv!yw=2HpD7sXK4hj%{YOh?llD$h096 z*v*Y};6QFan=I(TJvcXdjbeUUJJ4Mx)d_*tLmTrA7TsRj_({*p94!&dTx1;66gCgrIF!h_*4~Zj4nW|k+ywTheXV-b zoSJNG$mb!Yn7-3WSkgzaez?NQ5vfcE72fud_rAcyn1X$3y=C?bsEX$++D;}ic7_M) z3G2~};FPvJ0Y!`2OLHk2?$%Ej$w_%r>e(ZSFO3&{6a|DvwsqQv4peRc4b0)C!QTsS zD{XRF^ljZ90!Eh^NHQWqY8dq65fs+NCq6hD?w_q zZ+tx}>(UIpESa^s7q3|KmKefMrU*+3nf5TFvT z;FA7|uE>lSgD0@$SQ=uO&rN-4W{+;DCL(PMRB3isZC#r#WfM=ef-#TbDD3a=wktmB zd>HX2t!!wSz(qEr{#*4@i@I<36JQyRo8%H zO5t0fUMRek3hBzi_7tnTC4)#~EaT(KVq(P@#I8Pl>OjByS`EmoC>=qZXB-d3;BuQW zfdd&tPS;LBn z6}j6+?P0lx_Yz`e&Q=y?&71DaDQ%tZC8!~`U}_9+zymnfx^}954{&L&$a@h*08>Mj8ntm8t+e~)b z`}>Go@}2yH8Wiu}J$sX)P)=N;#)pe@BpXLgq6T>0XVURj{dYcO_E*dc2YoeD77FMW ziJFbw)=UNCb7hdzgGsyP7Q{4pzjNS}@NRs~Z-Qa)egC*1KrNZ(J3~&@KdPJ>eHcBP zmFx6dAbN8Lm8ogVUZ{z_bT;crTd9V^I_aS*X=Mk?oPAXPXvu1!JF-$efTF=&A0`%& z?lNgqr@$0P!R%45<o{Jw)(8Zf9O;5J8!W}mzjqedq5b;k|#~s`q z-JzgXJ6EV|->mr`^f^^9a%YtmuN*9tX#Ne{+jj_?lhiH8cB#d+r(*@|h{(b* z8bw8sZ)|zwyO*=%Tw#e?_@dB`^qFikWe@k)0o9KRNV@;@5BmW|~(ng>TaRi|C8i(5!11K|<1Hq6SD=bAc5 zwLR}^))V8^x%_hKgVR&$`Dwin(qBZw(;D}IX-l46i?IwxRmSb-N?!MtK%dMM^*awO zVF7uMm!>D~n~WoK2-KC)?eD31uP6`GfKFr}H~;__z`q{5KVk{@s(^_B6ay{>lxYAn zFz5i#fkFdE1{fOXRN$!~)B!02(|RZk*lM^{z^kB`{!#D{4-n)iqZ4YJ{{M&0vptq->&EUtHLcBLTxY~(vwsjhK zcDfNAvilx5D9VH2x0mwzu})76p4fr6R&L_pGyisag2=bQ1Rc2F#tO(ApkWN3&gc3E zR(2O5Z{v=Do!#m~2=_hO1kl0hv3)?MH!5gT1{vJ&}IIf!f*P_Sxk87Xf~5_7*n*Pli^0V`Ip1Si6HJabg3IJG(f! z*Wa~&N18^0Ha34@Ngji{+CRRBo(cmzJqLIZQyJvF>80Dl+JGmSw^zX6gfwv#1iSZB z9q`+!2cf6nPOm}7TpU`Qfp^peT1MQ1$}@3I2ui2oOpVoKyc1m`VUh;LP_e zkoXtD$8!u*a8TlN!JY51ehLHr6&Ns00D#z2{gbAOqY44XhmwvSCKOy4CphQW|C53U zd){Ll|Bd}895_rUTri<1viOG>{LLR2|A!n56CwPsP%&TS@lUYm8y;(fp9P8p4mDCN z0!rqiZoB+12m6G-Ly^$YLcxNH#RbO-Q^!gL`(U2H(L#|73xNx+Cb|)A0v7_1mkSOT zDeQJM@^Jnrh@hM{ZJBdR(qlVf@VGb-3BPNkM~b9 z`~(mHHw-mmDALF&Kiy^nq?-LF8tk(y!*SiNGa~?(;2+XBzl>by#8Wg<4+&^RAPo;-H&#-@=A{hr2z$@cdtiF~#T4M9l#k z+gjgghaJB+hoX@4RnV`Rc;8IWC7N&y(xmqEOvWJ_`lEKof~2C5TNL14?4KU>L`}}U z+wGm#{?6!N$H?wFqx~E^cxWf^A=w`wip&H(F@+v*TNHe8GP)eGbY#cP<zD%kT3mKbLbg>==b_v4lZg6233&QrxDVm+NWmxBim-k=~4z8l| zr5&4B+n2X8cI=kc&Nc5;&8*KBI{S-JvM2$M;^}`qOh9%&Kt8=%|G@UZ`AYYyVoZsA z?TV=J<~!P(r&xcWz1_R&oo$3vJ&5)%e#)Obyg<8o4fb)OHq8m6Icc=rXeeo}{JBZdO`ps;tT9*^$Pq7`R!{QcvI$mmykJ-gA3m3x=(+UNDNZ&KGSO zLzg;07m-UxS-pDWE6xAh@K^5mpXFK5ZkgmUqg*i3Wp0S}EYaN^p?rb<0MkIUt(*Pq zI=-)~+aPe!vo}9W`J`UA83$+PO3;qc0P<*-=VWvem+_x)8>J}~E5c;L^RA57MZHqA zOSuIua&kF8ExN}bDz^W*zonXZJNWx-=Cl4+a2H-0Xn{0KA4sLEo|TdhJgy{c=E>Yk z#B)arA&jF}eOYrQ^XAQ)%Ify14^qZq86p+)b^w2JmV>}^;ivWL(#m(B815eLYF!CN zz|%VxFI*}lx=*CX5rOD}r!|G5Q{vAX25FsHv1Xaj{^ zYC1)Y*ynrJY?E(vP=msTY}&lR9qgzc<>00oSLpB%@3|^g%l$D%xD`;{T$;5+G##gg zr5Dg*zO^_9@8M#z^ML&2pKbeW)t)j$|v;1em&BX8BFj zCdBB}$G@$tXC-Dy?jy!g&XuzI9e7q9vY9(H+~L9|Qu6!-{VGH)8Lb?#@i3^F)%7PS zac^+!(H;2r=Cvm$4?}H<9S85?@a!5q?w0S9tGzgMoy@g>H!*~MB?{X67F&$MHy1i` zWAEDFc~-sd$iydauhOYT5rY&WHMIm&*e33J9R$BP&BJl<=qECq=W30fvaf?-M?-xQ zp|g%mUkUMk71>uoX=dyjd!sS^bt*(DX`}o+;~KC=fMkhF-A9T~Rw97{IAS;8_X+GW zkCJhTUD8*xEg@<_-}AY{a6aUDDC#(MRp%2N?Hj$+-XAcV;OQ<@s;IH%MtXKEolPyK z*|zZF{h~!h;JX}fKiDnvPo5K1KpnfFMCyo&%#NAn0Z~<^};=N z24%2tkV-V3xFR3+9{bFK7B?(1mi@GmRVRUUdg~^EBUB(gb27P%=xAx|o3aA)+DeqK zs8t#yPfJ?XRxOQW(%P^&Lm#Cx_<;3hmh_RMZKii_;DpmKF1cHJd>K@GCSi3=eQ)16gwOee#q_$jiZ|a6hYCSz z0)@?Px?>}do)Q7Sb(q)2#@grTTy&7tIuvTiV?76z9 z6ifUwGs3s0*4V$=z`C?O$=Z{Tf7*lekE4m@dKbgHkd$$})+;=3=3Zn?-7_Ptm7P_@ zek9_Ot4{<6WmOZ-@;@XaM6PQKryEVB>$K}KJ-tkz)9(HKgMtx4f^_*AA<J3%bMhC({|LMb^YehfI?^I@ghzCD+(ePjAnp~rhnLIUs60jEEREPSyr4x~ zXpt&I?I8DFi|Ck{8SP9;mZqf@rKK{Gr-{b!Vq3(EWx{+hUuVZRpN-H!Vk|cdeL`N3#XU#jKL4Q+w&v^}S)%hQ-on=s6VV1QaL4yZ^TX1)G z2=2ilxVyW%yUWGh-7N%ncX#LFdV!DbnfbbVW~xVO>PWr!*LrHzIlJoAefM6C&bYGD z?{X~Os~n%#2_VyLYsA5?(o7upPlL;_qhO9h-HpxAomNQV#iLD2iS8Y8>*R*@RqpuI z)pe%0E0tuQY8D${&-bjznk(66|+!oZ@A2&mm>*FPa7MP*o3 z{xyx+RpLP3U1fPXt6^2-{af)l>CdyRRZTH?soRCifZs!tkfX^? z(N%qnC$Q7gbUlDpKdv$URO(iBR!y0iB4tTdYUXJXB1j?N=JWIzL)g9Vh`Hs9$Rs+~4JfXrdO3}$cIJAlcQUxWU=|o)k|ZaWX5O;PH}~DUbpqfWe|kDTakxSIa~>c5 zw`Aso^2SP%f7`rRQaY;AEPx5cn%&kffF!gi@ z73#nq{mL;hf@#%#=o6`rIWIStq9of8#}MSr`pe2qa*$&sb~2<+ z0@)Tkw?$98Y!n|1dzbzodZN3EkxR1hYag*1wnXa61$?|)A& zx=R<+x%LrfVN;M{IdpWF_)OC-0(4jq=ZJw(7 zBUWbu|E=3D`|aQsW3yziRpZQzv)+)ahJ=#>+yWmMFyYAxzp1nNy-Q`L=_4UWRnyGH z?B-<~$NPwzsohy-(#Oi~i=C^(Fxc}SUH!SIQ6_{+C)~tcJ_QZXGdu5S3<1i*q8&$y zT`H{&A362?Yn7aQ9Hf|%T*fj;#TgO9D2cAOWucFAvCol&+cmG@I&i*>bC3NwW0Q~< zqilk_j#>`0a^{D(gOiqnQ=RC2GqZCe>L_OnO{LCT7WzvjI;R(|#M)(M$b=&M;y=O&{&&$Vs&^xBZIeNvmf6{;AZtX{g{sySf$fmM^2G}Sc2gd zJ2}b*fM1lC&_i??jw-eNomColDC_f`<}9t4b=;=R$EaZQ%}cZz#JWA^b}?P+a!+Cf zq2((gVUn0N2!Fr_S`KonGd?z!RC~g>U)TOcu8poJQAP881MKDZM#SK~AEC5tArMT_#{eJ*6cL*`iUO^%@45 z%88qkQl(Ya=8rhCFPzjI83oX9Z}Ump5Am%gET?rEk^44@bF43Y4UDBDa$N#*K_Z!;IaU(vzpyH&bd$f z`*4w5?STnSO``QSXj|vSC%C}%$+cQ}M9zr%>Ii_=%Ec>0DZ_@`pfO;l<9zUTS` zHt|b%JDfS3?VaudEJ09^JvfSbmPq?`y>jls+#fd}Tl$*l9}9l`=tQm`%!QTTqgq9K zO~n^K3ubpVmLuWbEfh=hZ!oY7Xz@K(G)i5U?vH=`)A|-cnQ+7l(27?xP;>vT_v z4gjFCIy~g1FrP)J0~i3ZeLp_!(O_-l3whU=m!sm^z--HVNKW5eF@{>g{Q*}Hp~p($ z6|R>5gP)RT&{d+hzs~pbTd^yVwMgxE9cV|THU~Z37U*yu_Z!J=jFUc?jZ|5c+bh@Q z$reH;GrJ`HAdE)$%U!}ivgowZ6^}yNQLw?aK$1%rD}1oif_X`S(~-Cq5u@^ zm$-&vHu0X;rnhiAA1^lBie~kvhGv{fFtp)|2J~hP%mmfl-vxQd%{c*uW7WW{iU4J7 zS)Jd1HffH2XvhhU)^Ut=zbDs;Iwk5@VV+zs@OjYR%OD2ijF%&MHIPKm5oJ_2qnV{% zN_$Ef)PBvP2o&*<0&4>yg@!W2cg<=%m)cyg*1#MW)bOgF5b)=kWM~>L!%K~?YSku~ zaJyB%G4i?0wJRNSoE1Au(Mm*JSHST7is>iPftonDBbNNNoN4V6sRN$NpAkx0Igw6_ zaJamizLACk{h$yqX*tGc;SO@Q>oPZIl*TR)v;{}PqVbbJGM7X$Jt4)?;FPQxgPnx* zh~<_zn`^mbk-{>W?77tLoPxkhpb=;+jN__hOg_Juu&D2t*YBBEaWb-i!)ZhN=CnCI zkw-!RKP9pUV|qp69?5(5*-+Ggd}ncx{ppbMzSpqs1x1--!wy1t>Ef1!^KE)(fR|hl z8SuGC4~)|59rON11qwL>?xeEhM2xkMfMw8tj9N9iRI{K+k!4aZs7a5YPMbuW5*`-d zdSvj2%@W@t*vGl)O(Xn-zuGUE$y~6poe+nVZ1?ve$B_zhh&E{z2nl3o#Dv zr&0(HGsZM#OSvdZ1zc_mF@p%6-}=}R-+Q)soS~5NY4}DB?~{fxIL~~KQnJ(f=s3dp zD_=3WPjrV$+Xc{lVHg;Ro%Hsx*H_ew58|nPx=HRM5xj1#zNW`mh<>vdyZrg2=4p_5 zsNpy;+TMu8ONOMrzLiQHc+8qM}m0(SOb?i;}T{(R;;5o#~JY@Oht;7W9Xk zw4UB-KLtrU+Z5%#{tlh`ffq;1SWJXdNt7`%dt_~JcNh67C`2Tf6Jl7jC&8n;ZhmLJ zgjgHq7p-{r?X3?-%{FMDn(q+O@ir~R$sGQEaq5M+Pq?WLs4J$DOEUb~@W=iJDa#kj zm>p(&wKxK!E7A+;UrQ0O;x(vrP~r-2@KeqKqZfvi2l6Anzyc$+sW=5>pJ5p(rzI1_ zvADQ}j!@kBuitILLMS*wi=)~4PeN|?e8;{=hCQQXCH1YH{!;VffKksY=5er(EeN34 z@=&9tm}Xt{+z~UIY%xDujiDM@#funhDD?#TD{sxZdSnaOjoi>L)0gB!a?labNUiSo z&qg7DI~nStEDvi%O(QLCZ}7BJftiDAB1h$%vsjGZ6is=J3AR$ z-ZIN^&B$wLrpP~a*Fs`ZYxMp1-BNJT=J!Uk=i7J%Q2J)R$OAZ2??BapgrQ=DaFDIs zlC#nhR})*%RaewUE89DJS=Eu;m5h!x3#T3x-gL6$daWxXb^3#iiV92MS($@u#JQd0 z_VqBp91EPDAz6K&V!r;{Sn)G6^j(wo$w%}HXMY8Wi?V?t;l1pyxq^%J}lNPqz;o) zc!P^~l*Au`*OYYU;_~Gu06Wen^`?9h<|k&J!BKf24)5XHMFLa=?i=4bRuP>*PR7wz zWu8VUFVlE42Maaw`m~I|wIgxHJmF7gkynMX+sbTlU^L_(EVVVu|Y@QVdtXL?6|SSqWm)3x8soVUByxRFtF-oPSi#wwk>y@atqA0e7Z|-e;2zzH>2SgXiNU^&bfHSSRA2ai@j@bSFe6qFTKG;7+eNHFJ zbEhd_9~`ANt>}IoM9$*1`MLR}h=xRaY}Jxd4CCb+0uS#FMtohZ7d#y>Cov^hL`NU7 zmF?OCeo6Tui0*hvC7&;86-tM_?@QV;^-BpouXJR&e@Af&?T##}ffoXkxzAfDgpO~7 z{x?JC>?LiIA8}9LpTI~(%A|<+ zUmsS~jNPuW@yi-3Ul=uCJ`Ydhi>Ys-J{b^1hnpRxtcdPfDR|Q}iIWdt?B#J5&*rA- zd#%WCHa2I|CQKle7ZV+qy`2Ya&x9 zxszekI)NX=FTxS2s2{yd3lm7T{8kBUvSfs@1TtZtHyXG5@1z1T_4wV%2oj%hy#e3W zIGbbh(YJS@(F^BNTY3#^hgMeC6pCxzN0I|u!d-Xc9aZFWzV77+X(vF6QSr=yVfZnZ zE15SX_!0@CRwY<&94mC^5DDt0+<-QOWET1pGB^VqOa0u0P1;_$uhg5t6vLzE;!pPo z4Z>2F*Zt3nUeIY#%SULvAy?$caS`fWX>bS5&B84X^i$O?r7^>UT&3ag4p2+OgOt9C zE*Mi0VNfoPIpPLovmWf)kQB96xMm-@c~WJ1L*2P0-&QPXv=Z9mA1tG~4wG*XHr`+H zr0ejqiF?=sgi@^;Q=Fav(@#`c?W6T38Jx2TNL_iXrr#8wb=g}cuSI5L4aNrDcSU2? z4Apt2ft|0Ps4k2{?Hwks9d#`)=^dPFA<`MwLiW@wWznz-u`dvSh?m^Z=rv0l+q{5n zv|b<8aXMr8iKAuXa7m2nDnVyBh~@hespv@}dunqx=O~~$33^3sqBmBAFfP5mhUfq+ zXC_8}37*Y!(eHD}mK3^^)u|A8O}>YSCWm80pH7`4S5T|^PF3kc@0sKQ)aZ5;?d#C^ zoM4LS^201zXib2sdeR;9my07Ka5*Vb%Mx4V^g-DS3}H)jU{jqY?4^XI(UVzYa2R%x zrB(7R&&xnvC_cFNRTQKPS6B(j{Ia8XV{VfYc#OT;J(}j!A;_zasDEw>-$HyWUg&Ft zY+Vg4t&M`4sYM)jOiD&r#E%hS%67K4^Mv1SsGzH zYE2}p8}q$^Q!?$s0wJ7H&sbsl&PzM`KMB7WBTaf%dh8`CJtNd)S5`;^!Mmz|I=Kv9 z;7?K~P1Y$@PV!tOMzl({2liRf8m6r7uVhJ&*u-@b| z%uc7vK}h%oXCn3s6&*|rZqYG`Y4*J z&Mps)#LXi8z5QUuMwlhJLnfeuE~h;lw}dv=!#ax5BGDqsi$$(X9Qh~Hj>Q?IPv4wc!Q!@Uo8^rB>VUJ!M-^K$vFilM*=D}Du|*r z#SJuU`SD!w0Y5E}gPBb(3{$7<3~mM1pN*E!Jx*D;T&STW$M#Ys0th5Wjki6wVU4yH zc;M_C<}Gk!BBp4{baqTCSZo;hMq65yKv?ey??{BqNP7guO|wZL4#f&u<|Oy>S$7!7 zlDR3&)0W@UTt>W&(YJ(^(U#XJ8o3G3JbGNeK1N2`{4M3c8!(UTN1Q$}HGf|>%#7HR zw_g9g!K&-;bE98wnWEEBfbbWKS?7tqab9X= z%cfFKYB%z5m!xa!Dw+KNHAn1U)}$HJ>SVFQnyOj){>D$MQ!$1_9HIrno4Ncr^*MF0 zWn#HAuSVd@2~d?4_}_aD_ztL#^^kVfM(MtS%wXD?lXS{osNrL~Vk0w{lnFOIKt)8D z`N~F*7{R%YFZuMfpH#Qj$q1$oV90f?wgjHYS>v?eTD9Qtv`!kcJ8rBO$WSRt-_~Y7 zI}bvKFT3d!-BU<@Ww3hY!y|`5fBJe<@fyzh zI3o&Fe&?k2XUW@iK=ZzJJ-k;BXndMRxS^r5`y?mNNhU@gxc*0ie zF6HDVJbYEVuFANiX00jJ%5$7eRObctth0C^aJ|2QIzbL2>KvvpVccZK)$8|8Yq<)J zj^W|sn~?iK0tAkwZc^oOR-Z3w>weRh&&fvZmZnzeq?S9>DJQKC*ZDi1l}UNN?aVRS zb!QyQy8B6e&$oRWl@+kzaoy--$i)dH2e!U}<1x`9y_8Y(BaP8g2wK&b`yO?Ty@x|N zkS1b@MSX0tk8JRE%1a;2CH^snl=JvRmy301Kkms+jMZ07;0o97gPE<5gzbVCt7%F5%A{*zsxjIk5USr)9##c}2X(aJsB zV8?)epR*A*^o!_2#4@(Ta}{ZE{5^qbxHQ2R>9!TD$n;f6^Mx!dwX$!}1Ks11j0tl* z1?Ci7J?sXb+92I4Z8;E6Wjnfdx2ZfPUP)s@oIdu4;kQc$Pu?`%07>A}@q;7U;Otm! z9&5YoC#xhMtmn`fB3M=kc>CPv{w0{qypyNh?dv8rEah(O zwnXoLe*^WO*4;^6Rc-F1NoZ0QQlE!Ik7`l=3?lGWOl4NS^)A{6f?lm{X3sKj>y#H8 zKc-{)STn6)3N9=De2ffASdPRQKxH4?*=USd;lAlvdlsLbus9pmFMnmwbtrX2^Z!+A z=SmtNi_CQEU@Nlay){VL+HRp%_X9?qmet;ZYML!;xJ%u*qs$b$Np&{OjY`I9x~rBa zA3k7 zWLdQ?I*H+Ak?T2flC5&#l`-=#N}7N4X^dpvDNfW}q2*cFv`=+pMQ3Y}R?1=D!Es=(Av*`0Cg+6=BZ%9C*h8s+|LIX8A1)ROX7 zD}$9|@KTr5I3LK$e&yl9InirlLtsa~WU2hkHg&*v2~74o=UM}x{Fm{DNmpYe>(ssV z@};j@u%L~bI&RGn!=2r|r-$(ri9YHs3|Xy?r6H7?)$tMTel{~F$s#J@7^eu?;{qF%p>p2x(GQp5@eDSCwdo2&O#O;mmmQn1(M06a{=}vG zQrPE#UnF(aRhwc-?z%)4`-`h;^Jw-v=LZjcX^PJ!KvKTerRuh>jDc}=gvQL^;%lv% zye^%H$X8dy%!1I!t}o(om)c5(vt^T>yHurGb96sotyygB46orE%!7rtGbbXB@4;*x zAj#_GniNG>MFXEeYqs^A=N4}mL8bIx+^B*b_-JOg?q;7XkV?7}xL-LHiDW(L!;bSd zxUht*a-_BKm1|Vet#OAY-zo`1GM|6^P6BKE#Z8{&C2m+G4eZrCjdSC(P`AjASHEs- zSw6`E^15*2b1e{HCt`c{XxAOP8wnX|1z+a0hlsVuv{EnFX4*Rm^pT^CC;n zkMnWZPavkU;Mz`7NR+lYOnAc_A2x7YhdHAP55fSqu(Ah+SEezMb#G4Dza4*OrQ{At zV4-aGebF&e-EWLb$8n9~moo{34A!xu3=_)|iC+5&zlNV${!Uu;cFZFjQ=g7ZJu%6R z60DOOWaWcTAaCETwYx7`>^HBYk~!&3D(r_CGI$UZH4v9KGv6W`%^H-EYk_s z^vKk2@1;u6$GvSnXvVk&!w(Xv;EFO*$i>0DPvzswI$k|`jHubJM14*L-?yFI!^i25 z520(C)F+0KwNpjdFitV&R9xM$;5Ykb-_Jq+8>W^|>d&{YNga8P9P{Jk3Bttp6O#wW zF7d`sD`Nd!<(a1pu0aE>LCLL6Q=T)VmNo~i{YPa zPM!1!9J7>p@*$8DHTxRAung=;K0Tb%czd6!JO&wzI{quE1w41TzE9zIcPSHZq0sOJh;{YD5u#r)6pF%w zJ$zx-RcLl?R#w_u8MIy#LEbLpoz=`XuaQ2R*~|FsrpR=y1~E&6&DQmzB{h(wI_V71 zq>H8`E!S4v*nz`GJnz?WznVuI)oy9wx6=tF5)Z%aFN(b;^P^h#S{vQuwvGH2FF-Ph z{%SgRO(M_+{E70KCewg9@y#d;%FWCmVW2Rl3Y{xu%( z!o9Z5VV4rC%7miP0OHS1oFt9<<;Zhji7d=cXA%6IlrQJ6hsjc&voT7rP}$Wuha+Ic zDNS>}azePoL7sEC_5P8|W)xoOw~Ac{FFN zWHPc>4a;mt(JWo7wGo4(^%kb5lmz8#`Bxh@~0rq z!M~`FWn8e&uN%Gg5u7$lKHmij8O73@63#dR8t~H@u=UD1%fIfMJ?4>%SSa5)k%|e0 z5~b*a1*FHKt{(JyJ@np4=cB_lTm4x{H)l+DQ4I`-bi8L%VwMD}qClq!bwgcN9*P5B zcTmm{*?U{fgg9-k6dNUZJ?a7ud3tR&T)g4OUU*zm6Yj09P$!=iT3&;C@)XGhecCK* z3Bv-TQ%}O}Nt^FZSkMzG7Ek{w}5a(8_-#O5$AI)=)X?~03%q~?GD81JX zKJ|Vik#KMs8M>S!Hc+%#yLsST(}7;wqZ}f@8&1U$l#Sx4`3l@-U1vWZGU%x%m4P=+ zprj*?4GOnB;4R}^HcogJl~jEs>o|JyzH-fPH0KB7Bhc4A@uc_6|4zRP52hozT481w z?&9t0H-T~~Ap+*3tW5zh#Ugx?(+tn5>xAD4hu4!%5r zDHsa`Hh7dDnSf#r))cNOcmu>A-zEW$DexmGM`-s?ufA`;!E})sFcD-XH}Sk*JT@~n zQ}UQWZ`^yp)z1j$@8ExH0l09IQ+BSfD>?!L1A7Dq15^B8wg6b!{rheII{(%bK=%Iq3E8%cQCDvGs5D@ zQ?Vk{4#58Y#tVhoXfEP$Fb+0{u5hindulA=@{vil_0@BXa7fA z|Ct=!Kgb#UPvngB&8+_m&A$!BCo~OpOl1TEgW&=LllcFdU|F;w5{(DgW*rxRNP$EwMOQ?UVPyeiG>A!~ij|KE!;6AF`$8#lK_`ieu jC-42gl+zEe|D=@4OF=??=z_s~90nhm!JS2a`St$*0j; - type PayloadOut = Vec; + type PayloadOut = Vec; type Producer = MemoryQueueProducer; + type Consumer = MemoryQueueConsumer; + type Config = usize; async fn new_pair( config: usize, @@ -72,7 +73,7 @@ impl QueueProducer for MemoryQueueProducer { self.registry.as_ref() } - async fn send_raw(&self, payload: &Vec) -> Result<(), QueueError> { + async fn send_raw(&self, payload: &Self::Payload) -> Result<(), QueueError> { self.tx .send(payload.clone()) .map(|_| ()) @@ -85,6 +86,26 @@ impl QueueProducer for MemoryQueueProducer { } } +#[async_trait] +impl ScheduledProducer for MemoryQueueProducer { + async fn send_raw_scheduled( + &self, + payload: &Self::Payload, + delay: Duration, + ) -> Result<(), QueueError> { + let tx = self.tx.clone(); + let payload = payload.clone(); + tokio::spawn(async move { + tracing::trace!("MemoryQueue: event sent > (delay: {:?})", delay); + tokio::time::sleep(delay).await; + if tx.send(payload).is_err() { + tracing::error!("Receiver dropped"); + } + }); + Ok(()) + } +} + pub struct MemoryQueueConsumer { registry: DecoderRegistry>, rx: broadcast::Receiver>, @@ -99,7 +120,7 @@ impl MemoryQueueConsumer { acker: Box::new(MemoryQueueAcker { tx: self.tx.clone(), payload_copy: Some(payload), - alredy_acked_or_nacked: false, + already_acked_or_nacked: false, }), } } @@ -144,25 +165,25 @@ impl QueueConsumer for MemoryQueueConsumer { pub struct MemoryQueueAcker { tx: broadcast::Sender>, payload_copy: Option>, - alredy_acked_or_nacked: bool, + already_acked_or_nacked: bool, } #[async_trait] impl Acker for MemoryQueueAcker { async fn ack(&mut self) -> Result<(), QueueError> { - if self.alredy_acked_or_nacked { + if self.already_acked_or_nacked { Err(QueueError::CannotAckOrNackTwice) } else { - self.alredy_acked_or_nacked = true; + self.already_acked_or_nacked = true; Ok(()) } } async fn nack(&mut self) -> Result<(), QueueError> { - if self.alredy_acked_or_nacked { + if self.already_acked_or_nacked { Err(QueueError::CannotAckOrNackTwice) } else { - self.alredy_acked_or_nacked = true; + self.already_acked_or_nacked = true; self.tx .send( self.payload_copy @@ -182,6 +203,7 @@ mod tests { use crate::{ queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBuilder}, + scheduled::ScheduledProducer, QueueError, }; @@ -395,4 +417,28 @@ mod tests { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + + #[tokio::test] + async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + + let (p, mut c) = QueueBuilder::::new(16) + .build_pair() + .await + .unwrap(); + + let delay = Duration::from_millis(100); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); + } } diff --git a/omniqueue/src/backends/rabbitmq.rs b/omniqueue/src/backends/rabbitmq.rs index 7b2788b..2185f93 100644 --- a/omniqueue/src/backends/rabbitmq.rs +++ b/omniqueue/src/backends/rabbitmq.rs @@ -4,6 +4,7 @@ use std::{any::TypeId, collections::HashMap}; use async_trait::async_trait; use futures::StreamExt; use futures_util::FutureExt; +use lapin::types::AMQPValue; pub use lapin::{ acker::Acker as LapinAcker, options::{ @@ -18,6 +19,7 @@ use crate::{ decoding::DecoderRegistry, encoding::{CustomEncoder, EncoderRegistry}, queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend}, + scheduled::ScheduledProducer, QueueError, }; @@ -89,13 +91,13 @@ async fn producer( #[async_trait] impl QueueBackend for RabbitMqBackend { - type Config = RabbitMqConfig; - type PayloadIn = Vec; + type PayloadOut = Vec; + type Producer = RabbitMqProducer; type Consumer = RabbitMqConsumer; - type Producer = RabbitMqProducer; + type Config = RabbitMqConfig; async fn new_pair( cfg: RabbitMqConfig, @@ -168,6 +170,36 @@ impl QueueProducer for RabbitMqProducer { } } +#[async_trait] +impl ScheduledProducer for RabbitMqProducer { + async fn send_raw_scheduled( + &self, + payload: &Self::Payload, + delay: Duration, + ) -> Result<(), QueueError> { + let mut headers = FieldTable::default(); + + let delay_ms: u32 = delay + .as_millis() + .try_into() + .map_err(|_| QueueError::Generic("delay is too large".into()))?; + headers.insert("x-delay".into(), AMQPValue::LongUInt(delay_ms)); + + self.channel + .basic_publish( + &self.exchange, + &self.routing_key, + self.options, + payload, + self.properties.clone().with_headers(headers), + ) + .await + .map_err(QueueError::generic)?; + + Ok(()) + } +} + pub struct RabbitMqConsumer { registry: DecoderRegistry>, consumer: Consumer, diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index 7762060..294c934 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -1,3 +1,29 @@ +//! Redis stream-based queue implementation +//! +//! # Redis Streams in Brief +//! Redis has a built-in queue called streams. With consumer groups and consumers, messages in this +//! queue will automatically be put into a pending queue when read and deleted when acknowledged. +//! +//! # The Implementation +//! This implementation uses this to allow worker instances to race for messages to dispatch which +//! are then, ideally, acknowledged. If a message is processing for more than 45 seconds, it is +//! reinserted at the back of the queue to be tried again. +//! +//! This implementation uses the following data structures: +//! - A "tasks to be processed" stream - which is what the consumer listens to for tasks. +//! AKA: Main +//! - A ZSET for delayed tasks with the sort order being the time-to-be-delivered +//! AKA: Delayed +//! +//! The implementation spawns an additional worker that monitors both the zset delayed tasks and +//! the tasks currently processing. It monitors the zset task set for tasks that should be +//! processed now, and the currently processing queue for tasks that have timed out and should be +//! put back on the main queue. + +// This lint warns on `let _: () = ...` which is used throughout this file for Redis commands which +// have generic return types. This is cleaner than the turbofish operator in my opinion. +#![allow(clippy::let_unit_value)] + use std::time::Duration; use std::{any::TypeId, collections::HashMap, marker::PhantomData}; @@ -5,11 +31,14 @@ use async_trait::async_trait; use bb8::ManageConnection; pub use bb8_redis::RedisMultiplexedConnectionManager; use redis::streams::{StreamId, StreamReadOptions, StreamReadReply}; +use svix_ksuid::KsuidLike; +use tokio::task::JoinHandle; use crate::{ decoding::DecoderRegistry, encoding::{CustomEncoder, EncoderRegistry}, queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend}, + scheduled::ScheduledProducer, QueueError, }; @@ -42,6 +71,7 @@ pub struct RedisConfig { pub max_connections: u16, pub reinsert_on_nack: bool, pub queue_key: String, + pub delayed_queue_key: String, pub consumer_group: String, pub consumer_name: String, pub payload_key: String, @@ -50,6 +80,8 @@ pub struct RedisConfig { pub struct RedisQueueBackend(PhantomData); pub type RedisClusterQueueBackend = RedisQueueBackend; +type RawPayload = Vec; + #[async_trait] impl QueueBackend for RedisQueueBackend where @@ -57,14 +89,14 @@ where R::Connection: redis::aio::ConnectionLike + Send + Sync, R::Error: 'static + std::error::Error + Send + Sync, { - type Config = RedisConfig; - // FIXME: Is it possible to use the types Redis actually uses? - type PayloadIn = Vec; - type PayloadOut = Vec; + type PayloadIn = RawPayload; + type PayloadOut = RawPayload; type Producer = RedisStreamProducer; + type Consumer = RedisStreamConsumer; + type Config = RedisConfig; async fn new_pair( cfg: RedisConfig, @@ -78,11 +110,20 @@ where .await .map_err(QueueError::generic)?; + let _ = start_scheduler_background_task( + redis.clone(), + &cfg.queue_key, + &cfg.delayed_queue_key, + &cfg.payload_key, + ) + .await; + Ok(( RedisStreamProducer { registry: custom_encoders, redis: redis.clone(), queue_key: cfg.queue_key.clone(), + delayed_queue_key: cfg.delayed_queue_key, payload_key: cfg.payload_key.clone(), }, RedisStreamConsumer { @@ -107,10 +148,18 @@ where .await .map_err(QueueError::generic)?; + let _ = start_scheduler_background_task( + redis.clone(), + &cfg.queue_key, + &cfg.delayed_queue_key, + &cfg.payload_key, + ) + .await; Ok(RedisStreamProducer { registry: custom_encoders, redis, queue_key: cfg.queue_key, + delayed_queue_key: cfg.delayed_queue_key, payload_key: cfg.payload_key, }) } @@ -126,6 +175,13 @@ where .await .map_err(QueueError::generic)?; + let _ = start_scheduler_background_task( + redis.clone(), + &cfg.queue_key, + &cfg.delayed_queue_key, + &cfg.payload_key, + ) + .await; Ok(RedisStreamConsumer { registry: custom_decoders, redis, @@ -137,6 +193,166 @@ where } } +// FIXME(onelson): there's a trait, [`SchedulerBackend`], but no obvious way to implement it in a +// way that makes good sense here. +// We need access to the pool, and various bits of config to spawn a task, but none of that is +// available where it matters right now. +// Doing my own thing for now - standalone function that takes what it needs. +async fn start_scheduler_background_task( + redis: bb8::Pool, + queue_key: &str, + delayed_queue_key: &str, + payload_key: &str, +) -> Option>> +where + R: RedisConnection, + R::Connection: redis::aio::ConnectionLike + Send + Sync, + R::Error: 'static + std::error::Error + Send + Sync, +{ + if delayed_queue_key.is_empty() { + tracing::warn!("no delayed_queue_key specified - delayed task scheduler disabled"); + return None; + } + + Some(tokio::spawn({ + let pool = redis.clone(); + let mqn = queue_key.to_string(); + let dqn = delayed_queue_key.to_string(); + let delayed_lock = format!("{delayed_queue_key}__lock"); + let payload_key = payload_key.to_string(); + tracing::debug!( + "spawning delayed task scheduler: delayed_queue_key=`{delayed_queue_key}`, \ + delayed_lock=`{delayed_lock}`" + ); + + async move { + loop { + if let Err(err) = background_task_delayed( + pool.clone(), + mqn.clone(), + dqn.clone(), + &delayed_lock, + &payload_key, + ) + .await + { + tracing::error!("{}", err); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + }; + } + } + })) +} + +/// Special ID for XADD command's which generates a stream ID automatically +const GENERATE_STREAM_ID: &str = "*"; +/// Special ID for XREADGROUP commands which reads any new messages +const LISTEN_STREAM_ID: &str = ">"; + +/// Moves "due" messages from a sorted set, where delayed messages are shelved, back onto the main queue. +async fn background_task_delayed( + pool: bb8::Pool, + main_queue_name: String, + delayed_queue_name: String, + delayed_lock: &str, + payload_key: &str, +) -> Result<(), QueueError> +where + R: RedisConnection, + R::Connection: redis::aio::ConnectionLike + Send + Sync, + R::Error: 'static + std::error::Error + Send + Sync, +{ + let batch_size: isize = 50; + + let mut conn = pool.get().await.map_err(QueueError::generic)?; + + // There is a lock on the delayed queue processing to avoid race conditions. So first try to + // acquire the lock should it not already exist. The lock expires after five seconds in case a + // worker crashes while holding the lock. + let mut cmd = redis::cmd("SET"); + cmd.arg(delayed_lock) + .arg(true) + .arg("NX") + .arg("PX") + .arg(5000); + // WIll be Some("OK") when set or None when not set + let resp: Option = cmd + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + if resp.as_deref() == Some("OK") { + // First look for delayed keys whose time is up and add them to the main queue + let timestamp: i64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_err(QueueError::generic)? + .as_secs() + .try_into() + .map_err(QueueError::generic)?; + + let keys: Vec = redis::Cmd::zrangebyscore_limit( + &delayed_queue_name, + 0isize, + // Subtract 1 from the timestamp to make it exclusive rather than inclusive, + // preventing premature delivery. + timestamp - 1, + 0isize, + batch_size, + ) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + if !keys.is_empty() { + // For each task, XADD them to the MAIN queue + let mut pipe = redis::pipe(); + for key in &keys { + // XXX: would be sort of nice if we could borrow a slice of bytes instead + // of allocating a vec for each payload. + // I bet serde allows for this somehow, but redis probably ends up allocating + // before the value hits the wire anyway. + let payload = from_delayed_queue_key(key)?; + let _ = pipe.xadd( + &main_queue_name, + GENERATE_STREAM_ID, + &[(payload_key, payload)], + ); + } + let _: () = pipe + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + // Then remove the tasks from the delayed queue so they aren't resent + let _: () = redis::Cmd::zrem(&delayed_queue_name, keys) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + // Make sure to release the lock after done processing + let _: () = redis::Cmd::del(delayed_lock) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + } else { + // Make sure to release the lock before sleeping + let _: () = redis::Cmd::del(delayed_lock) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + // Wait for half a second before attempting to fetch again if nothing was found + tokio::time::sleep(Duration::from_millis(500)).await; + } + } else { + // Also sleep half a second if the lock could not be fetched + tokio::time::sleep(Duration::from_millis(500)).await; + } + + Ok(()) +} + pub struct RedisStreamAcker { redis: bb8::Pool, queue_key: String, @@ -184,6 +400,7 @@ pub struct RedisStreamProducer { registry: EncoderRegistry>, redis: bb8::Pool, queue_key: String, + delayed_queue_key: String, payload_key: String, } @@ -202,11 +419,78 @@ where async fn send_raw(&self, payload: &Vec) -> Result<(), QueueError> { let mut conn = self.redis.get().await.map_err(QueueError::generic)?; - redis::Cmd::xadd(&self.queue_key, "*", &[(&self.payload_key, payload)]) - .query_async(&mut *conn) - .await + redis::Cmd::xadd( + &self.queue_key, + GENERATE_STREAM_ID, + &[(&self.payload_key, payload)], + ) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + Ok(()) + } +} + +/// Acts as a payload prefix for when payloads are written to zset keys. +/// +/// This ensures that messages with identical payloads: +/// - don't only get delivered once instead of N times. +/// - don't replace each other's "delivery due" timestamp. +fn delayed_key_id() -> String { + svix_ksuid::Ksuid::new(None, None).to_base62() +} + +/// Prefixes a payload with an id, separated by a pipe, e.g `ID|payload`. +fn to_delayed_queue_key(payload: &RawPayload) -> Result { + Ok(format!( + "{}|{}", + delayed_key_id(), + serde_json::to_string(payload).map_err(QueueError::generic)? + )) +} + +/// Returns the payload portion of a delayed zset key. +fn from_delayed_queue_key(key: &str) -> Result { + // All information is stored in the key in which the ID and JSON formatted task + // are separated by a `|`. So, take the key, then take the part after the `|`. + serde_json::from_str( + key.split('|') + .nth(1) + .ok_or_else(|| QueueError::Generic("Improper key format".into()))?, + ) + .map_err(QueueError::generic) +} + +#[async_trait] +impl ScheduledProducer for RedisStreamProducer +where + M: ManageConnection, + M::Connection: redis::aio::ConnectionLike + Send + Sync, + M::Error: 'static + std::error::Error + Send + Sync, +{ + async fn send_raw_scheduled( + &self, + payload: &Self::Payload, + delay: Duration, + ) -> Result<(), QueueError> { + let timestamp: i64 = (std::time::SystemTime::now() + delay) + .duration_since(std::time::UNIX_EPOCH) + .map_err(QueueError::generic)? + .as_secs() + .try_into() .map_err(QueueError::generic)?; + let mut conn = self.redis.get().await.map_err(QueueError::generic)?; + redis::Cmd::zadd( + &self.delayed_queue_key, + to_delayed_queue_key(payload)?, + timestamp, + ) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + tracing::trace!("RedisQueue: event sent > (delay: {:?})", delay); Ok(()) } } @@ -260,7 +544,7 @@ where // Ensure an empty vec is never returned let read_out: StreamReadReply = redis::Cmd::xread_options( &[&self.queue_key], - &[">"], + &[LISTEN_STREAM_ID], &StreamReadOptions::default() .group(&self.consumer_group, &self.consumer_name) .block(100_000) @@ -285,7 +569,7 @@ where let read_out: StreamReadReply = redis::Cmd::xread_options( &[&self.queue_key], - &[">"], + &[LISTEN_STREAM_ID], &StreamReadOptions::default() .group(&self.consumer_group, &self.consumer_name) .block( diff --git a/omniqueue/src/backends/sqs.rs b/omniqueue/src/backends/sqs.rs index 5f79ea0..5970807 100644 --- a/omniqueue/src/backends/sqs.rs +++ b/omniqueue/src/backends/sqs.rs @@ -25,13 +25,13 @@ pub struct SqsQueueBackend; #[async_trait] impl QueueBackend for SqsQueueBackend { - type Config = SqsConfig; - type PayloadIn = String; - type PayloadOut = String; + type PayloadOut = String; type Producer = SqsQueueProducer; + type Consumer = SqsQueueConsumer; + type Config = SqsConfig; async fn new_pair( cfg: SqsConfig, @@ -221,7 +221,7 @@ impl ScheduledProducer for SqsQueueProducer { async fn send_raw_scheduled( &self, payload: &Self::Payload, - delay: std::time::Duration, + delay: Duration, ) -> Result<(), QueueError> { self.client .send_message() diff --git a/omniqueue/src/scheduled/mod.rs b/omniqueue/src/scheduled/mod.rs index 2212750..371fcc6 100644 --- a/omniqueue/src/scheduled/mod.rs +++ b/omniqueue/src/scheduled/mod.rs @@ -15,9 +15,10 @@ use crate::{ QueueError, QueuePayload, }; +// FIXME(onelson): only used by redis -- is this meant to be called internally or by the caller building the backend? #[async_trait] pub trait SchedulerBackend: QueueBackend { - async fn start_schedluer_background_task( + async fn start_scheduler_background_task( &self, ) -> Option>>; } diff --git a/omniqueue/tests/rabbitmq.rs b/omniqueue/tests/rabbitmq.rs index 8a7efbe..d5e44f0 100644 --- a/omniqueue/tests/rabbitmq.rs +++ b/omniqueue/tests/rabbitmq.rs @@ -1,11 +1,14 @@ +use lapin::options::ExchangeDeclareOptions; +use lapin::types::AMQPValue; use lapin::{ options::{BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions}, types::FieldTable, - BasicProperties, Connection, ConnectionProperties, + BasicProperties, Connection, ConnectionProperties, ExchangeKind, }; use omniqueue::{ backends::rabbitmq::{RabbitMqBackend, RabbitMqConfig}, queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static}, + scheduled::ScheduledProducer, }; use serde::{Deserialize, Serialize}; use std::time::{Duration, Instant}; @@ -49,10 +52,39 @@ async fn make_test_queue( .await .unwrap(); + const DELAY_EXCHANGE: &str = "later-alligator"; + let mut args = FieldTable::default(); + args.insert( + "x-delayed-type".into(), + AMQPValue::LongString("direct".into()), + ); + channel + .exchange_declare( + DELAY_EXCHANGE, + ExchangeKind::Custom("x-delayed-message".to_string()), + ExchangeDeclareOptions { + auto_delete: true, + ..Default::default() + }, + args, + ) + .await + .unwrap(); + channel + .queue_bind( + &queue_name, + DELAY_EXCHANGE, + &queue_name, + Default::default(), + Default::default(), + ) + .await + .unwrap(); + let config = RabbitMqConfig { uri: MQ_URI.to_owned(), connection_properties: options, - publish_exchange: "".to_owned(), + publish_exchange: DELAY_EXCHANGE.to_string(), publish_routing_key: queue_name.clone(), publish_options: BasicPublishOptions::default(), publish_properties: BasicProperties::default(), @@ -272,3 +304,27 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + let (p, mut c) = make_test_queue(None, false) + .await + .build_pair() + .await + .unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} diff --git a/omniqueue/tests/redis.rs b/omniqueue/tests/redis.rs index a33edb5..8f5f50e 100644 --- a/omniqueue/tests/redis.rs +++ b/omniqueue/tests/redis.rs @@ -1,6 +1,7 @@ use omniqueue::{ backends::redis::{RedisConfig, RedisQueueBackend}, queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static}, + scheduled::ScheduledProducer, }; use redis::{AsyncCommands, Client, Commands}; use serde::{Deserialize, Serialize}; @@ -42,6 +43,7 @@ async fn make_test_queue() -> (QueueBuilder, RedisStr max_connections: 8, reinsert_on_nack: false, queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delayed"), consumer_group: "test_cg".to_owned(), consumer_name: "test_cn".to_owned(), payload_key: "payload".to_owned(), @@ -243,3 +245,25 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + let (builder, _drop) = make_test_queue().await; + + let (p, mut c) = builder.build_pair().await.unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} diff --git a/omniqueue/tests/redis_cluster.rs b/omniqueue/tests/redis_cluster.rs index 0f0f0b4..24683cb 100644 --- a/omniqueue/tests/redis_cluster.rs +++ b/omniqueue/tests/redis_cluster.rs @@ -1,6 +1,7 @@ use omniqueue::{ backends::redis::{RedisClusterQueueBackend, RedisConfig}, queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static}, + scheduled::ScheduledProducer, }; use redis::{cluster::ClusterClient, AsyncCommands, Commands}; use serde::{Deserialize, Serialize}; @@ -45,6 +46,7 @@ async fn make_test_queue() -> ( max_connections: 8, reinsert_on_nack: false, queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delay"), consumer_group: "test_cg".to_owned(), consumer_name: "test_cn".to_owned(), payload_key: "payload".to_owned(), @@ -246,3 +248,25 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + + let (builder, _drop) = make_test_queue().await; + let (p, mut c) = builder.build_pair().await.unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} diff --git a/omniqueue/tests/sqs.rs b/omniqueue/tests/sqs.rs index f696a4e..54987c0 100644 --- a/omniqueue/tests/sqs.rs +++ b/omniqueue/tests/sqs.rs @@ -2,6 +2,7 @@ use aws_sdk_sqs::Client; use omniqueue::{ backends::sqs::{SqsConfig, SqsQueueBackend}, queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static}, + scheduled::ScheduledProducer, }; use serde::{Deserialize, Serialize}; use std::time::{Duration, Instant}; @@ -223,3 +224,23 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + let (p, mut c) = make_test_queue().await.build_pair().await.unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} diff --git a/testing-docker-compose.yml b/testing-docker-compose.yml index c8b8b6a..a7db678 100644 --- a/testing-docker-compose.yml +++ b/testing-docker-compose.yml @@ -1,10 +1,15 @@ version: "3.7" services: - mq: + rabbitmq: image: rabbitmq:3.11.11-management-alpine ports: - "5672:5672" - "15672:15672" + environment: + RABBITMQ_PLUGINS_DIR: "/opt/rabbitmq/plugins:/usr/lib/rabbitmq/plugins" + volumes: + - ./_rabbit/enabled_plugins:/etc/rabbitmq/enabled_plugins + - ./_rabbit/plugins:/usr/lib/rabbitmq/plugins elasticmq: # Drop-in SQS replacement image: softwaremill/elasticmq:1.3.14