@@ -182,37 +182,67 @@ async def op(pipe):
182182@pytest .mark .onlycluster
183183async def test_cluster (request , redis_addr ):
184184
185- # TODO: This test actually doesn't work. Once the RedisCluster initializes,
186- # it will re-connect to the nodes as advertised by the cluster, bypassing
187- # the single DelayProxy we set up.
188- # to work around this, we really would nedd a port-remapper for the RedisCluster
189-
190- redis_addr = redis_addr [0 ], 6372 # use the cluster port
191185 delay = 0.1
192- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr )
193- await dp .start ()
186+ cluster_port = 6372
187+ remap_base = 7372
188+ n_nodes = 6
189+
190+ def remap (host , port ):
191+ return host , remap_base + port - cluster_port
192+
193+ proxies = []
194+ for i in range (n_nodes ):
195+ port = cluster_port + i
196+ remapped = remap_base + i
197+ forward_addr = redis_addr [0 ], port
198+ proxy = DelayProxy (addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr )
199+ proxies .append (proxy )
200+
201+ # start proxies
202+ await asyncio .gather (* [p .start () for p in proxies ])
203+
204+ def all_clear ():
205+ for p in proxies :
206+ p .send_event .clear ()
207+
208+ async def wait_for_send ():
209+ asyncio .wait (
210+ [p .send_event .wait () for p in proxies ], return_when = asyncio .FIRST_COMPLETED
211+ )
212+
213+ @contextlib .contextmanager
214+ def set_delay (delay : float ):
215+ with contextlib .ExitStack () as stack :
216+ for p in proxies :
217+ stack .enter_context (p .set_delay (delay ))
218+ yield
194219
195- with contextlib .closing (RedisCluster .from_url ("redis://127.0.0.1:5381" )) as r :
220+ with contextlib .closing (
221+ RedisCluster .from_url (f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap )
222+ ) as r :
196223 await r .initialize ()
197224 await r .set ("foo" , "foo" )
198225 await r .set ("bar" , "bar" )
199226
200227 async def op (r ):
201- with dp . set_delay (delay ):
228+ with set_delay (delay ):
202229 return await r .get ("foo" )
203230
204- dp . send_event . clear ()
231+ all_clear ()
205232 t = asyncio .create_task (op (r ))
206- # await dp.send_event.wait() # won"t work, because DelayProxy is by-passed
233+ # Wait for whichever DelayProxy gets the request first
234+ await wait_for_send ()
207235 await asyncio .sleep (0.01 )
208236 t .cancel ()
209- try :
237+ with pytest . raises ( asyncio . CancelledError ) :
210238 await t
211- except asyncio .CancelledError :
212- pass
213239
214- assert await r .get ("bar" ) == b"bar"
215- assert await r .ping ()
216- assert await r .get ("foo" ) == b"foo"
240+ # try a number of requests to excercise all the connections
241+ async def doit ():
242+ assert await r .get ("bar" ) == b"bar"
243+ assert await r .ping ()
244+ assert await r .get ("foo" ) == b"foo"
217245
218- await dp .stop ()
246+ await asyncio .gather (* [doit () for _ in range (10 )])
247+
248+ await asyncio .gather (* (p .stop () for p in proxies ))
0 commit comments