@@ -493,21 +493,6 @@ async def _disconnect_raise(self, conn: Connection, error: Exception):
493493 ):
494494 raise error
495495
496- async def _try_send_command_parse_response (self , conn , * args , ** options ):
497- try :
498- return await conn .retry .call_with_retry (
499- lambda : self ._send_command_parse_response (
500- conn , args [0 ], * args , ** options
501- ),
502- lambda error : self ._disconnect_raise (conn , error ),
503- )
504- except asyncio .CancelledError :
505- await conn .disconnect (nowait = True )
506- raise
507- finally :
508- if not self .connection :
509- await self .connection_pool .release (conn )
510-
511496 # COMMAND EXECUTION AND PROTOCOL PARSING
512497 async def execute_command (self , * args , ** options ):
513498 """Execute a command and return a parsed response"""
@@ -516,9 +501,20 @@ async def execute_command(self, *args, **options):
516501 command_name = args [0 ]
517502 conn = self .connection or await pool .get_connection (command_name , ** options )
518503
519- return await asyncio .shield (
520- self ._try_send_command_parse_response (conn , * args , ** options )
521- )
504+ if self .single_connection_client :
505+ await self ._single_conn_lock .acquire ()
506+ try :
507+ return await conn .retry .call_with_retry (
508+ lambda : self ._send_command_parse_response (
509+ conn , command_name , * args , ** options
510+ ),
511+ lambda error : self ._disconnect_raise (conn , error ),
512+ )
513+ finally :
514+ if self .single_connection_client :
515+ self ._single_conn_lock .release ()
516+ if not self .connection :
517+ await pool .release (conn )
522518
523519 async def parse_response (
524520 self , connection : Connection , command_name : Union [str , bytes ], ** options
@@ -757,18 +753,10 @@ async def _disconnect_raise_connect(self, conn, error):
757753 is not a TimeoutError. Otherwise, try to reconnect
758754 """
759755 await conn .disconnect ()
760-
761756 if not (conn .retry_on_timeout and isinstance (error , TimeoutError )):
762757 raise error
763758 await conn .connect ()
764759
765- async def _try_execute (self , conn , command , * arg , ** kwargs ):
766- try :
767- return await command (* arg , ** kwargs )
768- except asyncio .CancelledError :
769- await conn .disconnect ()
770- raise
771-
772760 async def _execute (self , conn , command , * args , ** kwargs ):
773761 """
774762 Connect manually upon disconnection. If the Redis server is down,
@@ -777,11 +765,9 @@ async def _execute(self, conn, command, *args, **kwargs):
777765 called by the # connection to resubscribe us to any channels and
778766 patterns we were previously listening to
779767 """
780- return await asyncio .shield (
781- conn .retry .call_with_retry (
782- lambda : self ._try_execute (conn , command , * args , ** kwargs ),
783- lambda error : self ._disconnect_raise_connect (conn , error ),
784- )
768+ return await conn .retry .call_with_retry (
769+ lambda : command (* args , ** kwargs ),
770+ lambda error : self ._disconnect_raise_connect (conn , error ),
785771 )
786772
787773 async def parse_response (self , block : bool = True , timeout : float = 0 ):
@@ -799,7 +785,9 @@ async def parse_response(self, block: bool = True, timeout: float = 0):
799785 await conn .connect ()
800786
801787 read_timeout = None if block else timeout
802- response = await self ._execute (conn , conn .read_response , timeout = read_timeout )
788+ response = await self ._execute (
789+ conn , conn .read_response , timeout = read_timeout , disconnect_on_error = False
790+ )
803791
804792 if conn .health_check_interval and response == self .health_check_response :
805793 # ignore the health check message as user might not expect it
@@ -1183,18 +1171,6 @@ async def _disconnect_reset_raise(self, conn, error):
11831171 await self .reset ()
11841172 raise
11851173
1186- async def _try_send_command_parse_response (self , conn , * args , ** options ):
1187- try :
1188- return await conn .retry .call_with_retry (
1189- lambda : self ._send_command_parse_response (
1190- conn , args [0 ], * args , ** options
1191- ),
1192- lambda error : self ._disconnect_reset_raise (conn , error ),
1193- )
1194- except asyncio .CancelledError :
1195- await conn .disconnect ()
1196- raise
1197-
11981174 async def immediate_execute_command (self , * args , ** options ):
11991175 """
12001176 Execute a command immediately, but don't auto-retry on a
@@ -1210,8 +1186,12 @@ async def immediate_execute_command(self, *args, **options):
12101186 command_name , self .shard_hint
12111187 )
12121188 self .connection = conn
1213- return await asyncio .shield (
1214- self ._try_send_command_parse_response (conn , * args , ** options )
1189+
1190+ return await conn .retry .call_with_retry (
1191+ lambda : self ._send_command_parse_response (
1192+ conn , command_name , * args , ** options
1193+ ),
1194+ lambda error : self ._disconnect_reset_raise (conn , error ),
12151195 )
12161196
12171197 def pipeline_execute_command (self , * args , ** options ):
@@ -1379,19 +1359,6 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
13791359 await self .reset ()
13801360 raise
13811361
1382- async def _try_execute (self , conn , execute , stack , raise_on_error ):
1383- try :
1384- return await conn .retry .call_with_retry (
1385- lambda : execute (conn , stack , raise_on_error ),
1386- lambda error : self ._disconnect_raise_reset (conn , error ),
1387- )
1388- except asyncio .CancelledError :
1389- # not supposed to be possible, yet here we are
1390- await conn .disconnect (nowait = True )
1391- raise
1392- finally :
1393- await self .reset ()
1394-
13951362 async def execute (self , raise_on_error : bool = True ):
13961363 """Execute all the commands in the current pipeline"""
13971364 stack = self .command_stack
@@ -1413,11 +1380,10 @@ async def execute(self, raise_on_error: bool = True):
14131380 conn = cast (Connection , conn )
14141381
14151382 try :
1416- return await asyncio .shield (
1417- self ._try_execute (conn , execute , stack , raise_on_error )
1383+ return await conn .retry .call_with_retry (
1384+ lambda : execute (conn , stack , raise_on_error ),
1385+ lambda error : self ._disconnect_raise_reset (conn , error ),
14181386 )
1419- except RuntimeError :
1420- await self .reset ()
14211387 finally :
14221388 await self .reset ()
14231389
0 commit comments