diff --git a/bittensor/core/extrinsics/registration.py b/bittensor/core/extrinsics/registration.py new file mode 100644 index 0000000000..bd19b16389 --- /dev/null +++ b/bittensor/core/extrinsics/registration.py @@ -0,0 +1,287 @@ +# The MIT License (MIT) +# Copyright © 2024 Opentensor Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. +# +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +import time +from typing import Union, Optional, TYPE_CHECKING + +from retry import retry +from rich.prompt import Confirm + +from bittensor.core.settings import bt_console +from bittensor.utils import format_error_message +from bittensor.utils.btlogging import logging +from bittensor.utils.networking import ensure_connected +from bittensor.utils.registration import ( + POWSolution, + create_pow, + torch, + log_no_torch_error, +) + +# For annotation purposes +if TYPE_CHECKING: + from bittensor.core.subtensor import Subtensor + from bittensor_wallet import Wallet + + +@ensure_connected +def _do_pow_register( + self: "Subtensor", + netuid: int, + wallet: "Wallet", + pow_result: POWSolution, + wait_for_inclusion: bool = False, + wait_for_finalization: bool = True, +) -> tuple[bool, Optional[str]]: + """Sends a (POW) register extrinsic to the chain. + + Args: + netuid (int): The subnet to register on. + wallet (bittensor.wallet): The wallet to register. + pow_result (POWSolution): The PoW result to register. + wait_for_inclusion (bool): If ``True``, waits for the extrinsic to be included in a block. + Default to `False`. + wait_for_finalization (bool): If ``True``, waits for the extrinsic to be finalized. Default to `True`. + + Returns: + success (bool): ``True`` if the extrinsic was included in a block. + error (Optional[str]): ``None`` on success or not waiting for inclusion/finalization, otherwise the error + message. 
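For reference, the ``@retry(delay=1, tries=3, backoff=2, max_delay=4)`` decorator applied to the inner call below retries the substrate submission up to three times, sleeping roughly 1 s and then 2 s between attempts. A minimal standalone sketch of the same pattern, using a dummy flaky call rather than a real substrate connection:

    from retry import retry

    calls = {"n": 0}

    @retry(delay=1, tries=3, backoff=2, max_delay=4)
    def flaky_submit():
        # Fails twice, then succeeds -- mimics a transient RPC error being retried.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("transient substrate error")
        return True, None

    print(flaky_submit())  # (True, None) after two retried failures
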
+ """ + + @retry(delay=1, tries=3, backoff=2, max_delay=4) + def make_substrate_call_with_retry(): + # create extrinsic call + call = self.substrate.compose_call( + call_module="SubtensorModule", + call_function="register", + call_params={ + "netuid": netuid, + "block_number": pow_result.block_number, + "nonce": pow_result.nonce, + "work": [int(byte_) for byte_ in pow_result.seal], + "hotkey": wallet.hotkey.ss58_address, + "coldkey": wallet.coldkeypub.ss58_address, + }, + ) + extrinsic = self.substrate.create_signed_extrinsic( + call=call, keypair=wallet.hotkey + ) + response = self.substrate.submit_extrinsic( + extrinsic, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + ) + + # We only wait here if we expect finalization. + if not wait_for_finalization and not wait_for_inclusion: + return True, None + + # process if registration successful, try again if pow is still valid + response.process_events() + if not response.is_success: + return False, format_error_message(response.error_message) + # Successful registration + else: + return True, None + + return make_substrate_call_with_retry() + + +def register_extrinsic( + subtensor: "Subtensor", + wallet: "Wallet", + netuid: int, + wait_for_inclusion: bool = False, + wait_for_finalization: bool = True, + prompt: bool = False, + max_allowed_attempts: int = 3, + output_in_place: bool = True, + cuda: bool = False, + dev_id: Union[list[int], int] = 0, + tpb: int = 256, + num_processes: Optional[int] = None, + update_interval: Optional[int] = None, + log_verbose: bool = False, +) -> bool: + """Registers the wallet to the chain. + + Args: + subtensor (bittensor.core.subtensor.Subtensor): Subtensor interface. + wallet (bittensor.wallet): Bittensor wallet object. + netuid (int): The ``netuid`` of the subnet to register on. + wait_for_inclusion (bool): If set, waits for the extrinsic to enter a block before returning ``true``, or returns ``false`` if the extrinsic fails to enter the block within the timeout. + wait_for_finalization (bool): If set, waits for the extrinsic to be finalized on the chain before returning ``true``, or returns ``false`` if the extrinsic fails to be finalized within the timeout. + prompt (bool): If ``true``, the call waits for confirmation from the user before proceeding. + max_allowed_attempts (int): Maximum number of attempts to register the wallet. + output_in_place (bool): If true, prints the progress of the proof of work to the console in-place. Meaning the progress is printed on the same lines. Defaults to `True`. + cuda (bool): If ``true``, the wallet should be registered using CUDA device(s). + dev_id (Union[List[int], int]): The CUDA device id to use, or a list of device ids. + tpb (int): The number of threads per block (CUDA). + num_processes (int): The number of processes to use to register. + update_interval (int): The number of nonces to solve between updates. + log_verbose (bool): If ``true``, the registration process will log more information. + + Returns: + success (bool): + Flag is ``true`` if extrinsic was finalized or uncluded in the block. If we did not wait for finalization / inclusion, the response is ``true``. + """ + if not subtensor.subnet_exists(netuid): + bt_console.print( + ":cross_mark: [red]Failed[/red]: error: [bold white]subnet:{}[/bold white] does not exist.".format( + netuid + ) + ) + return False + + with bt_console.status( + f":satellite: Checking Account on [bold]subnet:{netuid}[/bold]..." 
+ ): + neuron = subtensor.get_neuron_for_pubkey_and_subnet( + wallet.hotkey.ss58_address, netuid=netuid + ) + if not neuron.is_null: + logging.debug( + f"Wallet {wallet} is already registered on {neuron.netuid} with {neuron.uid}" + ) + return True + + if prompt: + if not Confirm.ask( + "Continue Registration?\n hotkey: [bold white]{}[/bold white]\n coldkey: [bold white]{}[/bold white]\n network: [bold white]{}[/bold white]".format( + wallet.hotkey.ss58_address, + wallet.coldkeypub.ss58_address, + subtensor.network, + ) + ): + return False + + if not torch: + log_no_torch_error() + return False + + # Attempt rolling registration. + attempts = 1 + while True: + bt_console.print( + ":satellite: Registering...({}/{})".format(attempts, max_allowed_attempts) + ) + # Solve latest POW. + if cuda: + if not torch.cuda.is_available(): + if prompt: + bt_console.print("CUDA is not available.") + return False + pow_result: Optional[POWSolution] = create_pow( + subtensor, + wallet, + netuid, + output_in_place, + cuda=cuda, + dev_id=dev_id, + tpb=tpb, + num_processes=num_processes, + update_interval=update_interval, + log_verbose=log_verbose, + ) + else: + pow_result: Optional[POWSolution] = create_pow( + subtensor, + wallet, + netuid, + output_in_place, + cuda=cuda, + num_processes=num_processes, + update_interval=update_interval, + log_verbose=log_verbose, + ) + + # pow failed + if not pow_result: + # might be registered already on this subnet + is_registered = subtensor.is_hotkey_registered( + netuid=netuid, hotkey_ss58=wallet.hotkey.ss58_address + ) + if is_registered: + bt_console.print( + f":white_heavy_check_mark: [green]Already registered on netuid:{netuid}[/green]" + ) + return True + + # pow successful, proceed to submit pow to chain for registration + else: + with bt_console.status(":satellite: Submitting POW..."): + # check if pow result is still valid + while not pow_result.is_stale(subtensor=subtensor): + result: tuple[bool, Optional[str]] = _do_pow_register( + self=subtensor, + netuid=netuid, + wallet=wallet, + pow_result=pow_result, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + ) + success, err_msg = result + + if not success: + # Look error here + # https://github.com/opentensor/subtensor/blob/development/pallets/subtensor/src/errors.rs + if "HotKeyAlreadyRegisteredInSubNet" in err_msg: + bt_console.print( + f":white_heavy_check_mark: [green]Already Registered on [bold]subnet:{netuid}[/bold][/green]" + ) + return True + + bt_console.print(f":cross_mark: [red]Failed[/red]: {err_msg}") + time.sleep(0.5) + + # Successful registration, final check for neuron and pubkey + else: + bt_console.print(":satellite: Checking Balance...") + is_registered = subtensor.is_hotkey_registered( + hotkey_ss58=wallet.hotkey.ss58_address, + netuid=netuid, + ) + if is_registered: + bt_console.print( + ":white_heavy_check_mark: [green]Registered[/green]" + ) + return True + else: + # neuron not found, try again + bt_console.print( + ":cross_mark: [red]Unknown error. Neuron not found.[/red]" + ) + continue + else: + # Exited loop because pow is no longer valid. + bt_console.print("[red]POW is stale.[/red]") + # Try again. + continue + + if attempts < max_allowed_attempts: + # Failed registration, retry pow + attempts += 1 + bt_console.print( + ":satellite: Failed registration, retrying pow ...({}/{})".format( + attempts, max_allowed_attempts + ) + ) + else: + # Failed to register after max attempts. 
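For reference, the submission loop above keeps resubmitting only while the solution is still fresh; the staleness rule (implemented later in this diff as ``POWSolution.is_stale`` in ``bittensor/utils/registration.py``) treats work as unusable once its block is more than three blocks behind the chain tip. A minimal sketch of that check, with illustrative block numbers:

    def is_stale(solution_block: int, current_block: int, window: int = 3) -> bool:
        # A PoW solution is bound to the block it was solved against; once the
        # chain advances past the acceptance window the work must be redone.
        return solution_block < current_block - window

    print(is_stale(solution_block=100, current_block=104))  # True  -> solve a new PoW
    print(is_stale(solution_block=100, current_block=102))  # False -> still submittable
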
+ bt_console.print("[red]No more attempts.[/red]") + return False diff --git a/bittensor/core/subtensor.py b/bittensor/core/subtensor.py index ca7397adb6..cef96e802f 100644 --- a/bittensor/core/subtensor.py +++ b/bittensor/core/subtensor.py @@ -54,6 +54,7 @@ do_serve_prometheus, prometheus_extrinsic, ) +from bittensor.core.extrinsics.registration import register_extrinsic from bittensor.core.extrinsics.serving import ( do_serve_axon, serve_axon_extrinsic, @@ -898,6 +899,65 @@ def set_weights( return success, message + def register( + self, + wallet: "Wallet", + netuid: int, + wait_for_inclusion: bool = False, + wait_for_finalization: bool = True, + prompt: bool = False, + max_allowed_attempts: int = 3, + output_in_place: bool = True, + cuda: bool = False, + dev_id: Union[list[int], int] = 0, + tpb: int = 256, + num_processes: Optional[int] = None, + update_interval: Optional[int] = None, + log_verbose: bool = False, + ) -> bool: + """ + Registers a neuron on the Bittensor network using the provided wallet. + + Registration is a critical step for a neuron to become an active participant in the network, enabling it to stake, set weights, and receive incentives. + + Args: + wallet (bittensor_wallet.Wallet): The wallet associated with the neuron to be registered. + netuid (int): The unique identifier of the subnet. + wait_for_inclusion (bool): Waits for the transaction to be included in a block. Defaults to `False`. + wait_for_finalization (bool): Waits for the transaction to be finalized on the blockchain. Defaults to `True`. + prompt (bool): If ``True``, prompts for user confirmation before proceeding. + max_allowed_attempts (int): Maximum number of attempts to register the wallet. + output_in_place (bool): If true, prints the progress of the proof of work to the console in-place. Meaning the progress is printed on the same lines. Defaults to `True`. + cuda (bool): If ``true``, the wallet should be registered using CUDA device(s). Defaults to `False`. + dev_id (Union[List[int], int]): The CUDA device id to use, or a list of device ids. Defaults to `0` (zero). + tpb (int): The number of threads per block (CUDA). Default to `256`. + num_processes (Optional[int]): The number of processes to use to register. Default to `None`. + update_interval (Optional[int]): The number of nonces to solve between updates. Default to `None`. + log_verbose (bool): If ``true``, the registration process will log more information. Default to `False`. + + Returns: + bool: ``True`` if the registration is successful, False otherwise. + + This function facilitates the entry of new neurons into the network, supporting the decentralized + growth and scalability of the Bittensor ecosystem. + """ + return register_extrinsic( + subtensor=self, + wallet=wallet, + netuid=netuid, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + prompt=prompt, + max_allowed_attempts=max_allowed_attempts, + output_in_place=output_in_place, + cuda=cuda, + dev_id=dev_id, + tpb=tpb, + num_processes=num_processes, + update_interval=update_interval, + log_verbose=log_verbose, + ) + def serve_axon( self, netuid: int, @@ -1730,6 +1790,28 @@ def reveal_weights( return success, message + def difficulty(self, netuid: int, block: Optional[int] = None) -> Optional[int]: + """ + Retrieves the 'Difficulty' hyperparameter for a specified subnet in the Bittensor network. + + This parameter is instrumental in determining the computational challenge required for neurons to participate in consensus and validation processes. 
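In the PoW solvers added by this change, this difficulty acts as a multiplicative threshold on the seal: a nonce is accepted only when the seal, read as a big-endian 256-bit integer, times the difficulty stays below the 256-bit limit, which is effectively requiring the seal to fall under ``limit // difficulty``. A short sketch of that relationship, with illustrative values:

    LIMIT = 2**256 - 1  # upper bound used by both the CPU and CUDA solvers

    def meets_difficulty(seal: bytes, difficulty: int) -> bool:
        # Roughly one random nonce in `difficulty` attempts clears this threshold.
        return int.from_bytes(seal, "big") * difficulty < LIMIT

    seal = b"\x00" * 16 + b"\xff" * 16  # a seal with 128 leading zero bits
    print(meets_difficulty(seal, difficulty=1_000_000))  # True
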
+ + Args: + netuid (int): The unique identifier of the subnet. + block (Optional[int]): The blockchain block number for the query. + + Returns: + Optional[int]: The value of the 'Difficulty' hyperparameter if the subnet exists, ``None`` otherwise. + + The 'Difficulty' parameter directly impacts the network's security and integrity by setting the computational effort required for validating transactions and participating in the network's consensus mechanism. + """ + call = self._get_hyperparameter( + param_name="Difficulty", netuid=netuid, block=block + ) + if call is None: + return None + return int(call) + # Subnet 27 uses this method _do_serve_prometheus = do_serve_prometheus # Subnet 27 uses this method name diff --git a/bittensor/utils/formatting.py b/bittensor/utils/formatting.py new file mode 100644 index 0000000000..1ee3fd6671 --- /dev/null +++ b/bittensor/utils/formatting.py @@ -0,0 +1,41 @@ +# The MIT License (MIT) +# Copyright © 2024 Opentensor Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. +# +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +import math + + +def get_human_readable(num, suffix="H"): + """Convert a number into a human-readable format with suffixes.""" + for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: + if abs(num) < 1000.0: + return f"{num:3.1f}{unit}{suffix}" + num /= 1000.0 + return f"{num:.1f}Y{suffix}" + + +def millify(n: int): + """Converts a number into a more readable format with suffixes.""" + mill_names = ["", " K", " M", " B", " T"] + n = float(n) + mill_idx = max( + 0, + min( + len(mill_names) - 1, + int(math.floor(0 if n == 0 else math.log10(abs(n)) / 3)), + ), + ) + return "{:.2f}{}".format(n / 10 ** (3 * mill_idx), mill_names[mill_idx]) diff --git a/bittensor/utils/register_cuda.py b/bittensor/utils/register_cuda.py new file mode 100644 index 0000000000..e0a77f19c9 --- /dev/null +++ b/bittensor/utils/register_cuda.py @@ -0,0 +1,130 @@ +# The MIT License (MIT) +# Copyright © 2024 Opentensor Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. 
+# +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +import binascii +import hashlib +import io +import math +from contextlib import redirect_stdout +from typing import Any, Union + +import numpy as np +from Crypto.Hash import keccak + + +def solve_cuda( + nonce_start: "np.int64", + update_interval: "np.int64", + tpb: int, + block_and_hotkey_hash_bytes: bytes, + difficulty: int, + limit: int, + dev_id: int = 0, +) -> Union[tuple[Any, bytes], tuple[int, bytes], tuple[Any, None]]: + """ + Solves the PoW problem using CUDA. + + Args: + nonce_start (numpy.int64): Starting nonce. + update_interval (numpy.int64): Number of nonces to solve before updating block information. + tpb (int): Threads per block. + block_and_hotkey_hash_bytes (bytes): Keccak(Bytes of the block hash + bytes of the hotkey) 64 bytes. + difficulty (int): Difficulty of the PoW problem. + limit (int): Upper limit of the nonce. + dev_id (int): The CUDA device ID. Defaults to ``0``. + + Returns: + (Union[tuple[Any, bytes], tuple[int, bytes], tuple[Any, None]]): Tuple of the nonce and the seal corresponding to the solution. Returns -1 for nonce if no solution is found. + """ + + try: + import cubit + except ImportError: + raise ImportError( + "Please install cubit. See the instruction https://github.com/opentensor/cubit?tab=readme-ov-file#install." + ) + + upper = int(limit // difficulty) + + upper_bytes = upper.to_bytes(32, byteorder="little", signed=False) + + def _hex_bytes_to_u8_list(hex_bytes: bytes): + """Converts a sequence of hex bytes to a list of unsigned 8-bit integers.""" + hex_chunks = [ + int(hex_bytes[i : i + 2], 16) for i in range(0, len(hex_bytes), 2) + ] + return hex_chunks + + def _create_seal_hash(block_and_hotkey_hash_hex_: bytes, nonce: int) -> bytes: + """Creates a seal hash from the block and hotkey hash and nonce.""" + nonce_bytes = binascii.hexlify(nonce.to_bytes(8, "little")) + pre_seal = nonce_bytes + block_and_hotkey_hash_hex_ + seal_sh256 = hashlib.sha256(bytearray(_hex_bytes_to_u8_list(pre_seal))).digest() + kec = keccak.new(digest_bits=256) + return kec.update(seal_sh256).digest() + + def _seal_meets_difficulty(seal_: bytes, difficulty_: int): + """Checks if the seal meets the given difficulty.""" + seal_number = int.from_bytes(seal_, "big") + product = seal_number * difficulty_ + limit_ = int(math.pow(2, 256)) - 1 + + return product < limit_ + + # Call cython function + # int blockSize, uint64 nonce_start, uint64 update_interval, const unsigned char[:] limit, + # const unsigned char[:] block_bytes, int dev_id + block_and_hotkey_hash_hex = binascii.hexlify(block_and_hotkey_hash_bytes)[:64] + + solution = cubit.solve_cuda( + tpb, + nonce_start, + update_interval, + upper_bytes, + block_and_hotkey_hash_hex, + dev_id, + ) # 0 is first GPU + seal = None + if solution != -1: + seal = _create_seal_hash(block_and_hotkey_hash_hex, solution) + if _seal_meets_difficulty(seal, difficulty): + return solution, seal + else: + return -1, b"\x00" * 32 + return solution, seal + + +def reset_cuda(): + """Resets the CUDA environment.""" + try: + import cubit + except ImportError: + raise 
ImportError("Please install cubit") + cubit.reset_cuda() + + +def log_cuda_errors() -> str: + """Logs any CUDA errors.""" + try: + import cubit + except ImportError: + raise ImportError("Please install cubit") + + file = io.StringIO() + with redirect_stdout(file): + cubit.log_cuda_errors() + return file.getvalue() diff --git a/bittensor/utils/registration.py b/bittensor/utils/registration.py index 4d0cdb93d6..46c39d3d40 100644 --- a/bittensor/utils/registration.py +++ b/bittensor/utils/registration.py @@ -15,13 +15,30 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. +import binascii +import dataclasses import functools +import hashlib +import math +import multiprocessing import os -from typing import TYPE_CHECKING +import random +import subprocess +import time +from datetime import timedelta +from multiprocessing.queues import Queue as QueueType +from queue import Empty, Full +from typing import Any, Callable, Optional, Union, TYPE_CHECKING +import backoff import numpy +from Crypto.Hash import keccak +from rich import console as rich_console, status as rich_status +from bittensor.core.settings import bt_console from bittensor.utils.btlogging import logging +from bittensor.utils.formatting import get_human_readable, millify +from bittensor.utils.register_cuda import solve_cuda def use_torch() -> bool: @@ -95,5 +112,1007 @@ def __getattr__(self, name): if TYPE_CHECKING: import torch + from bittensor.core.subtensor import Subtensor + from bittensor_wallet import Wallet else: torch = LazyLoadedTorch() + + +def _hex_bytes_to_u8_list(hex_bytes: bytes): + hex_chunks = [int(hex_bytes[i : i + 2], 16) for i in range(0, len(hex_bytes), 2)] + return hex_chunks + + +def _create_seal_hash(block_and_hotkey_hash_bytes: bytes, nonce: int) -> bytes: + """Create a seal hash for a given block and nonce.""" + nonce_bytes = binascii.hexlify(nonce.to_bytes(8, "little")) + pre_seal = nonce_bytes + binascii.hexlify(block_and_hotkey_hash_bytes)[:64] + seal_sh256 = hashlib.sha256(bytearray(_hex_bytes_to_u8_list(pre_seal))).digest() + kec = keccak.new(digest_bits=256) + seal = kec.update(seal_sh256).digest() + return seal + + +def _seal_meets_difficulty(seal: bytes, difficulty: int, limit: int): + """Check if the seal meets the given difficulty criteria.""" + seal_number = int.from_bytes(seal, "big") + product = seal_number * difficulty + return product < limit + + +@dataclasses.dataclass +class POWSolution: + """A solution to the registration PoW problem.""" + + nonce: int + block_number: int + difficulty: int + seal: bytes + + def is_stale(self, subtensor: "Subtensor") -> bool: + """ + Returns True if the POW is stale. + + This means the block the POW is solved for is within 3 blocks of the current block. + """ + return self.block_number < subtensor.get_current_block() - 3 + + +class _UsingSpawnStartMethod: + def __init__(self, force: bool = False): + self._old_start_method = None + self._force = force + + def __enter__(self): + self._old_start_method = multiprocessing.get_start_method(allow_none=True) + if self._old_start_method is None: + self._old_start_method = "spawn" # default to spawn + + multiprocessing.set_start_method("spawn", force=self._force) + + def __exit__(self, *args): + # restore the old start method + multiprocessing.set_start_method(self._old_start_method, force=True) + + +class _SolverBase(multiprocessing.Process): + """ + A process that solves the registration PoW problem. 
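The seal helpers above hex-encode the nonce as 8 little-endian bytes, prepend it to the hex of the block-and-hotkey hash, convert the hex pairs back to raw bytes, and hash with SHA-256 followed by Keccak-256. A standalone sketch of that pipeline, assuming pycryptodome (added to requirements in this change) and an arbitrary 32-byte hash value:

    import binascii
    import hashlib

    from Crypto.Hash import keccak

    def seal_for(nonce: int, block_and_hotkey_hash: bytes) -> bytes:
        nonce_hex = binascii.hexlify(nonce.to_bytes(8, "little"))
        pre_seal = nonce_hex + binascii.hexlify(block_and_hotkey_hash)[:64]
        # pre_seal is ASCII hex; turn each pair of hex characters back into a byte.
        raw = bytes(int(pre_seal[i:i + 2], 16) for i in range(0, len(pre_seal), 2))
        sha = hashlib.sha256(raw).digest()
        return keccak.new(digest_bits=256).update(sha).digest()

    seal = seal_for(nonce=12_345, block_and_hotkey_hash=b"\x11" * 32)
    print(len(seal), seal.hex())  # 32-byte Keccak-256 seal
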
+ + Args: + proc_num (int): The number of the process being created. + num_proc (int): The total number of processes running. + update_interval (int): The number of nonces to try to solve before checking for a new block. + finished_queue (multiprocessing.Queue): The queue to put the process number when a process finishes each update_interval. Used for calculating the average time per update_interval across all processes. + solution_queue (multiprocessing.Queue): The queue to put the solution the process has found during the pow solve. + newBlockEvent (multiprocessing.Event): The event to set by the main process when a new block is finalized in the network. The solver process will check for the event after each update_interval. The solver process will get the new block hash and difficulty and start solving for a new nonce. + stopEvent (multiprocessing.Event): The event to set by the main process when all the solver processes should stop. The solver process will check for the event after each update_interval. The solver process will stop when the event is set. Used to stop the solver processes when a solution is found. + curr_block (multiprocessing.Array): The array containing this process's current block hash. The main process will set the array to the new block hash when a new block is finalized in the network. The solver process will get the new block hash from this array when newBlockEvent is set. + curr_block_num (multiprocessing.Value): The value containing this process's current block number. The main process will set the value to the new block number when a new block is finalized in the network. The solver process will get the new block number from this value when newBlockEvent is set. + curr_diff (multiprocessing.Array): The array containing this process's current difficulty. The main process will set the array to the new difficulty when a new block is finalized in the network. The solver process will get the new difficulty from this array when newBlockEvent is set. + check_block (multiprocessing.Lock): The lock to prevent this process from getting the new block data while the main process is updating the data. + limit (int): The limit of the pow solve for a valid solution. 
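The 64-bit difficulty is stored in the shared ``curr_diff`` array as two 32-bit halves (high word, then low word) and reassembled on read; see ``_registration_diff_pack`` / ``_registration_diff_unpack`` below. A minimal round-trip sketch of that pack/unpack pair:

    import multiprocessing

    def pack(diff: int, arr) -> None:
        arr[0] = diff >> 32           # high 32 bits
        arr[1] = diff & 0xFFFFFFFF    # low 32 bits

    def unpack(arr) -> int:
        return int(arr[0] << 32 | arr[1])

    shared = multiprocessing.Array("Q", [0, 0], lock=True)
    pack(10_000_000_000, shared)
    assert unpack(shared) == 10_000_000_000
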
+ """ + + proc_num: int + num_proc: int + update_interval: int + finished_queue: "multiprocessing.Queue" + solution_queue: "multiprocessing.Queue" + newBlockEvent: "multiprocessing.Event" + stopEvent: "multiprocessing.Event" + hotkey_bytes: bytes + curr_block: "multiprocessing.Array" + curr_block_num: "multiprocessing.Value" + curr_diff: "multiprocessing.Array" + check_block: "multiprocessing.Lock" + limit: int + + def __init__( + self, + proc_num, + num_proc, + update_interval, + finished_queue, + solution_queue, + stopEvent, + curr_block, + curr_block_num, + curr_diff, + check_block, + limit, + ): + multiprocessing.Process.__init__(self, daemon=True) + self.proc_num = proc_num + self.num_proc = num_proc + self.update_interval = update_interval + self.finished_queue = finished_queue + self.solution_queue = solution_queue + self.newBlockEvent = multiprocessing.Event() + self.newBlockEvent.clear() + self.curr_block = curr_block + self.curr_block_num = curr_block_num + self.curr_diff = curr_diff + self.check_block = check_block + self.stopEvent = stopEvent + self.limit = limit + + def run(self): + raise NotImplementedError("_SolverBase is an abstract class") + + @staticmethod + def create_shared_memory() -> ( + tuple["multiprocessing.Array", "multiprocessing.Value", "multiprocessing.Array"] + ): + """Creates shared memory for the solver processes to use.""" + curr_block = multiprocessing.Array("h", 32, lock=True) # byte array + curr_block_num = multiprocessing.Value("i", 0, lock=True) # int + curr_diff = multiprocessing.Array("Q", [0, 0], lock=True) # [high, low] + + return curr_block, curr_block_num, curr_diff + + +class _Solver(_SolverBase): + def run(self): + block_number: int + block_and_hotkey_hash_bytes: bytes + block_difficulty: int + nonce_limit = int(math.pow(2, 64)) - 1 + + # Start at random nonce + nonce_start = random.randint(0, nonce_limit) + nonce_end = nonce_start + self.update_interval + while not self.stopEvent.is_set(): + if self.newBlockEvent.is_set(): + with self.check_block: + block_number = self.curr_block_num.value + block_and_hotkey_hash_bytes = bytes(self.curr_block) + block_difficulty = _registration_diff_unpack(self.curr_diff) + + self.newBlockEvent.clear() + + # Do a block of nonces + solution = _solve_for_nonce_block( + nonce_start, + nonce_end, + block_and_hotkey_hash_bytes, + block_difficulty, + self.limit, + block_number, + ) + if solution is not None: + self.solution_queue.put(solution) + + try: + # Send time + self.finished_queue.put_nowait(self.proc_num) + except Full: + pass + + nonce_start = random.randint(0, nonce_limit) + nonce_start = nonce_start % nonce_limit + nonce_end = nonce_start + self.update_interval + + +class _CUDASolver(_SolverBase): + dev_id: int + tpb: int + + def __init__( + self, + proc_num, + num_proc, + update_interval, + finished_queue, + solution_queue, + stopEvent, + curr_block, + curr_block_num, + curr_diff, + check_block, + limit, + dev_id: int, + tpb: int, + ): + super().__init__( + proc_num, + num_proc, + update_interval, + finished_queue, + solution_queue, + stopEvent, + curr_block, + curr_block_num, + curr_diff, + check_block, + limit, + ) + self.dev_id = dev_id + self.tpb = tpb + + def run(self): + block_number: int = 0 # dummy value + block_and_hotkey_hash_bytes: bytes = b"0" * 32 # dummy value + block_difficulty: int = int(math.pow(2, 64)) - 1 # dummy value + nonce_limit = int(math.pow(2, 64)) - 1 # U64MAX + + # Start at random nonce + nonce_start = random.randint(0, nonce_limit) + while not self.stopEvent.is_set(): + if 
self.newBlockEvent.is_set(): + with self.check_block: + block_number = self.curr_block_num.value + block_and_hotkey_hash_bytes = bytes(self.curr_block) + block_difficulty = _registration_diff_unpack(self.curr_diff) + + self.newBlockEvent.clear() + + # Do a block of nonces + solution = _solve_for_nonce_block_cuda( + nonce_start, + self.update_interval, + block_and_hotkey_hash_bytes, + block_difficulty, + self.limit, + block_number, + self.dev_id, + self.tpb, + ) + if solution is not None: + self.solution_queue.put(solution) + + try: + # Signal that a nonce_block was finished using queue + # send our proc_num + self.finished_queue.put(self.proc_num) + except Full: + pass + + # increase nonce by number of nonces processed + nonce_start += self.update_interval * self.tpb + nonce_start = nonce_start % nonce_limit + + +def _solve_for_nonce_block_cuda( + nonce_start: int, + update_interval: int, + block_and_hotkey_hash_bytes: bytes, + difficulty: int, + limit: int, + block_number: int, + dev_id: int, + tpb: int, +) -> Optional["POWSolution"]: + """Tries to solve the POW on a CUDA device for a block of nonces (nonce_start, nonce_start + update_interval * tpb""" + solution, seal = solve_cuda( + nonce_start, + update_interval, + tpb, + block_and_hotkey_hash_bytes, + difficulty, + limit, + dev_id, + ) + + if solution != -1: + # Check if solution is valid (i.e. not -1) + return POWSolution(solution, block_number, difficulty, seal) + + return None + + +def _solve_for_nonce_block( + nonce_start: int, + nonce_end: int, + block_and_hotkey_hash_bytes: bytes, + difficulty: int, + limit: int, + block_number: int, +) -> Optional["POWSolution"]: + """Tries to solve the POW for a block of nonces (nonce_start, nonce_end)""" + for nonce in range(nonce_start, nonce_end): + # Create seal. + seal = _create_seal_hash(block_and_hotkey_hash_bytes, nonce) + + # Check if seal meets difficulty + if _seal_meets_difficulty(seal, difficulty, limit): + # Found a solution, save it. + return POWSolution(nonce, block_number, difficulty, seal) + + return None + + +def _registration_diff_unpack(packed_diff: "multiprocessing.Array") -> int: + """Unpacks the packed two 32-bit integers into one 64-bit integer. Little endian.""" + return int(packed_diff[0] << 32 | packed_diff[1]) + + +def _registration_diff_pack(diff: int, packed_diff: "multiprocessing.Array"): + """Packs the difficulty into two 32-bit integers. 
Little endian.""" + packed_diff[0] = diff >> 32 + packed_diff[1] = diff & 0xFFFFFFFF # low 32 bits + + +def _hash_block_with_hotkey(block_bytes: bytes, hotkey_bytes: bytes) -> bytes: + """Hashes the block with the hotkey using Keccak-256 to get 32 bytes""" + kec = keccak.new(digest_bits=256) + kec = kec.update(bytearray(block_bytes + hotkey_bytes)) + block_and_hotkey_hash_bytes = kec.digest() + return block_and_hotkey_hash_bytes + + +def _update_curr_block( + curr_diff: "multiprocessing.Array", + curr_block: "multiprocessing.Array", + curr_block_num: "multiprocessing.Value", + block_number: int, + block_bytes: bytes, + diff: int, + hotkey_bytes: bytes, + lock: "multiprocessing.Lock", +): + """Updates the current block's information atomically using a lock.""" + with lock: + curr_block_num.value = block_number + # Hash the block with the hotkey + block_and_hotkey_hash_bytes = _hash_block_with_hotkey(block_bytes, hotkey_bytes) + for i in range(32): + curr_block[i] = block_and_hotkey_hash_bytes[i] + _registration_diff_pack(diff, curr_diff) + + +def get_cpu_count() -> int: + """Returns the number of CPUs in the system.""" + try: + return len(os.sched_getaffinity(0)) + except AttributeError: + # OSX does not have sched_getaffinity + return os.cpu_count() + + +@dataclasses.dataclass +class RegistrationStatistics: + """Statistics for a registration.""" + + time_spent_total: float + rounds_total: int + time_average: float + time_spent: float + hash_rate_perpetual: float + hash_rate: float + difficulty: int + block_number: int + block_hash: bytes + + +class RegistrationStatisticsLogger: + """Logs statistics for a registration.""" + + console: rich_console.Console + status: Optional[rich_status.Status] + + def __init__( + self, console: rich_console.Console, output_in_place: bool = True + ) -> None: + self.console = console + + if output_in_place: + self.status = self.console.status("Solving") + else: + self.status = None + + def start(self) -> None: + if self.status is not None: + self.status.start() + + def stop(self) -> None: + if self.status is not None: + self.status.stop() + + def get_status_message( + self, stats: RegistrationStatistics, verbose: bool = False + ) -> str: + """Generates the status message based on registration statistics.""" + message = ( + "Solving\n" + + f"Time Spent (total): [bold white]{timedelta(seconds=stats.time_spent_total)}[/bold white]\n" + + ( + f"Time Spent This Round: {timedelta(seconds=stats.time_spent)}\n" + + f"Time Spent Average: {timedelta(seconds=stats.time_average)}\n" + if verbose + else "" + ) + + f"Registration Difficulty: [bold white]{millify(stats.difficulty)}[/bold white]\n" + + f"Iters (Inst/Perp): [bold white]{get_human_readable(stats.hash_rate, 'H')}/s / " + + f"{get_human_readable(stats.hash_rate_perpetual, 'H')}/s[/bold white]\n" + + f"Block Number: [bold white]{stats.block_number}[/bold white]\n" + + f"Block Hash: [bold white]{stats.block_hash.encode('utf-8')}[/bold white]\n" + ) + return message + + def update(self, stats: RegistrationStatistics, verbose: bool = False) -> None: + if self.status is not None: + self.status.update(self.get_status_message(stats, verbose=verbose)) + else: + self.console.log(self.get_status_message(stats, verbose=verbose)) + + +def _solve_for_difficulty_fast( + subtensor: "Subtensor", + wallet: "Wallet", + netuid: int, + output_in_place: bool = True, + num_processes: Optional[int] = None, + update_interval: Optional[int] = None, + n_samples: int = 10, + alpha_: float = 0.80, + log_verbose: bool = False, +) -> 
Optional[POWSolution]: + """ + Solves the POW for registration using multiprocessing. + + Args: + subtensor (bittensor.core.subtensor.Subtensor): Subtensor instance to connect to for block information and to submit. + wallet (bittensor_wallet.Wallet): wallet to use for registration. + netuid (int): The netuid of the subnet to register to. + output_in_place (bool): If true, prints the status in place. Otherwise, prints the status on a new line. + num_processes (int): Number of processes to use. + update_interval (int): Number of nonces to solve before updating block information. + n_samples (int): The number of samples of the hash_rate to keep for the EWMA. + alpha_ (float): The alpha for the EWMA for the hash_rate calculation. + log_verbose (bool): If true, prints more verbose logging of the registration metrics. + + Note: The hash rate is calculated as an exponentially weighted moving average in order to make the measure more robust. + Note: We can also modify the update interval to do smaller blocks of work, while still updating the block information after a different number of nonces, to increase the transparency of the process while still keeping the speed. + """ + if num_processes is None: + # get the number of allowed processes for this process + num_processes = min(1, get_cpu_count()) + + if update_interval is None: + update_interval = 50_000 + + limit = int(math.pow(2, 256)) - 1 + + curr_block, curr_block_num, curr_diff = _Solver.create_shared_memory() + + # Establish communication queues + # See the _Solver class for more information on the queues. + stopEvent = multiprocessing.Event() + stopEvent.clear() + + solution_queue = multiprocessing.Queue() + finished_queues = [multiprocessing.Queue() for _ in range(num_processes)] + check_block = multiprocessing.Lock() + + hotkey_bytes = ( + wallet.coldkeypub.public_key if netuid == -1 else wallet.hotkey.public_key + ) + # Start consumers + solvers = [ + _Solver( + i, + num_processes, + update_interval, + finished_queues[i], + solution_queue, + stopEvent, + curr_block, + curr_block_num, + curr_diff, + check_block, + limit, + ) + for i in range(num_processes) + ] + + # Get first block + block_number, difficulty, block_hash = _get_block_with_retry( + subtensor=subtensor, netuid=netuid + ) + + block_bytes = bytes.fromhex(block_hash[2:]) + old_block_number = block_number + # Set to current block + _update_curr_block( + curr_diff, + curr_block, + curr_block_num, + block_number, + block_bytes, + difficulty, + hotkey_bytes, + check_block, + ) + + # Set new block events for each solver to start at the initial block + for worker in solvers: + worker.newBlockEvent.set() + + for worker in solvers: + worker.start() # start the solver processes + + start_time = time.time() # time that the registration started + time_last = start_time # time that the last work blocks completed + + curr_stats = RegistrationStatistics( + time_spent_total=0.0, + time_average=0.0, + rounds_total=0, + time_spent=0.0, + hash_rate_perpetual=0.0, + hash_rate=0.0, + difficulty=difficulty, + block_number=block_number, + block_hash=block_hash, + ) + + start_time_perpetual = time.time() + + logger = RegistrationStatisticsLogger(bt_console, output_in_place) + logger.start() + + solution = None + + hash_rates = [0] * n_samples # The last n true hash_rates + weights = [alpha_**i for i in range(n_samples)] # weights decay by alpha + + while netuid == -1 or not subtensor.is_hotkey_registered( + netuid=netuid, hotkey_ss58=wallet.hotkey.ss58_address + ): + # Wait until a solver finds a 
solution + try: + solution = solution_queue.get(block=True, timeout=0.25) + if solution is not None: + break + except Empty: + # No solution found, try again + pass + + # check for new block + old_block_number = _check_for_newest_block_and_update( + subtensor=subtensor, + netuid=netuid, + hotkey_bytes=hotkey_bytes, + old_block_number=old_block_number, + curr_diff=curr_diff, + curr_block=curr_block, + curr_block_num=curr_block_num, + curr_stats=curr_stats, + update_curr_block=_update_curr_block, + check_block=check_block, + solvers=solvers, + ) + + num_time = 0 + for finished_queue in finished_queues: + try: + proc_num = finished_queue.get(timeout=0.1) + num_time += 1 + + except Empty: + continue + + time_now = time.time() # get current time + time_since_last = time_now - time_last # get time since last work block(s) + if num_time > 0 and time_since_last > 0.0: + # create EWMA of the hash_rate to make measure more robust + + hash_rate_ = (num_time * update_interval) / time_since_last + hash_rates.append(hash_rate_) + hash_rates.pop(0) # remove the 0th data point + curr_stats.hash_rate = sum( + [hash_rates[i] * weights[i] for i in range(n_samples)] + ) / (sum(weights)) + + # update time last to now + time_last = time_now + + curr_stats.time_average = ( + curr_stats.time_average * curr_stats.rounds_total + + curr_stats.time_spent + ) / (curr_stats.rounds_total + num_time) + curr_stats.rounds_total += num_time + + # Update stats + curr_stats.time_spent = time_since_last + new_time_spent_total = time_now - start_time_perpetual + curr_stats.hash_rate_perpetual = ( + curr_stats.rounds_total * update_interval + ) / new_time_spent_total + curr_stats.time_spent_total = new_time_spent_total + + # Update the logger + logger.update(curr_stats, verbose=log_verbose) + + # exited while, solution contains the nonce or wallet is registered + stopEvent.set() # stop all other processes + logger.stop() + + # terminate and wait for all solvers to exit + _terminate_workers_and_wait_for_exit(solvers) + + return solution + + +@backoff.on_exception(backoff.constant, Exception, interval=1, max_tries=3) +def _get_block_with_retry( + subtensor: "Subtensor", netuid: int +) -> tuple[int, int, bytes]: + """ + Gets the current block number, difficulty, and block hash from the substrate node. + + Args: + subtensor (bittensor.core.subtensor.Subtensor): The subtensor object to use to get the block number, difficulty, and block hash. + netuid (int): The netuid of the network to get the block number, difficulty, and block hash from. + + Returns: + tuple[int, int, bytes] + block_number (int): The current block number. + difficulty (int): The current difficulty of the subnet. + block_hash (bytes): The current block hash. + + Raises: + Exception: If the block hash is None. + ValueError: If the difficulty is None. + """ + block_number = subtensor.get_current_block() + difficulty = 1_000_000 if netuid == -1 else subtensor.difficulty(netuid=netuid) + block_hash = subtensor.get_block_hash(block_number) + if block_hash is None: + raise Exception( + "Network error. Could not connect to substrate to get block hash" + ) + if difficulty is None: + raise ValueError("Chain error. 
Difficulty is None") + return block_number, difficulty, block_hash + + +def _check_for_newest_block_and_update( + subtensor: "Subtensor", + netuid: int, + old_block_number: int, + hotkey_bytes: bytes, + curr_diff: "multiprocessing.Array", + curr_block: "multiprocessing.Array", + curr_block_num: "multiprocessing.Value", + update_curr_block: "Callable", + check_block: "multiprocessing.Lock", + solvers: Union[list["_Solver"], list["_CUDASolver"]], + curr_stats: "RegistrationStatistics", +) -> int: + """ + Checks for a new block and updates the current block information if a new block is found. + + Args: + subtensor (bittensor.core.subtensor.Subtensor): The subtensor object to use for getting the current block. + netuid (int): The netuid to use for retrieving the difficulty. + old_block_number (int): The old block number to check against. + hotkey_bytes (bytes): The bytes of the hotkey's pubkey. + curr_diff (multiprocessing.Array): The current difficulty as a multiprocessing array. + curr_block (multiprocessing.Array): Where the current block is stored as a multiprocessing array. + curr_block_num (multiprocessing.Value): Where the current block number is stored as a multiprocessing value. + update_curr_block (typing.Callable): A function that updates the current block. + check_block (multiprocessing.Lock): A mp lock that is used to check for a new block. + solvers (list[bittensor.utils.registration._Solver]): A list of solvers to update the current block for. + curr_stats (bittensor.utils.registration.RegistrationStatistics): The current registration statistics to update. + + Returns: + (int) The current block number. + """ + block_number = subtensor.get_current_block() + if block_number != old_block_number: + old_block_number = block_number + # update block information + block_number, difficulty, block_hash = _get_block_with_retry( + subtensor=subtensor, netuid=netuid + ) + block_bytes = bytes.fromhex(block_hash[2:]) + + update_curr_block( + curr_diff, + curr_block, + curr_block_num, + block_number, + block_bytes, + difficulty, + hotkey_bytes, + check_block, + ) + # Set new block events for each solver + + for worker in solvers: + worker.newBlockEvent.set() + + # update stats + curr_stats.block_number = block_number + curr_stats.block_hash = block_hash + curr_stats.difficulty = difficulty + + return old_block_number + + +def _solve_for_difficulty_fast_cuda( + subtensor: "Subtensor", + wallet: "Wallet", + netuid: int, + output_in_place: bool = True, + update_interval: int = 50_000, + tpb: int = 512, + dev_id: Union[list[int], int] = 0, + n_samples: int = 10, + alpha_: float = 0.80, + log_verbose: bool = False, +) -> Optional["POWSolution"]: + """ + Solves the registration fast using CUDA. + + Args: + subtensor (bittensor.core.subtensor.Subtensor): The subtensor node to grab blocks. + wallet (bittensor_wallet.Wallet): The wallet to register. + netuid (int): The netuid of the subnet to register to. + output_in_place (bool) If true, prints the output in place, otherwise prints to new lines. + update_interval (int): The number of nonces to try before checking for more blocks. + tpb (int): The number of threads per block. CUDA param that should match the GPU capability + dev_id (Union[list[int], int]): The CUDA device IDs to execute the registration on, either a single device or a list of devices. + n_samples (int): The number of samples of the hash_rate to keep for the EWMA. + alpha_ (float): The alpha for the EWMA for the hash_rate calculation. 
+ log_verbose (bool): If true, prints more verbose logging of the registration metrics. + + Note: The hash rate is calculated as an exponentially weighted moving average in order to make the measure more robust. + """ + if isinstance(dev_id, int): + dev_id = [dev_id] + elif dev_id is None: + dev_id = [0] + + if update_interval is None: + update_interval = 50_000 + + if not torch.cuda.is_available(): + raise Exception("CUDA not available") + + limit = int(math.pow(2, 256)) - 1 + + # Set mp start to use spawn so CUDA doesn't complain + with _UsingSpawnStartMethod(force=True): + curr_block, curr_block_num, curr_diff = _CUDASolver.create_shared_memory() + + # Create a worker per CUDA device + num_processes = len(dev_id) + + # Establish communication queues + stopEvent = multiprocessing.Event() + stopEvent.clear() + solution_queue = multiprocessing.Queue() + finished_queues = [multiprocessing.Queue() for _ in range(num_processes)] + check_block = multiprocessing.Lock() + + hotkey_bytes = wallet.hotkey.public_key + # Start workers + solvers = [ + _CUDASolver( + i, + num_processes, + update_interval, + finished_queues[i], + solution_queue, + stopEvent, + curr_block, + curr_block_num, + curr_diff, + check_block, + limit, + dev_id[i], + tpb, + ) + for i in range(num_processes) + ] + + # Get first block + block_number, difficulty, block_hash = _get_block_with_retry( + subtensor=subtensor, netuid=netuid + ) + + block_bytes = bytes.fromhex(block_hash[2:]) + old_block_number = block_number + + # Set to current block + _update_curr_block( + curr_diff, + curr_block, + curr_block_num, + block_number, + block_bytes, + difficulty, + hotkey_bytes, + check_block, + ) + + # Set new block events for each solver to start at the initial block + for worker in solvers: + worker.newBlockEvent.set() + + for worker in solvers: + worker.start() # start the solver processes + + start_time = time.time() # time that the registration started + time_last = start_time # time that the last work blocks completed + + curr_stats = RegistrationStatistics( + time_spent_total=0.0, + time_average=0.0, + rounds_total=0, + time_spent=0.0, + hash_rate_perpetual=0.0, + hash_rate=0.0, # EWMA hash_rate (H/s) + difficulty=difficulty, + block_number=block_number, + block_hash=block_hash, + ) + + start_time_perpetual = time.time() + + logger = RegistrationStatisticsLogger(bt_console, output_in_place) + logger.start() + + hash_rates = [0] * n_samples # The last n true hash_rates + weights = [alpha_**i for i in range(n_samples)] # weights decay by alpha + + solution = None + while netuid == -1 or not subtensor.is_hotkey_registered( + netuid=netuid, hotkey_ss58=wallet.hotkey.ss58_address + ): + # Wait until a solver finds a solution + try: + solution = solution_queue.get(block=True, timeout=0.15) + if solution is not None: + break + except Empty: + # No solution found, try again + pass + + # check for new block + old_block_number = _check_for_newest_block_and_update( + subtensor=subtensor, + netuid=netuid, + hotkey_bytes=hotkey_bytes, + curr_diff=curr_diff, + curr_block=curr_block, + curr_block_num=curr_block_num, + old_block_number=old_block_number, + curr_stats=curr_stats, + update_curr_block=_update_curr_block, + check_block=check_block, + solvers=solvers, + ) + + num_time = 0 + # Get times for each solver + for finished_queue in finished_queues: + try: + proc_num = finished_queue.get(timeout=0.1) + num_time += 1 + + except Empty: + continue + + time_now = time.time() # get current time + time_since_last = time_now - time_last # get time 
since last work block(s) + if num_time > 0 and time_since_last > 0.0: + # create EWMA of the hash_rate to make measure more robust + + hash_rate_ = (num_time * tpb * update_interval) / time_since_last + hash_rates.append(hash_rate_) + hash_rates.pop(0) # remove the 0th data point + curr_stats.hash_rate = sum( + [hash_rates[i] * weights[i] for i in range(n_samples)] + ) / (sum(weights)) + + # update time last to now + time_last = time_now + + curr_stats.time_average = ( + curr_stats.time_average * curr_stats.rounds_total + + curr_stats.time_spent + ) / (curr_stats.rounds_total + num_time) + curr_stats.rounds_total += num_time + + # Update stats + curr_stats.time_spent = time_since_last + new_time_spent_total = time_now - start_time_perpetual + curr_stats.hash_rate_perpetual = ( + curr_stats.rounds_total * (tpb * update_interval) + ) / new_time_spent_total + curr_stats.time_spent_total = new_time_spent_total + + # Update the logger + logger.update(curr_stats, verbose=log_verbose) + + # exited while, found_solution contains the nonce or wallet is registered + + stopEvent.set() # stop all other processes + logger.stop() + + # terminate and wait for all solvers to exit + _terminate_workers_and_wait_for_exit(solvers) + + return solution + + +def _terminate_workers_and_wait_for_exit( + workers: list[Union[multiprocessing.Process, QueueType]], +) -> None: + for worker in workers: + if isinstance(worker, QueueType): + worker.join_thread() + else: + try: + worker.join(3.0) + except subprocess.TimeoutExpired: + worker.terminate() + try: + worker.close() + except ValueError: + worker.terminate() + + +def create_pow( + subtensor: "Subtensor", + wallet: "Wallet", + netuid: int, + output_in_place: bool = True, + cuda: bool = False, + dev_id: Union[list[int], int] = 0, + tpb: int = 256, + num_processes: Optional[int] = None, + update_interval: Optional[int] = None, + log_verbose: bool = False, +) -> Optional[dict[str, Any]]: + """ + Creates a proof of work for the given subtensor and wallet. + + Args: + subtensor (bittensor.core.subtensor.Subtensor): The subtensor to create a proof of work for. + wallet (bittensor_wallet.Wallet): The wallet to create a proof of work for. + netuid (int): The netuid for the subnet to create a proof of work for. + output_in_place (bool): If true, prints the progress of the proof of work to the console in-place. Meaning the progress is printed on the same lines. Default is ``True``. + cuda (bool): If true, uses CUDA to solve the proof of work. Default is ``False``. + dev_id (Union[List[int], int]): The CUDA device id(s) to use. If cuda is true and dev_id is a list, then multiple CUDA devices will be used to solve the proof of work. Default is ``0``. + tpb (int): The number of threads per block to use when solving the proof of work. Should be a multiple of 32. Default is ``256``. + num_processes (Optional[int]): The number of processes to use when solving the proof of work. If None, then the number of processes is equal to the number of CPU cores. Default is None. + update_interval (Optional[int]): The number of nonces to run before checking for a new block. Default is ``None``. + log_verbose (bool): If true, prints the progress of the proof of work more verbosely. Default is ``False``. + + Returns: + Optional[Dict[str, Any]]: The proof of work solution or None if the wallet is already registered or there is a different error. + + Raises: + ValueError: If the subnet does not exist. 
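A hedged usage sketch of this entry point, assuming an already-connected ``Subtensor`` instance and an unlocked ``Wallet`` (both names are placeholders, and the netuid is illustrative); in normal use ``register_extrinsic`` / ``Subtensor.register`` call this for you and also submit the result:

    pow_result = create_pow(
        subtensor,
        wallet,
        netuid=1,
        output_in_place=False,
        cuda=False,              # set True (with dev_id / tpb) to use the CUDA solver
        num_processes=None,      # None lets the solver pick the process count
        update_interval=50_000,  # nonces tried between block checks
    )
    if pow_result is not None and not pow_result.is_stale(subtensor=subtensor):
        print(f"nonce={pow_result.nonce} block={pow_result.block_number}")
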
+ """ + if netuid != -1: + if not subtensor.subnet_exists(netuid=netuid): + raise ValueError(f"Subnet {netuid} does not exist.") + + if cuda: + solution: Optional[POWSolution] = _solve_for_difficulty_fast_cuda( + subtensor, + wallet, + netuid=netuid, + output_in_place=output_in_place, + dev_id=dev_id, + tpb=tpb, + update_interval=update_interval, + log_verbose=log_verbose, + ) + else: + solution: Optional[POWSolution] = _solve_for_difficulty_fast( + subtensor, + wallet, + netuid=netuid, + output_in_place=output_in_place, + num_processes=num_processes, + update_interval=update_interval, + log_verbose=log_verbose, + ) + return solution diff --git a/requirements/prod.txt b/requirements/prod.txt index 4a319c506c..bed65e9d2e 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -1,7 +1,8 @@ wheel setuptools~=70.0.0 -bittensor-cli aiohttp~=3.9 +backoff +bittensor-cli bt-decode colorama~=0.4.6 fastapi~=0.110.1 @@ -12,6 +13,7 @@ nest_asyncio netaddr packaging python-statemachine~=2.1 +pycryptodome>=3.18.0,<4.0.0 pyyaml retry requests diff --git a/tests/integration_tests/test_subtensor_integration.py b/tests/integration_tests/test_subtensor_integration.py index e252cb63f1..552e5ab993 100644 --- a/tests/integration_tests/test_subtensor_integration.py +++ b/tests/integration_tests/test_subtensor_integration.py @@ -15,7 +15,9 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. +import random import unittest +from queue import Empty as QueueEmpty from unittest.mock import MagicMock, patch import pytest @@ -247,6 +249,175 @@ def test_defaults_to_finney(self): assert sub.network == "finney" assert sub.chain_endpoint == settings.FINNEY_ENTRYPOINT + def test_registration_multiprocessed_already_registered(self): + work_blocks_before_is_registered = random.randint(5, 10) + # return False each work block but return True after a random number of blocks + is_registered_return_values = ( + [False for _ in range(work_blocks_before_is_registered)] + + [True] + + [True, False] + ) + # this should pass the initial False check in the subtensor class and then return True because the neuron is already registered + + mock_neuron = MagicMock() + mock_neuron.is_null = True + + # patch solution queue to return None + with patch( + "multiprocessing.queues.Queue.get", return_value=None + ) as mock_queue_get: + # patch time queue get to raise Empty exception + with patch( + "multiprocessing.queues.Queue.get_nowait", side_effect=QueueEmpty + ) as mock_queue_get_nowait: + wallet = get_mock_wallet( + hotkey=get_mock_keypair(0, self.id()), + coldkey=get_mock_keypair(1, self.id()), + ) + self.subtensor.is_hotkey_registered = MagicMock( + side_effect=is_registered_return_values + ) + + self.subtensor.difficulty = MagicMock(return_value=1) + self.subtensor.get_neuron_for_pubkey_and_subnet = MagicMock( + side_effect=mock_neuron + ) + self.subtensor._do_pow_register = MagicMock(return_value=(True, None)) + + with patch("bittensor.core.settings.bt_console") as mock_set_status: + # Need to patch the console status to avoid opening a parallel live display + mock_set_status.__enter__ = MagicMock(return_value=True) + mock_set_status.__exit__ = MagicMock(return_value=True) + + # should return True + assert self.subtensor.register( + wallet=wallet, netuid=3, num_processes=3, update_interval=5 + ) + + # calls until True and once again before exiting subtensor class + # This assertion is currently broken when difficulty is too low + assert ( + 
self.subtensor.is_hotkey_registered.call_count + == work_blocks_before_is_registered + 2 + ) + + def test_registration_partly_failed(self): + do_pow_register_mock = MagicMock( + side_effect=[(False, "Failed"), (False, "Failed"), (True, None)] + ) + + def is_registered_side_effect(*args, **kwargs): + nonlocal do_pow_register_mock + return do_pow_register_mock.call_count < 3 + + current_block = [i for i in range(0, 100)] + + wallet = get_mock_wallet( + hotkey=get_mock_keypair(0, self.id()), + coldkey=get_mock_keypair(1, self.id()), + ) + + self.subtensor.get_neuron_for_pubkey_and_subnet = MagicMock( + return_value=bittensor.NeuronInfo.get_null_neuron() + ) + self.subtensor.is_hotkey_registered = MagicMock( + side_effect=is_registered_side_effect + ) + + self.subtensor.difficulty = MagicMock(return_value=1) + self.subtensor.get_current_block = MagicMock(side_effect=current_block) + self.subtensor._do_pow_register = do_pow_register_mock + + # should return True + self.assertTrue( + self.subtensor.register( + wallet=wallet, netuid=3, num_processes=3, update_interval=5 + ), + msg="Registration should succeed", + ) + + def test_registration_failed(self): + is_registered_return_values = [False for _ in range(100)] + current_block = [i for i in range(0, 100)] + mock_neuron = MagicMock() + mock_neuron.is_null = True + + with patch( + "bittensor.core.extrinsics.registration.create_pow", return_value=None + ) as mock_create_pow: + wallet = get_mock_wallet( + hotkey=get_mock_keypair(0, self.id()), + coldkey=get_mock_keypair(1, self.id()), + ) + + self.subtensor.is_hotkey_registered = MagicMock( + side_effect=is_registered_return_values + ) + + self.subtensor.get_current_block = MagicMock(side_effect=current_block) + self.subtensor.get_neuron_for_pubkey_and_subnet = MagicMock( + return_value=mock_neuron + ) + self.subtensor.substrate.get_block_hash = MagicMock( + return_value="0x" + "0" * 64 + ) + self.subtensor._do_pow_register = MagicMock(return_value=(False, "Failed")) + + # should return True + self.assertIsNot( + self.subtensor.register(wallet=wallet, netuid=3), + True, + msg="Registration should fail", + ) + self.assertEqual(mock_create_pow.call_count, 3) + + def test_registration_stale_then_continue(self): + # verify that after a stale solution, to solve will continue without exiting + + class ExitEarly(Exception): + pass + + mock_is_stale = MagicMock(side_effect=[True, False]) + + mock_do_pow_register = MagicMock(side_effect=ExitEarly()) + + mock_subtensor_self = MagicMock( + neuron_for_pubkey=MagicMock( + return_value=MagicMock(is_null=True) + ), # not registered + substrate=MagicMock( + get_block_hash=MagicMock(return_value="0x" + "0" * 64), + ), + ) + + mock_wallet = MagicMock() + + mock_create_pow = MagicMock(return_value=MagicMock(is_stale=mock_is_stale)) + + with patch( + "bittensor.core.extrinsics.registration.create_pow", mock_create_pow + ), patch( + "bittensor.core.extrinsics.registration._do_pow_register", + mock_do_pow_register, + ): + # should create a pow and check if it is stale + # then should create a new pow and check if it is stale + # then should enter substrate and exit early because of test + self.subtensor.get_neuron_for_pubkey_and_subnet = MagicMock( + return_value=bittensor.NeuronInfo.get_null_neuron() + ) + with pytest.raises(ExitEarly): + bittensor.subtensor.register(mock_subtensor_self, mock_wallet, netuid=3) + self.assertEqual( + mock_create_pow.call_count, 2, msg="must try another pow after stale" + ) + self.assertEqual(mock_is_stale.call_count, 2) + 
self.assertEqual( + mock_do_pow_register.call_count, + 1, + msg="only tries to submit once, then exits", + ) + if __name__ == "__main__": unittest.main() diff --git a/tests/unit_tests/extrinsics/test_registration.py b/tests/unit_tests/extrinsics/test_registration.py new file mode 100644 index 0000000000..b1bc1f0725 --- /dev/null +++ b/tests/unit_tests/extrinsics/test_registration.py @@ -0,0 +1,181 @@ +# The MIT License (MIT) +# Copyright © 2024 Opentensor Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. +# +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +import pytest +from bittensor_wallet import Wallet + +from bittensor.core.extrinsics.registration import ( + register_extrinsic, +) +from bittensor.core.subtensor import Subtensor +from bittensor.utils.registration import POWSolution + + +# Mocking external dependencies +@pytest.fixture +def mock_subtensor(mocker): + mock = mocker.MagicMock(spec=Subtensor) + mock.network = "mock_network" + mock.substrate = mocker.MagicMock() + return mock + + +@pytest.fixture +def mock_wallet(mocker): + mock = mocker.MagicMock(spec=Wallet) + mock.coldkeypub.ss58_address = "mock_address" + mock.coldkey = mocker.MagicMock() + mock.hotkey = mocker.MagicMock() + mock.hotkey.ss58_address = "fake_ss58_address" + return mock + + +@pytest.fixture +def mock_pow_solution(mocker): + mock = mocker.MagicMock(spec=POWSolution) + mock.block_number = 123 + mock.nonce = 456 + mock.seal = [0, 1, 2, 3] + mock.is_stale.return_value = False + return mock + + +@pytest.fixture +def mock_new_wallet(mocker): + mock = mocker.MagicMock(spec=Wallet) + mock.coldkeypub.ss58_address = "mock_address" + mock.coldkey = mocker.MagicMock() + mock.hotkey = mocker.MagicMock() + return mock + + +@pytest.mark.parametrize( + "subnet_exists, neuron_is_null, prompt, prompt_response, cuda_available, expected_result, test_id", + [ + (False, True, True, True, True, False, "subnet-does-not-exist"), + (True, False, True, True, True, True, "neuron-already-registered"), + (True, True, True, False, True, False, "user-declines-prompt"), + (True, True, False, None, False, False, "cuda-unavailable"), + ], +) +def test_register_extrinsic_without_pow( + mock_subtensor, + mock_wallet, + subnet_exists, + neuron_is_null, + prompt, + prompt_response, + cuda_available, + expected_result, + test_id, + mocker, +): + # Arrange + with mocker.patch.object( + mock_subtensor, "subnet_exists", return_value=subnet_exists + ), mocker.patch.object( + mock_subtensor, + "get_neuron_for_pubkey_and_subnet", + return_value=mocker.MagicMock(is_null=neuron_is_null), + ), 
mocker.patch( + "rich.prompt.Confirm.ask", return_value=prompt_response + ), mocker.patch("torch.cuda.is_available", return_value=cuda_available): + # Act + result = register_extrinsic( + subtensor=mock_subtensor, + wallet=mock_wallet, + netuid=123, + wait_for_inclusion=True, + wait_for_finalization=True, + prompt=prompt, + max_allowed_attempts=3, + output_in_place=True, + cuda=True, + dev_id=0, + tpb=256, + num_processes=None, + update_interval=None, + log_verbose=False, + ) + + # Assert + assert result == expected_result, f"Test failed for test_id: {test_id}" + + +@pytest.mark.parametrize( + "pow_success, pow_stale, registration_success, cuda, hotkey_registered, expected_result, test_id", + [ + (True, False, True, False, False, True, "successful-with-valid-pow"), + (True, False, True, True, False, True, "successful-with-valid-cuda-pow"), + # Pow failed but key was registered already + (False, False, False, False, True, True, "hotkey-registered"), + # Pow was a success but registration failed with error 'key already registered' + (True, False, False, False, False, True, "registration-fail-key-registered"), + ], +) +def test_register_extrinsic_with_pow( + mock_subtensor, + mock_wallet, + mock_pow_solution, + pow_success, + pow_stale, + registration_success, + cuda, + hotkey_registered, + expected_result, + test_id, + mocker, +): + # Arrange + with mocker.patch( + "bittensor.utils.registration._solve_for_difficulty_fast", + return_value=mock_pow_solution if pow_success else None, + ), mocker.patch( + "bittensor.utils.registration._solve_for_difficulty_fast_cuda", + return_value=mock_pow_solution if pow_success else None, + ), mocker.patch( + "bittensor.core.extrinsics.registration._do_pow_register", + return_value=(registration_success, "HotKeyAlreadyRegisteredInSubNet"), + ), mocker.patch("torch.cuda.is_available", return_value=cuda): + # Act + if pow_success: + mock_pow_solution.is_stale.return_value = pow_stale + + if not pow_success and hotkey_registered: + mock_subtensor.is_hotkey_registered = mocker.MagicMock( + return_value=hotkey_registered + ) + + result = register_extrinsic( + subtensor=mock_subtensor, + wallet=mock_wallet, + netuid=123, + wait_for_inclusion=True, + wait_for_finalization=True, + prompt=False, + max_allowed_attempts=3, + output_in_place=True, + cuda=cuda, + dev_id=0, + tpb=256, + num_processes=None, + update_interval=None, + log_verbose=False, + ) + + # Assert + assert result == expected_result, f"Test failed for test_id: {test_id}." diff --git a/tests/unit_tests/utils/test_formatting.py b/tests/unit_tests/utils/test_formatting.py new file mode 100644 index 0000000000..3c223a48b3 --- /dev/null +++ b/tests/unit_tests/utils/test_formatting.py @@ -0,0 +1,80 @@ +# The MIT License (MIT) +# Copyright © 2024 Opentensor Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. 
+# +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +import math + +from bittensor.utils import formatting + + +def test_get_human_readable(): + """Tests the `get_human_readable` function in the `formatting` module.""" + num1 = 1000 + num2 = 1_000_000 + num3 = 1_000_000_000 + num4 = 150 + negative_num = -1000 + + # Test for default output + assert formatting.get_human_readable(num1) == "1.0KH" + + # Test for different quantities + assert formatting.get_human_readable(num2) == "1.0MH" + assert formatting.get_human_readable(num3) == "1.0GH" + + # Test for numbers less than 1000 + assert formatting.get_human_readable(num4) == "150.0H" + + # Test for negative numbers + assert formatting.get_human_readable(negative_num) == "-1.0KH" + + # Test for different suffix + assert formatting.get_human_readable(num1, suffix="B") == "1.0KB" + assert formatting.get_human_readable(num2, suffix="B") == "1.0MB" + assert formatting.get_human_readable(num3, suffix="B") == "1.0GB" + assert formatting.get_human_readable(num4, suffix="B") == "150.0B" + assert formatting.get_human_readable(negative_num, suffix="B") == "-1.0KB" + + +def test_millify(): + """Test millify function with various cases.""" + # Testing with value 0 + assert formatting.millify(0) == "0.00" + # Testing with a number in the tens + assert formatting.millify(10) == "10.00" + # Testing with a number in the hundreds + assert formatting.millify(100) == "100.00" + # Testing with a number in the thousands + assert formatting.millify(1000) == "1.00 K" + # Testing with a number in the millions + assert formatting.millify(1000000) == "1.00 M" + # Testing with a number in the billions + assert formatting.millify(1000000000) == "1.00 B" + # Testing with a number in the trillions + assert formatting.millify(1000000000000) == "1.00 T" + # Testing with maximum limit + mill_names = ["", " K", " M", " B", " T"] + n = 10 ** (3 * (len(mill_names) - 1) + 1) + mill_idx = max( + 0, + min( + len(mill_names) - 1, + int(math.floor(0 if n == 0 else math.log10(abs(n)) / 3)), + ), + ) + assert formatting.millify(n) == "{:.2f}{}".format( + n / 10 ** (3 * mill_idx), mill_names[mill_idx] + )
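
Editor's note: for reference, below is a minimal sketch of formatting helpers that would satisfy the assertions in test_get_human_readable and test_millify above. It is an editor-added illustration consistent with the tests, not necessarily the exact implementation in bittensor.utils.formatting: get_human_readable steps through metric prefixes in factors of 1000 (with "H", for hash rate, as the default suffix), while millify groups by thousands using the names "", " K", " M", " B", " T".

    import math


    def get_human_readable(num: float, suffix: str = "H") -> str:
        # step through metric prefixes, dividing by 1000 until the value fits
        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
            if abs(num) < 1000.0:
                return f"{num:3.1f}{unit}{suffix}"
            num /= 1000.0
        return f"{num:.1f}Y{suffix}"


    def millify(n: int) -> str:
        # pick the largest thousands-group name that still leaves a value >= 1
        mill_names = ["", " K", " M", " B", " T"]
        n = float(n)
        mill_idx = max(
            0,
            min(
                len(mill_names) - 1,
                int(math.floor(0 if n == 0 else math.log10(abs(n)) / 3)),
            ),
        )
        return "{:.2f}{}".format(n / 10 ** (3 * mill_idx), mill_names[mill_idx])

Under these definitions, get_human_readable(1_000_000) yields "1.0MH" and millify(1000) yields "1.00 K", matching the expected values asserted in the two tests.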