Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bittensor_cli/src/bittensor/subtensor_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ async def query_runtime_api(
self,
runtime_api: str,
method: str,
params: Optional[Union[list[list[int]], dict[str, int]]],
params: Optional[Union[list[list[int]], list[int], dict[str, int]]],
block_hash: Optional[str] = None,
reuse_block: Optional[bool] = False,
) -> Optional[str]:
Expand Down
69 changes: 17 additions & 52 deletions bittensor_cli/src/commands/wallets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import os
import sys
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from sys import getsizeof
from typing import Any, Collection, Generator, Optional
from typing import Collection, Generator, Optional

import aiohttp
from bittensor_wallet import Wallet
Expand Down Expand Up @@ -1102,7 +1101,7 @@ def _map_hotkey_to_neurons(

async def _fetch_neuron_for_netuid(
netuid: int, subtensor: SubtensorInterface
) -> tuple[int, dict[str, list[ScaleBytes]]]:
) -> tuple[int, Optional[str]]:
"""
Retrieves all neurons for a specified netuid

Expand All @@ -1112,18 +1111,13 @@ async def _fetch_neuron_for_netuid(
:return: the original netuid, and a mapping of the neurons to their NeuronInfoLite objects
"""

async def neurons_lite_for_uid(uid: int) -> dict[Any, Any]:
call_definition = TYPE_REGISTRY["runtime_api"]["NeuronInfoRuntimeApi"][
"methods"
]["get_neurons_lite"]
data = await subtensor.encode_params(
call_definition=call_definition, params=[uid]
)
async def neurons_lite_for_uid(uid: int) -> Optional[str]:
block_hash = subtensor.substrate.last_block_hash
hex_bytes_result = await subtensor.substrate.rpc_request(
method="state_call",
params=["NeuronInfoRuntimeApi_get_neurons_lite", data, block_hash],
reuse_block_hash=True,
hex_bytes_result = await subtensor.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neurons_lite",
params=[uid],
block_hash=block_hash,
)

return hex_bytes_result
Expand All @@ -1134,7 +1128,7 @@ async def neurons_lite_for_uid(uid: int) -> dict[Any, Any]:

async def _fetch_all_neurons(
netuids: list[int], subtensor
) -> list[tuple[int, list[ScaleBytes]]]:
) -> list[tuple[int, Optional[str]]]:
"""Retrieves all neurons for each of the specified netuids"""
return list(
await asyncio.gather(
Expand All @@ -1143,50 +1137,21 @@ async def _fetch_all_neurons(
)


def _partial_decode(args):
"""
Helper function for passing to ProcessPoolExecutor that decodes scale bytes based on a set return type and
rpc type registry, passing this back to the Executor with its specified netuid for easier mapping

:param args: (return type, scale bytes object, custom rpc type registry, netuid)

:return: (original netuid, decoded object)
"""
return_type, as_scale_bytes, custom_rpc_type_registry_, netuid_ = args
decoded = decode_scale_bytes(return_type, as_scale_bytes, custom_rpc_type_registry_)
if decoded.startswith("0x"):
bytes_result = bytes.fromhex(decoded[2:])
else:
bytes_result = bytes.fromhex(decoded)

return netuid_, NeuronInfoLite.list_from_vec_u8(bytes_result)


def _process_neurons_for_netuids(
netuids_with_all_neurons_hex_bytes: list[tuple[int, list[ScaleBytes]]],
netuids_with_all_neurons_hex_bytes: list[tuple[int, Optional[str]]],
) -> list[tuple[int, list[NeuronInfoLite]]]:
"""
Using multiprocessing to decode a list of hex-bytes neurons with their respective netuid
Decode a list of hex-bytes neurons with their respective netuid

:param netuids_with_all_neurons_hex_bytes: netuids with hex-bytes neurons
:return: netuids mapped to decoded neurons
"""

def make_map(res_):
netuid_, json_result = res_
hex_bytes_result = json_result["result"]
as_scale_bytes = scalecodec.ScaleBytes(hex_bytes_result)
return [return_type, as_scale_bytes, custom_rpc_type_registry, netuid_]

return_type = TYPE_REGISTRY["runtime_api"]["NeuronInfoRuntimeApi"]["methods"][
"get_neurons_lite"
]["type"]

preprocessed = [make_map(r) for r in netuids_with_all_neurons_hex_bytes]
with ProcessPoolExecutor() as executor:
results = list(executor.map(_partial_decode, preprocessed))

all_results = [(netuid, result) for netuid, result in results]
all_results = [
(netuid, NeuronInfoLite.list_from_vec_u8(bytes.fromhex(result[2:])))
if result
else (netuid, [])
for netuid, result in netuids_with_all_neurons_hex_bytes
]
return all_results


Expand Down