diff --git a/bittensor_cli/src/bittensor/subtensor_interface.py b/bittensor_cli/src/bittensor/subtensor_interface.py index b056937aa..b8cfb7afd 100644 --- a/bittensor_cli/src/bittensor/subtensor_interface.py +++ b/bittensor_cli/src/bittensor/subtensor_interface.py @@ -283,7 +283,7 @@ async def query_runtime_api( self, runtime_api: str, method: str, - params: Optional[Union[list[list[int]], dict[str, int]]], + params: Optional[Union[list[list[int]], list[int], dict[str, int]]], block_hash: Optional[str] = None, reuse_block: Optional[bool] = False, ) -> Optional[str]: diff --git a/bittensor_cli/src/commands/wallets.py b/bittensor_cli/src/commands/wallets.py index c790d7fb3..44fff91f0 100644 --- a/bittensor_cli/src/commands/wallets.py +++ b/bittensor_cli/src/commands/wallets.py @@ -4,10 +4,9 @@ import os import sys from collections import defaultdict -from concurrent.futures import ProcessPoolExecutor from functools import partial from sys import getsizeof -from typing import Any, Collection, Generator, Optional +from typing import Collection, Generator, Optional import aiohttp from bittensor_wallet import Wallet @@ -1102,7 +1101,7 @@ def _map_hotkey_to_neurons( async def _fetch_neuron_for_netuid( netuid: int, subtensor: SubtensorInterface -) -> tuple[int, dict[str, list[ScaleBytes]]]: +) -> tuple[int, Optional[str]]: """ Retrieves all neurons for a specified netuid @@ -1112,18 +1111,13 @@ async def _fetch_neuron_for_netuid( :return: the original netuid, and a mapping of the neurons to their NeuronInfoLite objects """ - async def neurons_lite_for_uid(uid: int) -> dict[Any, Any]: - call_definition = TYPE_REGISTRY["runtime_api"]["NeuronInfoRuntimeApi"][ - "methods" - ]["get_neurons_lite"] - data = await subtensor.encode_params( - call_definition=call_definition, params=[uid] - ) + async def neurons_lite_for_uid(uid: int) -> Optional[str]: block_hash = subtensor.substrate.last_block_hash - hex_bytes_result = await subtensor.substrate.rpc_request( - method="state_call", - params=["NeuronInfoRuntimeApi_get_neurons_lite", data, block_hash], - reuse_block_hash=True, + hex_bytes_result = await subtensor.query_runtime_api( + runtime_api="NeuronInfoRuntimeApi", + method="get_neurons_lite", + params=[uid], + block_hash=block_hash, ) return hex_bytes_result @@ -1134,7 +1128,7 @@ async def neurons_lite_for_uid(uid: int) -> dict[Any, Any]: async def _fetch_all_neurons( netuids: list[int], subtensor -) -> list[tuple[int, list[ScaleBytes]]]: +) -> list[tuple[int, Optional[str]]]: """Retrieves all neurons for each of the specified netuids""" return list( await asyncio.gather( @@ -1143,50 +1137,21 @@ async def _fetch_all_neurons( ) -def _partial_decode(args): - """ - Helper function for passing to ProcessPoolExecutor that decodes scale bytes based on a set return type and - rpc type registry, passing this back to the Executor with its specified netuid for easier mapping - - :param args: (return type, scale bytes object, custom rpc type registry, netuid) - - :return: (original netuid, decoded object) - """ - return_type, as_scale_bytes, custom_rpc_type_registry_, netuid_ = args - decoded = decode_scale_bytes(return_type, as_scale_bytes, custom_rpc_type_registry_) - if decoded.startswith("0x"): - bytes_result = bytes.fromhex(decoded[2:]) - else: - bytes_result = bytes.fromhex(decoded) - - return netuid_, NeuronInfoLite.list_from_vec_u8(bytes_result) - - def _process_neurons_for_netuids( - netuids_with_all_neurons_hex_bytes: list[tuple[int, list[ScaleBytes]]], + netuids_with_all_neurons_hex_bytes: list[tuple[int, Optional[str]]], ) -> list[tuple[int, list[NeuronInfoLite]]]: """ - Using multiprocessing to decode a list of hex-bytes neurons with their respective netuid + Decode a list of hex-bytes neurons with their respective netuid :param netuids_with_all_neurons_hex_bytes: netuids with hex-bytes neurons :return: netuids mapped to decoded neurons """ - - def make_map(res_): - netuid_, json_result = res_ - hex_bytes_result = json_result["result"] - as_scale_bytes = scalecodec.ScaleBytes(hex_bytes_result) - return [return_type, as_scale_bytes, custom_rpc_type_registry, netuid_] - - return_type = TYPE_REGISTRY["runtime_api"]["NeuronInfoRuntimeApi"]["methods"][ - "get_neurons_lite" - ]["type"] - - preprocessed = [make_map(r) for r in netuids_with_all_neurons_hex_bytes] - with ProcessPoolExecutor() as executor: - results = list(executor.map(_partial_decode, preprocessed)) - - all_results = [(netuid, result) for netuid, result in results] + all_results = [ + (netuid, NeuronInfoLite.list_from_vec_u8(bytes.fromhex(result[2:]))) + if result + else (netuid, []) + for netuid, result in netuids_with_all_neurons_hex_bytes + ] return all_results