@@ -49,22 +49,22 @@ def set_delay(self, delay: float = 0.0):
4949 async def handle (self , reader , writer ):
5050 # establish connection to redis
5151 redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
52- try :
53- pipe1 = asyncio .create_task (
54- self .pipe (reader , redis_writer , "to redis:" , self .send_event )
55- )
56- pipe2 = asyncio .create_task (self .pipe (redis_reader , writer , "from redis:" ))
57- await asyncio .gather (pipe1 , pipe2 )
58- finally :
59- redis_writer .close ()
52+ pipe1 = asyncio .create_task (
53+ self .pipe (reader , redis_writer , "to redis:" , self .send_event )
54+ )
55+ pipe2 = asyncio .create_task (self .pipe (redis_reader , writer , "from redis:" ))
56+ await asyncio .gather (pipe1 , pipe2 )
6057
6158 async def stop (self ):
62- # clean up enough so that we can reuse the looper
59+ # shutdown the server
6360 self .task .cancel ()
6461 try :
6562 await self .task
6663 except asyncio .CancelledError :
6764 pass
65+ await self .server .wait_closed ()
66+ # do we need to close individual connections too?
67+ # prudently close all async generators
6868 loop = self .server .get_loop ()
6969 await loop .shutdown_asyncgens ()
7070
@@ -75,16 +75,25 @@ async def pipe(
7575 name = "" ,
7676 event : asyncio .Event = None ,
7777 ):
78- while True :
79- data = await reader .read (1000 )
80- if not data :
81- break
82- # print(f"{name} read {len(data)} delay {self.delay}")
83- if event :
84- event .set ()
85- await asyncio .sleep (self .delay )
86- writer .write (data )
87- await writer .drain ()
78+ try :
79+ while True :
80+ data = await reader .read (1000 )
81+ if not data :
82+ break
83+ # print(f"{name} read {len(data)} delay {self.delay}")
84+ if event :
85+ event .set ()
86+ await asyncio .sleep (self .delay )
87+ writer .write (data )
88+ await writer .drain ()
89+ finally :
90+ try :
91+ writer .close ()
92+ await writer .wait_closed ()
93+ except RuntimeError :
94+ # ignore errors on close pertaining to no event loop. Don't want
95+ # to clutter the test output with errors if being garbage collected
96+ pass
8897
8998
9099@pytest .mark .onlynoncluster
0 commit comments