@@ -610,6 +610,7 @@ async def connect(self, force=False):
610610 try :
611611 await asyncio .wait_for (self ._cancel (), timeout = 10.0 )
612612 except asyncio .TimeoutError :
613+ logger .debug (f"Timed out waiting for cancellation" )
613614 pass
614615 self .ws = await asyncio .wait_for (
615616 connect (self .ws_url , ** self ._options ), timeout = 10.0
@@ -618,8 +619,9 @@ async def connect(self, force=False):
618619 self ._send_recv_task = asyncio .get_running_loop ().create_task (
619620 self ._handler (self .ws )
620621 )
622+ logger .debug ("Websocket handler attached." )
621623
622- async def _handler (self , ws : ClientConnection ) -> None :
624+ async def _handler (self , ws : ClientConnection ) -> Union [ None , Exception ] :
623625 recv_task = asyncio .create_task (self ._start_receiving (ws ))
624626 send_task = asyncio .create_task (self ._start_sending (ws ))
625627 done , pending = await asyncio .wait (
@@ -652,6 +654,7 @@ async def _handler(self, ws: ClientConnection) -> None:
652654 )
653655 await self .connect (True )
654656 await self ._handler (ws = self .ws )
657+ return None
655658 elif isinstance (e := recv_task .result (), Exception ):
656659 return e
657660 elif isinstance (e := send_task .result (), Exception ):
@@ -834,8 +837,10 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
834837 except asyncio .QueueEmpty :
835838 pass
836839 if self ._send_recv_task is not None and self ._send_recv_task .done ():
837- if isinstance (e := self ._send_recv_task .result (), Exception ):
838- raise e
840+ if not self ._send_recv_task .cancelled ():
841+ if isinstance ((e := self ._send_recv_task .exception ()), Exception ):
842+ logger .exception (f"Websocket sending exception: { e } " )
843+ raise e
839844 await asyncio .sleep (0.1 )
840845 return None
841846
@@ -2377,8 +2382,13 @@ async def _make_rpc_request(
23772382 for payload in payloads :
23782383 item_id = await ws .send (payload ["payload" ])
23792384 request_manager .add_request (item_id , payload ["id" ])
2385+ # truncate to 2000 chars for debug logging
2386+ if len (stringified_payload := str (payload )) < 2_000 :
2387+ output_payload = stringified_payload
2388+ else :
2389+ output_payload = f"{ stringified_payload [:2_000 ]} (truncated)"
23802390 logger .debug (
2381- f"Submitted payload ID { payload ['id' ]} with websocket ID { item_id } : { payload } "
2391+ f"Submitted payload ID { payload ['id' ]} with websocket ID { item_id } : { output_payload } "
23822392 )
23832393
23842394 while True :
@@ -2420,6 +2430,7 @@ async def _make_rpc_request(
24202430 request_manager .add_response (
24212431 item_id , decoded_response , complete
24222432 )
2433+ # truncate to 2000 chars for debug logging
24232434 if (
24242435 len (stringified_response := str (decoded_response ))
24252436 < 2_000
0 commit comments