2424
2525import websockets .exceptions
2626from bt_decode import MetadataV15 , PortableRegistry , decode as decode_by_type_string
27+ from scalecodec import GenericVariant
2728from scalecodec .base import ScaleBytes , ScaleType , RuntimeConfigurationObject
2829from scalecodec .type_registry import load_type_registry_preset
2930from scalecodec .types import (
@@ -526,9 +527,9 @@ class Websocket:
526527 def __init__ (
527528 self ,
528529 ws_url : str ,
529- max_subscriptions = 1024 ,
530- max_connections = 100 ,
531- shutdown_timer = 5 ,
530+ max_subscriptions : int = 1024 ,
531+ max_connections : int = 100 ,
532+ shutdown_timer : Optional [ float ] = 5.0 ,
532533 options : Optional [dict ] = None ,
533534 _log_raw_websockets : bool = False ,
534535 retry_timeout : float = 60.0 ,
@@ -542,7 +543,9 @@ def __init__(
542543 ws_url: Websocket URL to connect to
543544 max_subscriptions: Maximum number of subscriptions per websocket connection
544545 max_connections: Maximum number of connections total
545- shutdown_timer: Number of seconds to shut down websocket connection after last use
546+ shutdown_timer: Number of seconds to shut down websocket connection after last use. If set to `None`, the
547+ connection will never be automatically shut down. Use this for very long-running processes, where you
548+ will manually shut down the connection if ever you intend to close it.
546549 options: Options to pass to the websocket connection
547550 _log_raw_websockets: Whether to log raw websockets in the "raw_websocket" logger
548551 retry_timeout: Timeout in seconds to retry websocket connection
@@ -643,6 +646,10 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
643646 self ._attempts += 1
644647 is_retry = True
645648 if should_reconnect is True :
649+ if len (self ._received_subscriptions ) > 0 :
650+ return SubstrateRequestException (
651+ f"Unable to reconnect because there are currently open subscriptions."
652+ )
646653 for original_id , payload in list (self ._inflight .items ()):
647654 self ._received [original_id ] = loop .create_future ()
648655 to_send = json .loads (payload )
@@ -659,25 +666,33 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
659666 return e
660667 elif isinstance (e := send_task .result (), Exception ):
661668 return e
669+ elif len (self ._received_subscriptions ) > 0 :
670+ return SubstrateRequestException (
671+ f"Currently open subscriptions while disconnecting. "
672+ f"Ensure these are unsubscribed from before closing in the future."
673+ )
674+ return None
662675
663676 async def __aexit__ (self , exc_type , exc_val , exc_tb ):
664- if not self .state != State .CONNECTING :
665- if self ._exit_task is not None :
666- self ._exit_task .cancel ()
667- try :
668- await self ._exit_task
669- except asyncio .CancelledError :
670- pass
671- if self .ws is not None :
672- self ._exit_task = asyncio .create_task (self ._exit_with_timer ())
677+ if self .shutdown_timer is not None :
678+ if not self .state != State .CONNECTING :
679+ if self ._exit_task is not None :
680+ self ._exit_task .cancel ()
681+ try :
682+ await self ._exit_task
683+ except asyncio .CancelledError :
684+ pass
685+ if self .ws is not None :
686+ self ._exit_task = asyncio .create_task (self ._exit_with_timer ())
673687
674688 async def _exit_with_timer (self ):
675689 """
676690 Allows for graceful shutdown of websocket connection after specified number of seconds, allowing
677691 for reuse of the websocket connection.
678692 """
679693 try :
680- await asyncio .sleep (self .shutdown_timer )
694+ if self .shutdown_timer is not None :
695+ await asyncio .sleep (self .shutdown_timer )
681696 await self .shutdown ()
682697 except asyncio .CancelledError :
683698 pass
@@ -1407,7 +1422,8 @@ async def retrieve_pending_extrinsics(self) -> list:
14071422 runtime = await self .init_runtime ()
14081423
14091424 result_data = await self .rpc_request ("author_pendingExtrinsics" , [])
1410-
1425+ if "error" in result_data :
1426+ raise SubstrateRequestException (result_data ["error" ]["message" ])
14111427 extrinsics = []
14121428
14131429 for extrinsic_data in result_data ["result" ]:
@@ -2141,6 +2157,8 @@ async def get_parent_block_hash(self, block_hash) -> str:
21412157
21422158 async def _get_parent_block_hash (self , block_hash ) -> str :
21432159 block_header = await self .rpc_request ("chain_getHeader" , [block_hash ])
2160+ if "error" in block_header :
2161+ raise SubstrateRequestException (block_header ["error" ]["message" ])
21442162
21452163 if block_header ["result" ] is None :
21462164 raise SubstrateRequestException (f'Block not found for "{ block_hash } "' )
@@ -2172,15 +2190,7 @@ async def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any:
21722190 response = await self .rpc_request (
21732191 "state_getStorage" , [storage_key , block_hash ]
21742192 )
2175-
2176- if "result" in response :
2177- return response .get ("result" )
2178- elif "error" in response :
2179- raise SubstrateRequestException (response ["error" ]["message" ])
2180- else :
2181- raise SubstrateRequestException (
2182- "Unknown error occurred during retrieval of events"
2183- )
2193+ return response .get ("result" )
21842194
21852195 @cached_fetcher (max_size = SUBSTRATE_RUNTIME_CACHE_SIZE )
21862196 async def get_block_runtime_info (self , block_hash : str ) -> dict :
@@ -2236,9 +2246,6 @@ async def get_block_metadata(
22362246 params = [block_hash ]
22372247 response = await self .rpc_request ("state_getMetadata" , params )
22382248
2239- if "error" in response :
2240- raise SubstrateRequestException (response ["error" ]["message" ])
2241-
22422249 if (result := response .get ("result" )) and decode :
22432250 metadata_decoder = runtime_config .create_scale_object (
22442251 "MetadataVersioned" , data = ScaleBytes (result )
@@ -2304,8 +2311,13 @@ async def _preprocess(
23042311 metadata = runtime .metadata ,
23052312 )
23062313 method = "state_getStorageAt"
2314+ queryable = (
2315+ str (query_for )
2316+ if query_for is not None
2317+ else f"{ method } { random .randint (0 , 7000 )} "
2318+ )
23072319 return Preprocessed (
2308- str ( query_for ) ,
2320+ queryable ,
23092321 method ,
23102322 [storage_key .to_hex (), block_hash ],
23112323 value_scale_type ,
@@ -2553,7 +2565,7 @@ async def _get_block_hash(self, block_id: int) -> str:
25532565 return (await self .rpc_request ("chain_getBlockHash" , [block_id ]))["result" ]
25542566
25552567 async def get_chain_head (self ) -> str :
2556- result = await self ._make_rpc_request (
2568+ response = await self ._make_rpc_request (
25572569 [
25582570 self .make_payload (
25592571 "rpc_request" ,
@@ -2562,8 +2574,11 @@ async def get_chain_head(self) -> str:
25622574 )
25632575 ]
25642576 )
2565- self .last_block_hash = result ["rpc_request" ][0 ]["result" ]
2566- return result ["rpc_request" ][0 ]["result" ]
2577+ result = response ["rpc_request" ][0 ]
2578+ if "error" in result :
2579+ raise SubstrateRequestException (result ["error" ]["message" ])
2580+ self .last_block_hash = result ["result" ]
2581+ return result ["result" ]
25672582
25682583 async def compose_call (
25692584 self ,
@@ -2690,9 +2705,6 @@ async def query_multi(
26902705 runtime = runtime ,
26912706 )
26922707
2693- if "error" in response :
2694- raise SubstrateRequestException (response ["error" ]["message" ])
2695-
26962708 result = []
26972709
26982710 storage_key_map = {s .to_hex (): s for s in storage_keys }
@@ -3044,12 +3056,7 @@ async def get_chain_finalised_head(self):
30443056
30453057 """
30463058 response = await self .rpc_request ("chain_getFinalizedHead" , [])
3047-
3048- if response is not None :
3049- if "error" in response :
3050- raise SubstrateRequestException (response ["error" ]["message" ])
3051-
3052- return response .get ("result" )
3059+ return response ["result" ]
30533060
30543061 async def _do_runtime_call_old (
30553062 self ,
@@ -3092,6 +3099,8 @@ async def _do_runtime_call_old(
30923099 [f"{ api } _{ method } " , param_data .hex (), block_hash ],
30933100 runtime = runtime ,
30943101 )
3102+ if "error" in result_data :
3103+ raise SubstrateRequestException (result_data ["error" ]["message" ])
30953104 result_vec_u8_bytes = hex_to_bytes (result_data ["result" ])
30963105 result_bytes = await self .decode_scale (
30973106 "Vec<u8>" , result_vec_u8_bytes , runtime = runtime
@@ -3185,6 +3194,8 @@ async def runtime_call(
31853194 [f"{ api } _{ method } " , param_data .hex (), block_hash ],
31863195 runtime = runtime ,
31873196 )
3197+ if "error" in result_data :
3198+ raise SubstrateRequestException (result_data ["error" ]["message" ])
31883199 output_type_string = f"scale_info::{ runtime_call_def ['output' ]} "
31893200
31903201 # Decode result
@@ -3237,6 +3248,8 @@ async def get_account_next_index(self, account_address: str) -> int:
32373248 nonce_obj = await self .rpc_request (
32383249 "account_nextIndex" , [account_address ]
32393250 )
3251+ if "error" in nonce_obj :
3252+ raise SubstrateRequestException (nonce_obj ["error" ]["message" ])
32403253 self ._nonces [account_address ] = nonce_obj ["result" ]
32413254 else :
32423255 self ._nonces [account_address ] += 1
@@ -3622,9 +3635,6 @@ async def query_map(
36223635 method = "state_getKeys" , params = [prefix , block_hash ], runtime = runtime
36233636 )
36243637
3625- if "error" in response :
3626- raise SubstrateRequestException (response ["error" ]["message" ])
3627-
36283638 result_keys = response .get ("result" )
36293639
36303640 result = []
@@ -3640,8 +3650,6 @@ async def query_map(
36403650 params = [result_keys , block_hash ],
36413651 runtime = runtime ,
36423652 )
3643- if "error" in response :
3644- raise SubstrateRequestException (response ["error" ]["message" ])
36453653 for result_group in response ["result" ]:
36463654 result = decode_query_map (
36473655 result_group ["changes" ],
@@ -3680,8 +3688,6 @@ async def query_map(
36803688 )
36813689 )
36823690 for response in all_responses :
3683- if "error" in response :
3684- raise SubstrateRequestException (response ["error" ]["message" ])
36853691 for result_group in response ["result" ]:
36863692 changes .extend (result_group ["changes" ])
36873693
@@ -3905,9 +3911,6 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]:
39053911 "author_submitExtrinsic" , [str (extrinsic .data )]
39063912 )
39073913
3908- if "result" not in response :
3909- raise SubstrateRequestException (response .get ("error" ))
3910-
39113914 result = AsyncExtrinsicReceipt (
39123915 substrate = self , extrinsic_hash = response ["result" ]
39133916 )
@@ -3919,18 +3922,17 @@ async def get_metadata_call_function(
39193922 module_name : str ,
39203923 call_function_name : str ,
39213924 block_hash : Optional [str ] = None ,
3922- ) -> Optional [list ]:
3925+ ) -> Optional [GenericVariant ]:
39233926 """
3924- Retrieves a list of all call functions in metadata active for given block_hash (or chaintip if block_hash
3925- is omitted)
3927+ Retrieves specified call from the metadata at the block specified, or the chain tip if omitted.
39263928
39273929 Args:
39283930 module_name: name of the module
39293931 call_function_name: name of the call function
39303932 block_hash: optional block hash
39313933
39323934 Returns:
3933- list of call functions
3935+ The dict-like call definition, if found. None otherwise.
39343936 """
39353937 runtime = await self .init_runtime (block_hash = block_hash )
39363938
@@ -3994,12 +3996,8 @@ async def get_block_number(self, block_hash: Optional[str] = None) -> int:
39943996 """Async version of `substrateinterface.base.get_block_number` method."""
39953997 response = await self .rpc_request ("chain_getHeader" , [block_hash ])
39963998
3997- if "error" in response :
3998- raise SubstrateRequestException (response ["error" ]["message" ])
3999-
4000- elif "result" in response :
4001- if response ["result" ]:
4002- return int (response ["result" ]["number" ], 16 )
3999+ if response ["result" ]:
4000+ return int (response ["result" ]["number" ], 16 )
40034001 raise SubstrateRequestException (
40044002 f"Unable to retrieve block number for { block_hash } "
40054003 )
0 commit comments