From 5749900ed885d224055ee8eca687a8301f8a7d20 Mon Sep 17 00:00:00 2001
From: tedbee
Date: Mon, 2 Dec 2024 15:39:06 +0800
Subject: [PATCH 1/7] feat: simulator

---
 .github/workflows/docker_build.yaml |   3 +-
 commons/objects.py                  |  24 ++-
 dojo/__init__.py                    |   2 +-
 dojo/utils/config.py                |  12 ++
 entrypoints.sh                      |  27 ++-
 simulator/__init__.py               |   0
 simulator/miner.py                  | 173 +++++++++++++++++++
 simulator/validator.py              | 254 ++++++++++++++++++++++++++++
 8 files changed, 482 insertions(+), 13 deletions(-)
 create mode 100644 simulator/__init__.py
 create mode 100644 simulator/miner.py
 create mode 100644 simulator/validator.py

diff --git a/.github/workflows/docker_build.yaml b/.github/workflows/docker_build.yaml
index 2effd247..ddda3a98 100644
--- a/.github/workflows/docker_build.yaml
+++ b/.github/workflows/docker_build.yaml
@@ -6,6 +6,7 @@ on:
       - dev
       - staging
       - main
+      - simulator

 jobs:
   docker_publish:
@@ -28,7 +29,7 @@ jobs:
           echo "BRANCH_NAME=$SANITIZED_BRANCH_NAME" >> $GITHUB_ENV

       - name: Build and Push Docker Image with Branch Tag
-        if: github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main'
+        if: github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/simulator'
         uses: macbre/push-to-ghcr@master
        with:
           image_name: ${{ github.repository }}
diff --git a/commons/objects.py b/commons/objects.py
index 114c49e1..1d8ae1c9 100644
--- a/commons/objects.py
+++ b/commons/objects.py
@@ -9,18 +9,26 @@ class ObjectManager:
     @classmethod
     def get_miner(cls):
-        from neurons.miner import Miner
-
-        if cls._miner is None:
-            cls._miner = Miner()
+        if get_config().simulation:
+            from simulator.miner import MinerSim
+            if cls._miner is None:
+                cls._miner = MinerSim()
+        else:
+            from neurons.miner import Miner
+            if cls._miner is None:
+                cls._miner = Miner()
         return cls._miner

     @classmethod
     def get_validator(cls):
-        from neurons.validator import Validator
-
-        if cls._validator is None:
-            cls._validator = Validator()
+        if get_config().simulation:
+            from simulator.validator import ValidatorSim
+            if cls._validator is None:
+                cls._validator = ValidatorSim()
+        else:
+            from neurons.validator import Validator
+            if cls._validator is None:
+                cls._validator = Validator()
         return cls._validator

     @classmethod
diff --git a/dojo/__init__.py b/dojo/__init__.py
index 11625674..5674629d 100644
--- a/dojo/__init__.py
+++ b/dojo/__init__.py
@@ -59,4 +59,4 @@ def get_dojo_api_base_url() -> str:
     if base_url is None:
         raise ValueError("DOJO_API_BASE_URL is not set in the environment.")

-    return base_url
+    return base_url
\ No newline at end of file
diff --git a/dojo/utils/config.py b/dojo/utils/config.py
index a0022d54..2c399997 100644
--- a/dojo/utils/config.py
+++ b/dojo/utils/config.py
@@ -178,6 +178,18 @@ def add_args(parser):
         help="Whether to run in fast mode, for developers to test locally.",
     )

+    parser.add_argument(
+        "--simulation",
+        action="store_true",
+        help="Whether to run the neuron in simulation mode",
+    )
+
+    parser.add_argument(
+        "--simulation_bad_miner",
+        action="store_true",
+        help="Set the simulated miner to behave as a bad miner",
+    )
+
     epoch_length = 100
     known_args, _ = parser.parse_known_args()
     if known_args := vars(known_args):
diff --git a/entrypoints.sh b/entrypoints.sh
index 7f64dfe7..325fc7ad 100755
--- a/entrypoints.sh
+++ b/entrypoints.sh
@@ -21,6 +21,17 @@ if [ "$1" = 'miner' ]; then
     echo "SUBTENSOR_ENDPOINT: ${SUBTENSOR_ENDPOINT}"
     echo "NETUID: ${NETUID}"

+    EXTRA_ARGS=""
+    if [ "${SIMULATION}" = "true" ]; then
EXTRA_ARGS="${EXTRA_ARGS} --simulation" + fi + if [ "${FAST_MODE}" = "true" ]; then + EXTRA_ARGS="${EXTRA_ARGS} --fast_mode" + fi + if [ "${SIMULATION_BAD_MINER}" = "true" ]; then + EXTRA_ARGS="${EXTRA_ARGS} --simulation_bad_miner" + fi + python main_miner.py \ --netuid ${NETUID} \ --subtensor.network ${SUBTENSOR_NETWORK} \ @@ -29,7 +40,8 @@ if [ "$1" = 'miner' ]; then --wallet.name ${WALLET_COLDKEY} \ --wallet.hotkey ${WALLET_HOTKEY} \ --axon.port ${AXON_PORT} \ - --neuron.type miner + --neuron.type miner \ + ${EXTRA_ARGS} fi # If the first argument is 'validator', run the validator script @@ -43,6 +55,14 @@ if [ "$1" = 'validator' ]; then echo "NETUID: ${NETUID}" echo "WANDB_PROJECT_NAME: ${WANDB_PROJECT_NAME}" + EXTRA_ARGS="" + if [ "${SIMULATION}" = "true" ]; then + EXTRA_ARGS="${EXTRA_ARGS} --simulation" + fi + if [ "${FAST_MODE}" = "true" ]; then + EXTRA_ARGS="${EXTRA_ARGS} --fast_mode" + fi + python main_validator.py \ --netuid ${NETUID} \ --subtensor.network ${SUBTENSOR_NETWORK} \ @@ -51,5 +71,6 @@ if [ "$1" = 'validator' ]; then --wallet.name ${WALLET_COLDKEY} \ --wallet.hotkey ${WALLET_HOTKEY} \ --neuron.type validator \ - --wandb.project_name ${WANDB_PROJECT_NAME} -fi + --wandb.project_name ${WANDB_PROJECT_NAME} \ + ${EXTRA_ARGS} +fi \ No newline at end of file diff --git a/simulator/__init__.py b/simulator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/simulator/miner.py b/simulator/miner.py new file mode 100644 index 00000000..a3e891c9 --- /dev/null +++ b/simulator/miner.py @@ -0,0 +1,173 @@ +import os +import redis +import traceback +import asyncio +import random +import json +from datetime import datetime, timezone +from dojo.utils.config import get_config +from bittensor.btlogging import logging as logger +from neurons.miner import Miner +from dojo.protocol import ( + FeedbackRequest, + TaskResultRequest, + TaskResult, + Result +) +from commons.utils import get_new_uuid + + +class MinerSim(Miner): + def __init__(self): + super().__init__() + try: + # Initialize Redis connection + host = os.getenv("REDIS_HOST", "localhost") + port = int(os.getenv("REDIS_PORT", 6379)) + self.redis_client = redis.Redis( + host=host, + port=port, + db=0, + decode_responses=True + ) + logger.info("Redis connection established") + + self._configure_simulation() + + self.is_bad_miner = get_config().simulation_bad_miner + logger.info(f"Miner role set to: {'bad' if self.is_bad_miner else 'good'}") + + logger.info("Starting Miner Simulator") + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + raise + + def _configure_simulation(self): + """Configure simulation parameters with environment variables or defaults.""" + self.response_behaviors = { + 'normal': float(os.getenv("SIM_NORMAL_RESP_PROB", 0.8)), + 'no_response': float(os.getenv("SIM_NO_RESP_PROB", 0.1)), + 'timeout': float(os.getenv("SIM_TIMEOUT_PROB", 0.1)) + } + + async def forward_feedback_request(self, synapse: FeedbackRequest) -> FeedbackRequest: + try: + # Validate that synapse, dendrite, dendrite.hotkey, and response are not None + if not synapse or not synapse.dendrite or not synapse.dendrite.hotkey: + logger.error("Invalid synapse: dendrite or dendrite.hotkey is None.") + return synapse + + if not synapse.completion_responses: + logger.error("Invalid synapse: response field is None.") + return synapse + + # Empty out completion response since not needed in simulator + new_synapse = synapse.model_copy(deep=True) + new_synapse.completion_responses = [] + + synapse.dojo_task_id = 
+            self.hotkey_to_request[synapse.dendrite.hotkey] = synapse
+
+            redis_key = f"feedback:{synapse.request_id}"
+            self.redis_client.set(
+                redis_key,
+                new_synapse.model_dump_json(),
+                ex=86400  # expire after 24 hours
+            )
+            logger.info(f"Stored feedback request {synapse.request_id}")
+
+            synapse.ground_truth = {}
+            return synapse
+
+        except Exception as e:
+            logger.error(f"Error handling FeedbackRequest: {e}")
+            traceback.print_exc()
+            return synapse
+
+    async def forward_task_result_request(self, synapse: TaskResultRequest) -> TaskResultRequest | None:
+        try:
+            logger.info(f"Received TaskResultRequest for task id: {synapse.task_id}")
+            if not synapse or not synapse.task_id:
+                logger.error("Invalid TaskResultRequest: missing task_id")
+                return None
+
+            # Simulate different response behaviors
+            behavior = self._get_response_behavior()
+
+            if behavior in ['no_response', 'timeout']:
+                logger.debug(f"Simulating {behavior} for task {synapse.task_id}")
+                if behavior == 'timeout':
+                    await asyncio.sleep(30)
+                return None
+
+            redis_key = f"feedback:{synapse.task_id}"
+            request_data = self.redis_client.get(redis_key)
+
+            request_dict = json.loads(request_data) if request_data else None
+            feedback_request = FeedbackRequest(**request_dict) if request_dict else None
+
+            if not feedback_request:
+                logger.debug(f"No task result found for task id: {synapse.task_id}")
+                return None
+
+            current_time = datetime.now(timezone.utc).isoformat()
+
+            task_results = []
+            for criteria_type in feedback_request.criteria_types:
+                result = Result(
+                    type=criteria_type.type,
+                    value=self._generate_scores(feedback_request.ground_truth)
+                )
+
+                task_result = TaskResult(
+                    id=get_new_uuid(),
+                    status='COMPLETED',
+                    created_at=current_time,
+                    updated_at=current_time,
+                    result_data=[result],
+                    worker_id=get_new_uuid(),
+                    task_id=synapse.task_id
+                )
+                task_results.append(task_result)
+
+            synapse.task_results = task_results
+            logger.info(f"TaskResultRequest: {synapse}")
+
+            self.redis_client.delete(redis_key)
+            logger.debug(f"Processed task result for task {synapse.task_id}")
+
+            return synapse
+
+        except Exception as e:
+            traceback.print_exc()
+            logger.error(f"Error handling TaskResultRequest: {e}")
+            return None
+
+    def _get_response_behavior(self) -> str:
+        """Determine the response behavior based on configured probabilities."""
+        return random.choices(
+            list(self.response_behaviors.keys()),
+            weights=list(self.response_behaviors.values())
+        )[0]
+
+    def _generate_scores(self, ground_truth: dict) -> dict:
+        scores = {}
+
+        for k, v in ground_truth.items():
+            if self.is_bad_miner:
+                deviation = random.randint(-5, 5)
+            else:
+                deviation = random.randint(-2, 2)
+            random_score = max(1, min(10, v + deviation))
+            score = int((random_score - 1) / (10 - 1) * (100 - 1) + 1)  # min-max map from 1-10 onto 1-100
+            scores[k] = score
+
+        return scores
+
+    # def __del__(self):
+    #     """Cleanup Redis connection on object destruction"""
+    #     try:
+    #         self.redis_client.close()
+    #         logger.info("Redis connection closed")
+    #     except:
+    #         pass
diff --git a/simulator/validator.py b/simulator/validator.py
new file mode 100644
index 00000000..8fc9f2e6
--- /dev/null
+++ b/simulator/validator.py
@@ -0,0 +1,254 @@
+import logging
+import traceback
+from typing import List
+import aiohttp
+import dojo
+from commons.dataset.synthetic import SyntheticAPI
+from commons.orm import ORM
+from commons.utils import get_epoch_time, get_new_uuid, set_expire_time, ttl_get_block
+from dojo.protocol import FeedbackRequest, TaskType, MultiScoreCriteria, DendriteQueryResponse
+from neurons.validator import Validator
+from bittensor.btlogging import logging as logger
+from tenacity import RetryError
+import bittensor as bt
+import asyncio
+
+
+class ValidatorSim(Validator):
+    def __init__(self):
+        super().__init__()
+        logger.info("Starting Validator Simulator")
+        self._last_block = None
+        self._block_check_attempts = 0
+        self.MAX_BLOCK_CHECK_ATTEMPTS = 3
+
+    async def _try_reconnect_subtensor(self):
+        """Attempt to reconnect to the subtensor network"""
+        try:
+            logger.info("Attempting to reconnect to subtensor...")
+            self.subtensor = bt.subtensor(self.subtensor.config)
+            self._block_check_attempts = 0
+            return True
+        except Exception as e:
+            logger.error(f"Failed to reconnect to subtensor: {e}")
+            return False
+
+    @property
+    def block(self):
+        try:
+            self._last_block = ttl_get_block(self.subtensor)
+            self._block_check_attempts = 0
+            return self._last_block
+        except BrokenPipeError:
+            self._block_check_attempts += 1
+            if self._block_check_attempts >= self.MAX_BLOCK_CHECK_ATTEMPTS:
+                logger.error("Multiple failed attempts to get block number, attempting reconnection")
+                if asyncio.get_event_loop().run_until_complete(self._try_reconnect_subtensor()):
+                    return self.block
+
+            return self._last_block if self._last_block is not None else 0
+        except Exception as e:
+            logger.error(f"Error getting block number: {e}")
+            return self._last_block if self._last_block is not None else 0
+
+    def check_registered(self):
+        new_subtensor = bt.subtensor(self.subtensor.config)
+        if not new_subtensor.is_hotkey_registered(
+                netuid=self.config.netuid,
+                hotkey_ss58=self.wallet.hotkey.ss58_address,
+        ):
+            logger.error(
+                f"Wallet: {self.wallet} is not registered on netuid {self.config.netuid}."
+                f" Please register the hotkey using `btcli s register` before trying again"
+            )
+            exit()
+
+    async def send_request(
+            self,
+            synapse: FeedbackRequest | None = None,
+            external_user: bool = False,
+    ):
+        start = get_epoch_time()
+        # typically the request may come from an external source; however,
+        # initially we seed it with some data for miners to get started
+        if len(self._active_miner_uids) == 0:
+            logger.info("No active miners to send request to... skipping")
+            return
+
+        request_id = get_new_uuid()
+        # sel_miner_uids = await self.get_miner_uids(external_user, request_id)
+        sel_miner_uids = sorted(list(self._active_miner_uids))
+
+        axons = [
+            self.metagraph.axons[uid]
+            for uid in sel_miner_uids
+            if self.metagraph.axons[uid].hotkey.casefold()
+                != self.wallet.hotkey.ss58_address.casefold()
+        ]
+        if not len(axons):
+            logger.warning("🤷 No axons to query ... skipping")
skipping") + return + + obfuscated_model_to_model = {} + + if synapse is None: + try: + data = await SyntheticAPI.get_qa() + except RetryError as e: + logger.error( + f"Exhausted all retry attempts for synthetic data generation: {e}" + ) + return + except ValueError as e: + logger.error(f"Invalid response from synthetic data API: {e}") + return + except aiohttp.ClientError as e: + logger.error(f"Network error when calling synthetic data API: {e}") + return + except Exception as e: + logger.error(f"Unexpected error during synthetic data generation: {e}") + logger.debug(f"Traceback: {traceback.format_exc()}") + return + + if not data: + logger.error("No data returned from synthetic data API") + return + + obfuscated_model_to_model = self.obfuscate_model_names(data.responses) + expire_at = set_expire_time(dojo.TASK_DEADLINE) + synapse = FeedbackRequest( + request_id=request_id, + task_type=str(TaskType.CODE_GENERATION), + criteria_types=[ + MultiScoreCriteria( + options=list(obfuscated_model_to_model.keys()), + min=1.0, + max=100.0, + ), + ], + prompt=data.prompt, + completion_responses=data.responses, + expire_at=expire_at, + ground_truth=data.ground_truth # Added ground truth!!!!! + ) + elif external_user: + obfuscated_model_to_model = self.obfuscate_model_names( + synapse.completion_responses + ) + + logger.info( + f"⬆️ Sending feedback request for request id: {synapse.request_id}, miners uids:{sel_miner_uids} with expire_at: {synapse.expire_at}" + ) + + miner_responses: List[FeedbackRequest] = await self._send_shuffled_requests( + self.dendrite, axons, synapse + ) + + valid_miner_responses: List[FeedbackRequest] = [] + try: + for miner_response in miner_responses: + miner_hotkey = ( + miner_response.axon.hotkey if miner_response.axon else "??" + ) + # map obfuscated model names back to the original model names + real_model_ids = [] + + for i, completion in enumerate(miner_response.completion_responses): + found_model_id = obfuscated_model_to_model.get( + completion.model, None + ) + real_model_ids.append(found_model_id) + if found_model_id: + miner_response.completion_responses[i].model = found_model_id + synapse.completion_responses[i].model = found_model_id + + if any(c is None for c in real_model_ids): + logger.warning("Failed to map obfuscated model to original model") + continue + + if miner_response.dojo_task_id is None: + logger.debug(f"Miner {miner_hotkey} must provide the dojo task id") + continue + + # update the miner response with the real model ids + valid_miner_responses.append(miner_response) + except Exception as e: + logger.error(f"Failed to map obfuscated model to original model: {e}") + pass + + logger.info(f"⬇️ Received {len(valid_miner_responses)} valid responses") + if valid_miner_responses is None or len(valid_miner_responses) == 0: + logger.info("No valid miner responses to process... 
skipping") + return + + # include the ground_truth to keep in data manager + synapse.ground_truth = data.ground_truth + synapse.dendrite.hotkey = self.wallet.hotkey.ss58_address + response_data = DendriteQueryResponse( + request=synapse, + miner_responses=valid_miner_responses, + ) + + logger.debug("Attempting to saving dendrite response") + vali_request_model = await ORM.save_task( + validator_request=synapse, + miner_responses=valid_miner_responses, + ground_truth=data.ground_truth, + ) + + if vali_request_model is None: + logger.error("Failed to save dendrite response") + return + + # saving response + logger.success( + f"Saved dendrite response for request id: {response_data.request.request_id}" + ) + logger.info( + f"Sending request to miners & processing took {get_epoch_time() - start}" + ) + return + + @staticmethod + async def _send_shuffled_requests( + dendrite: bt.dendrite, axons: List[bt.AxonInfo], synapse: FeedbackRequest + ) -> list[FeedbackRequest]: + """Send the same request to all miners without shuffling the order. + WARNING: This should only be used for testing/debugging as it could allow miners to game the system. + + Args: + dendrite (bt.dendrite): Communication channel to send requests + axons (List[bt.AxonInfo]): List of miner endpoints + synapse (FeedbackRequest): The feedback request to send + + Returns: + list[FeedbackRequest]: List of miner responses + """ + all_responses = [] + batch_size = 10 + + for i in range(0, len(axons), batch_size): + batch_axons = axons[i: i + batch_size] + tasks = [] + + for axon in batch_axons: + tasks.append( + dendrite.forward( + axons=[axon], + synapse=synapse, + deserialize=False, + timeout=12, + ) + ) + + batch_responses = await asyncio.gather(*tasks) + flat_batch_responses = [ + response for sublist in batch_responses for response in sublist + ] + all_responses.extend(flat_batch_responses) + + logger.info( + f"Processed batch {i // batch_size + 1} of {(len(axons) - 1) // batch_size + 1}" + ) + + return all_responses From f1e9329656c7f0cc60b86be8059e6485e059a103 Mon Sep 17 00:00:00 2001 From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:06:34 +0800 Subject: [PATCH 2/7] chore: ruff lint --- simulator/miner.py | 21 +++++++++------------ simulator/validator.py | 18 ++++++++++++------ 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/simulator/miner.py b/simulator/miner.py index a3e891c9..2f58e5c8 100644 --- a/simulator/miner.py +++ b/simulator/miner.py @@ -1,20 +1,17 @@ -import os -import redis -import traceback import asyncio -import random import json +import os +import random +import traceback from datetime import datetime, timezone -from dojo.utils.config import get_config + +import redis from bittensor.btlogging import logging as logger -from neurons.miner import Miner -from dojo.protocol import ( - FeedbackRequest, - TaskResultRequest, - TaskResult, - Result -) + from commons.utils import get_new_uuid +from dojo.protocol import FeedbackRequest, Result, TaskResult, TaskResultRequest +from dojo.utils.config import get_config +from neurons.miner import Miner class MinerSim(Miner): diff --git a/simulator/validator.py b/simulator/validator.py index 8fc9f2e6..75ade66c 100644 --- a/simulator/validator.py +++ b/simulator/validator.py @@ -1,17 +1,23 @@ -import logging +import asyncio import traceback from typing import List + import aiohttp +import bittensor as bt +from bittensor.btlogging import logging as logger +from tenacity import RetryError + import dojo from 
 from commons.orm import ORM
 from commons.utils import get_epoch_time, get_new_uuid, set_expire_time, ttl_get_block
-from dojo.protocol import FeedbackRequest, TaskType, MultiScoreCriteria, DendriteQueryResponse
+from dojo.protocol import (
+    DendriteQueryResponse,
+    FeedbackRequest,
+    MultiScoreCriteria,
+    TaskType,
+)
 from neurons.validator import Validator
-from bittensor.btlogging import logging as logger
-from tenacity import RetryError
-import bittensor as bt
-import asyncio


 class ValidatorSim(Validator):

From 05ca45a91cd27b455f9048687fbdd2a70b3fef0b Mon Sep 17 00:00:00 2001
From: codebender <167290009+codebender37@users.noreply.github.com>
Date: Mon, 2 Dec 2024 14:15:56 +0630
Subject: [PATCH 3/7] refactor: migrate score data from db to .pt file

---
 commons/score_storage.py | 81 ++++++++++++++++++++++++++++++++++++++++
 neurons/validator.py     | 17 +++++++--
 2 files changed, 94 insertions(+), 4 deletions(-)
 create mode 100644 commons/score_storage.py

diff --git a/commons/score_storage.py b/commons/score_storage.py
new file mode 100644
index 00000000..99805a1e
--- /dev/null
+++ b/commons/score_storage.py
@@ -0,0 +1,81 @@
+import json
+from pathlib import Path
+
+import torch
+from bittensor.btlogging import logging as logger
+
+from database.prisma.models import Score_Model
+
+
+class ScoreStorage:
+    """Handles persistence of validator scores"""
+
+    SCORES_DIR = Path("scores")
+    SCORES_FILE = SCORES_DIR / "validator_scores.pt"
+
+    @classmethod
+    async def migrate_from_db(cls) -> bool:
+        """One-time migration of scores from database to .pt file
+
+        Returns:
+            bool: True if migration successful or file already exists, False if migration failed
+        """
+        try:
+            if cls.SCORES_FILE.exists():
+                logger.info("Scores file already exists, skipping migration")
+                return True
+
+            # Get scores from database
+            score_record = await Score_Model.prisma().find_first(
+                order={"created_at": "desc"}
+            )
+            if not score_record:
+                logger.warning("No scores found in database to migrate")
+                return True  # Not an error, just no scores yet
+
+            scores = torch.tensor(json.loads(score_record.score))
+
+            # Create scores directory if it doesn't exist
+            cls.SCORES_DIR.mkdir(exist_ok=True)
+
+            # Save scores to .pt file
+            torch.save(scores, cls.SCORES_FILE)
+            logger.success(f"Successfully migrated scores to {cls.SCORES_FILE}")
+
+            # Verify the migration
+            loaded_scores = torch.load(cls.SCORES_FILE)
+            if torch.equal(scores, loaded_scores):
+                logger.success("Migration verification successful - scores match")
+                return True
+            else:
+                logger.error("Migration verification failed - scores do not match")
+                return False
+
+        except Exception as e:
+            logger.error(f"Failed to migrate scores: {e}")
+            return False
+
+    @classmethod
+    async def save(cls, scores: torch.Tensor) -> None:
+        """Save validator scores to .pt file"""
+        try:
+            cls.SCORES_DIR.mkdir(exist_ok=True)
+            torch.save(scores, cls.SCORES_FILE)
+            logger.success("Successfully saved validator scores to file")
+        except Exception as e:
+            logger.error(f"Failed to save validator scores: {e}")
+            raise
+
+    @classmethod
+    async def load(cls) -> torch.Tensor | None:
+        """Load validator scores from .pt file"""
+        try:
+            if not cls.SCORES_FILE.exists():
+                logger.warning("No validator scores file found")
+                return None
+
+            scores = torch.load(cls.SCORES_FILE)
+            logger.success("Successfully loaded validator scores from file")
+            return scores
+        except Exception as e:
+            logger.error(f"Failed to load validator scores: {e}")
+            return None
diff --git a/neurons/validator.py b/neurons/validator.py
index 0d0e620c..126d3baa 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -34,6 +34,7 @@
 from commons.obfuscation.obfuscation_utils import obfuscate_html_and_js
 from commons.objects import ObjectManager
 from commons.orm import ORM
+from commons.score_storage import ScoreStorage
 from commons.scoring import Scoring
 from commons.utils import (
     _terminal_plot,
@@ -103,10 +104,17 @@ def __init__(self):
         self.scores: torch.Tensor = torch.zeros(
             len(self.metagraph.hotkeys), dtype=torch.float32
         )
-        # manually always register and always sync metagraph when application starts
         self.check_registered()
-        self.executor = ThreadPoolExecutor(max_workers=2)

+        # Run score migration before loading state
+        migration_success = self.loop.run_until_complete(ScoreStorage.migrate_from_db())
+        if not migration_success:
+            logger.error(
+                "Score migration failed - cannot continue without valid scores"
+            )
+            raise RuntimeError("Score migration failed - validator cannot start")
+
+        self.executor = ThreadPoolExecutor(max_workers=2)
         self.load_state()

         init_wandb(config=self.config, my_uid=self.uid, wallet=self.wallet)
@@ -713,7 +721,8 @@ async def save_state(
             logger.warning("Scores are all zeros, but saving anyway!")
             # raise EmptyScores("Skipping save as scores are all empty")

-        await ORM.create_or_update_validator_score(self.scores)
+        # await ORM.create_or_update_validator_score(self.scores)
+        await ScoreStorage.save(self.scores)
         logger.success(f"📦 Saved validator state with scores: {self.scores}")
     except EmptyScores as e:
         logger.debug(f"No need to to save validator state: {e}")
@@ -723,7 +732,7 @@
     async def _load_state(self):
         try:
             await connect_db()
-            scores = await ORM.get_validator_score()
+            scores = await ScoreStorage.load()

             if scores is None:
                 num_processed_tasks = await ORM.get_num_processed_tasks()
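The migration's safety net is a plain save/load/compare round trip; the same three torch calls can be exercised in isolation (a sketch — the path matches `ScoreStorage` above, the tensor is a placeholder):

```python
from pathlib import Path

import torch

# Round trip in the style of ScoreStorage.migrate_from_db's verification step.
scores_file = Path("scores") / "validator_scores.pt"
scores_file.parent.mkdir(exist_ok=True)

scores = torch.zeros(256, dtype=torch.float32)  # one slot per UID, as in the validator
torch.save(scores, scores_file)
assert torch.equal(scores, torch.load(scores_file)), "round trip must be lossless"
```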
From b882fc6088a7e1dd9abd00b6b01a3fd971d0b2f7 Mon Sep 17 00:00:00 2001
From: codebender <167290009+codebender37@users.noreply.github.com>
Date: Mon, 2 Dec 2024 16:06:29 +0630
Subject: [PATCH 4/7] fix: add VALIDATOR_MIN_STAKE in environment

---
 .env.miner.example       |  3 +++
 commons/score_storage.py | 50 +++++++++++++++++++++++-----------------
 dojo/__init__.py         |  4 ++--
 3 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/.env.miner.example b/.env.miner.example
index 919c020d..9562b10e 100644
--- a/.env.miner.example
+++ b/.env.miner.example
@@ -48,3 +48,6 @@ S3_PUBLIC_URL=
 JWT_SECRET=
 # e.g. infura, alchemy API url for ethereum
 ETHEREUM_NODE=
+
+# Add this line with other environment variables
+VALIDATOR_MIN_STAKE=20000
diff --git a/commons/score_storage.py b/commons/score_storage.py
index 99805a1e..969a0c0e 100644
--- a/commons/score_storage.py
+++ b/commons/score_storage.py
@@ -4,6 +4,7 @@
 import torch
 from bittensor.btlogging import logging as logger

+from database.client import connect_db, disconnect_db
 from database.prisma.models import Score_Model


@@ -24,31 +25,38 @@ async def migrate_from_db(cls) -> bool:
                 logger.info("Scores file already exists, skipping migration")
                 return True

-            # Get scores from database
-            score_record = await Score_Model.prisma().find_first(
-                order={"created_at": "desc"}
-            )
-            if not score_record:
-                logger.warning("No scores found in database to migrate")
-                return True  # Not an error, just no scores yet
+            # Connect to database first
+            await connect_db()

-            scores = torch.tensor(json.loads(score_record.score))
+            try:
+                # Get scores from database
+                score_record = await Score_Model.prisma().find_first(
+                    order={"created_at": "desc"}
+                )
+                if not score_record:
+                    logger.warning("No scores found in database to migrate")
+                    return True  # Not an error, just no scores yet

-            # Create scores directory if it doesn't exist
-            cls.SCORES_DIR.mkdir(exist_ok=True)
+                scores = torch.tensor(json.loads(score_record.score))

-            # Save scores to .pt file
-            torch.save(scores, cls.SCORES_FILE)
-            logger.success(f"Successfully migrated scores to {cls.SCORES_FILE}")
+                # Create scores directory if it doesn't exist
+                cls.SCORES_DIR.mkdir(exist_ok=True)

-            # Verify the migration
-            loaded_scores = torch.load(cls.SCORES_FILE)
-            if torch.equal(scores, loaded_scores):
-                logger.success("Migration verification successful - scores match")
-                return True
-            else:
-                logger.error("Migration verification failed - scores do not match")
-                return False
+                # Save scores to .pt file
+                torch.save(scores, cls.SCORES_FILE)
+                logger.success(f"Successfully migrated scores to {cls.SCORES_FILE}")
+
+                # Verify the migration
+                loaded_scores = torch.load(cls.SCORES_FILE)
+                if torch.equal(scores, loaded_scores):
+                    logger.success("Migration verification successful - scores match")
+                    return True
+                else:
+                    logger.error("Migration verification failed - scores do not match")
+                    return False
+
+            finally:
+                await disconnect_db()

         except Exception as e:
             logger.error(f"Failed to migrate scores: {e}")
diff --git a/dojo/__init__.py b/dojo/__init__.py
index 5674629d..aa3edec3 100644
--- a/dojo/__init__.py
+++ b/dojo/__init__.py
@@ -30,7 +30,7 @@ def get_latest_git_tag():
     )


-VALIDATOR_MIN_STAKE = 20000
+VALIDATOR_MIN_STAKE = int(os.getenv("VALIDATOR_MIN_STAKE", "20000"))
 TASK_DEADLINE = 6 * 60 * 60

 # Define the time intervals for various tasks.
@@ -44,7 +44,7 @@ def get_latest_git_tag():

 if get_config().fast_mode:
     print("Running in fast mode for testing purposes...")
-    VALIDATOR_MIN_STAKE = 20000
+    VALIDATOR_MIN_STAKE = int(os.getenv("VALIDATOR_MIN_STAKE", "20000"))
     TASK_DEADLINE = 180
     VALIDATOR_RUN = 60
     VALIDATOR_HEARTBEAT = 15

From 2baf283d5ae359d0cd1ebab4f13fa37439474152 Mon Sep 17 00:00:00 2001
From: codebender <167290009+codebender37@users.noreply.github.com>
Date: Tue, 3 Dec 2024 00:14:41 +0630
Subject: [PATCH 5/7] feat: added script to inspect score

---
 .gitignore                |  3 ++
 README.md                 |  1 +
 dojo/utils/uids.py        |  1 +
 scripts/inspect_scores.py | 78 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 83 insertions(+)
 create mode 100644 scripts/inspect_scores.py

diff --git a/.gitignore b/.gitignore
index 8e193826..0205291e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -188,3 +188,6 @@ testing/

 # prisma
 database/prisma/
+
+# scores data
+scores/*.pt
diff --git a/README.md b/README.md
index b6d6c047..695b62d4 100644
--- a/README.md
+++ b/README.md
@@ -278,6 +278,7 @@ DOJO_API_KEY= # blank for now
 WALLET_COLDKEY=# the name of the coldkey
 WALLET_HOTKEY=# the name of the hotkey
 AXON_PORT=8888 # port to serve requests over the public network for validators to call
+VALIDATOR_MIN_STAKE= # minimum stake required for validators default is 20000 TAO (use this to bypass the blacklist function in testnet)
 # Task related config
 TASK_MAX_RESULT=4 # this means that each miner can have up to 4 workers fill in responses
 ```
diff --git a/dojo/utils/uids.py b/dojo/utils/uids.py
index e61d60e7..c346d034 100644
--- a/dojo/utils/uids.py
+++ b/dojo/utils/uids.py
@@ -29,6 +29,7 @@ def is_miner(metagraph: bt.metagraph, uid: int) -> bool:
     stakes = metagraph.S.tolist()

     from dojo import VALIDATOR_MIN_STAKE
+    print(f"Validator min stake {VALIDATOR_MIN_STAKE}")

     return stakes[uid] < VALIDATOR_MIN_STAKE
diff --git a/scripts/inspect_scores.py b/scripts/inspect_scores.py
new file mode 100644
index 00000000..fb04577b
--- /dev/null
+++ b/scripts/inspect_scores.py
@@ -0,0 +1,78 @@
+import argparse
+from typing import List
+
+import torch
+from tabulate import tabulate
+from termcolor import colored
+
+
+def format_score_table(scores: torch.Tensor) -> List[List[str]]:
+    """Format scores into a table with 10 columns"""
+    table_data = []
+    row = []
+    for i, score in enumerate(scores):
+        score_str = f"{score.item():.4f}"
+        # Color non-zero scores
+        if score.item() > 0:
+            score_str = colored(score_str, "green")
+        row.append([i, score_str])
+
+        # Use 10 columns for better screen fit
+        if len(row) == 10 or i == len(scores) - 1:
+            table_data.append(row)
+            row = []
+    return table_data
+
+
+def inspect_scores(
+    file_path: str = "scores/validator_scores.pt", show_all: bool = False
+):
+    try:
+        scores = torch.load(file_path)
+
+        # Print Summary
+        print(colored("\n=== Scores Summary ===", "blue", attrs=["bold"]))
+        print(f"Total UIDs: {len(scores)}")
+        print(f"Data type: {scores.dtype}")
+        print(f"Device: {scores.device}")
+
+        # Print Statistics
+        print(colored("\n=== Statistics ===", "blue", attrs=["bold"]))
+        print(f"Mean score: {scores.mean().item():.4f}")
+        print(f"Min score: {scores.min().item():.4f}")
+        print(f"Max score: {scores.max().item():.4f}")
+        print(
+            f"Non-zero UIDs: {torch.count_nonzero(scores).item()} "
+            f"({(torch.count_nonzero(scores).item()/len(scores)*100):.1f}%)"
+        )
+
+        # Print Top Scores
+        top_k = 10  # Show top 10
+        values, indices = torch.topk(scores, k=min(top_k, len(scores)))
+        print(colored("\n=== Top 10 Scores ===", "blue", attrs=["bold"]))
+        top_scores = [
+            [f"UID {idx}", f"{val.item():.4f}"] for idx, val in zip(indices, values)
+        ]
+        print(tabulate(top_scores, headers=["UID", "Score"], tablefmt="simple"))
+
+        if show_all:
+            print(colored("\n=== All Scores ===", "blue", attrs=["bold"]))
+            table_data = format_score_table(scores)
+            for row in table_data:
+                # headers = [f"UID {i[0]}" for i in row]
+                values = [f"UID {i[0]} - {i[1]}" for i in row]
+                print(tabulate([values], tablefmt="simple"))
+
+        print("\nNote: Green values indicate non-zero scores")
+
+    except FileNotFoundError:
+        print(colored(f"Score file not found at {file_path}", "red"))
+    except Exception as e:
+        print(colored(f"Error reading scores: {e}", "red"))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--all", action="store_true", help="Show all scores")
+    args = parser.parse_args()
+    inspect_scores(show_all=args.all)
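To try the inspection script without a live validator, it is enough to seed a dummy tensor at the path it reads by default (a sketch; the 256-UID size and the sparsity are arbitrary):

```python
from pathlib import Path

import torch

# Write a sparse dummy score tensor where scripts/inspect_scores.py looks for it.
Path("scores").mkdir(exist_ok=True)
scores = torch.zeros(256)
scores[torch.randint(0, 256, (25,))] = torch.rand(25)
torch.save(scores, "scores/validator_scores.pt")

# Then: python scripts/inspect_scores.py --all
```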
From 75ef017a0f2f7ef719ffe5536c3e91346579366a Mon Sep 17 00:00:00 2001
From: codebender <167290009+codebender37@users.noreply.github.com>
Date: Tue, 3 Dec 2024 17:16:51 +0630
Subject: [PATCH 6/7] fix: fixed from PR feedback

---
 .env.miner.example       | 4 +---
 README.md                | 2 +-
 commons/score_storage.py | 2 +-
 dojo/utils/uids.py       | 1 -
 neurons/validator.py     | 2 --
 5 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/.env.miner.example b/.env.miner.example
index 9562b10e..1cf8b5f6 100644
--- a/.env.miner.example
+++ b/.env.miner.example
@@ -17,6 +17,7 @@ SUBTENSOR_ENDPOINT=wss://entrypoint-finney.opentensor.ai:443
 # NETUID=98
 # SUBTENSOR_NETWORK=test
 # SUBTENSOR_ENDPOINT=ws://testnet-lite:9944
+# VALIDATOR_MIN_STAKE=20000

 # Task related config
 # this a maximum of 4 workers may submit responses for a single task
@@ -48,6 +49,3 @@ S3_PUBLIC_URL=
 JWT_SECRET=
 # e.g. infura, alchemy API url for ethereum
 ETHEREUM_NODE=
-
-# Add this line with other environment variables
-VALIDATOR_MIN_STAKE=20000
diff --git a/README.md b/README.md
index 695b62d4..99871d29 100644
--- a/README.md
+++ b/README.md
@@ -278,7 +278,7 @@ DOJO_API_KEY= # blank for now
 WALLET_COLDKEY=# the name of the coldkey
 WALLET_HOTKEY=# the name of the hotkey
 AXON_PORT=8888 # port to serve requests over the public network for validators to call
-VALIDATOR_MIN_STAKE= # minimum stake required for validators default is 20000 TAO (use this to bypass the blacklist function in testnet)
+VALIDATOR_MIN_STAKE=20000 # minimum stake required for validators default is 20000 TAO (use this to bypass the blacklist function in testnet)
 # Task related config
 TASK_MAX_RESULT=4 # this means that each miner can have up to 4 workers fill in responses
 ```
diff --git a/commons/score_storage.py b/commons/score_storage.py
index 969a0c0e..ec8d8c77 100644
--- a/commons/score_storage.py
+++ b/commons/score_storage.py
@@ -12,7 +12,7 @@ class ScoreStorage:
     """Handles persistence of validator scores"""

     SCORES_DIR = Path("scores")
-    SCORES_FILE = SCORES_DIR / "validator_scores.pt"
+    SCORES_FILE = SCORES_DIR / "miner_scores.pt"

     @classmethod
     async def migrate_from_db(cls) -> bool:
diff --git a/dojo/utils/uids.py b/dojo/utils/uids.py
index c346d034..e61d60e7 100644
--- a/dojo/utils/uids.py
+++ b/dojo/utils/uids.py
@@ -29,7 +29,6 @@ def is_miner(metagraph: bt.metagraph, uid: int) -> bool:
     stakes = metagraph.S.tolist()

     from dojo import VALIDATOR_MIN_STAKE
-    print(f"Validator min stake {VALIDATOR_MIN_STAKE}")

     return stakes[uid] < VALIDATOR_MIN_STAKE
diff --git a/neurons/validator.py b/neurons/validator.py
index 126d3baa..34222911 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -46,7 +46,6 @@
     set_expire_time,
     ttl_get_block,
 )
-from database.client import connect_db
 from dojo import __spec_version__
 from dojo.protocol import (
     CompletionResponses,
@@ -731,7 +730,6 @@ async def save_state(

     async def _load_state(self):
         try:
-            await connect_db()
             scores = await ScoreStorage.load()

             if scores is None:

From 090d2a38333dbf7f684e1959babfae3aba440491 Mon Sep 17 00:00:00 2001
From: codebender <167290009+codebender37@users.noreply.github.com>
Date: Wed, 4 Dec 2024 16:10:23 +0630
Subject: [PATCH 7/7] fix: fixed linter issue, and commitizen

---
 commons/objects.py     |  4 ++++
 dojo/__init__.py       |  2 +-
 entrypoints.sh         |  2 +-
 simulator/miner.py     | 33 +++++++++++++++++----------------
 simulator/validator.py | 42 +++++++++++++++++++++++-------------------
 5 files changed, 46 insertions(+), 37 deletions(-)

diff --git a/commons/objects.py b/commons/objects.py
index 1d8ae1c9..bccce52f 100644
--- a/commons/objects.py
+++ b/commons/objects.py
@@ -11,10 +11,12 @@ class ObjectManager:
     def get_miner(cls):
         if get_config().simulation:
             from simulator.miner import MinerSim
+
             if cls._miner is None:
                 cls._miner = MinerSim()
         else:
             from neurons.miner import Miner
+
             if cls._miner is None:
                 cls._miner = Miner()
         return cls._miner
@@ -23,10 +25,12 @@ def get_miner(cls):
     def get_validator(cls):
         if get_config().simulation:
             from simulator.validator import ValidatorSim
+
             if cls._validator is None:
                 cls._validator = ValidatorSim()
         else:
             from neurons.validator import Validator
+
             if cls._validator is None:
                 cls._validator = Validator()
         return cls._validator
diff --git a/dojo/__init__.py b/dojo/__init__.py
index aa3edec3..99eef378 100644
--- a/dojo/__init__.py
+++ b/dojo/__init__.py
@@ -59,4 +59,4 @@ def get_dojo_api_base_url() -> str:
     if base_url is None:
         raise ValueError("DOJO_API_BASE_URL is not set in the environment.")

-    return base_url
\ No newline at end of file
+    return base_url
diff --git a/entrypoints.sh b/entrypoints.sh
index 325fc7ad..33001a25 100755
--- a/entrypoints.sh
+++ b/entrypoints.sh
@@ -73,4 +73,4 @@ if [ "$1" = 'validator' ]; then
         --neuron.type validator \
         --wandb.project_name ${WANDB_PROJECT_NAME} \
         ${EXTRA_ARGS}
-fi
\ No newline at end of file
+fi
diff --git a/simulator/miner.py b/simulator/miner.py
index 2f58e5c8..c72dd5eb 100644
--- a/simulator/miner.py
+++ b/simulator/miner.py
@@ -22,10 +22,7 @@ def __init__(self):
             host = os.getenv("REDIS_HOST", "localhost")
             port = int(os.getenv("REDIS_PORT", 6379))
             self.redis_client = redis.Redis(
-                host=host,
-                port=port,
-                db=0,
-                decode_responses=True
+                host=host, port=port, db=0, decode_responses=True
             )
             logger.info("Redis connection established")

@@ -42,12 +39,14 @@ def __init__(self):
     def _configure_simulation(self):
         """Configure simulation parameters with environment variables or defaults."""
         self.response_behaviors = {
-            'normal': float(os.getenv("SIM_NORMAL_RESP_PROB", 0.8)),
-            'no_response': float(os.getenv("SIM_NO_RESP_PROB", 0.1)),
-            'timeout': float(os.getenv("SIM_TIMEOUT_PROB", 0.1))
+            "normal": float(os.getenv("SIM_NORMAL_RESP_PROB", 0.8)),
+            "no_response": float(os.getenv("SIM_NO_RESP_PROB", 0.1)),
+            "timeout": float(os.getenv("SIM_TIMEOUT_PROB", 0.1)),
         }

-    async def forward_feedback_request(self, synapse: FeedbackRequest) -> FeedbackRequest:
+    async def forward_feedback_request(
+        self, synapse: FeedbackRequest
+    ) -> FeedbackRequest:
         try:
             # Validate that synapse, dendrite, dendrite.hotkey, and response are not None
             if not synapse or not synapse.dendrite or not synapse.dendrite.hotkey:
@@ -69,7 +68,7 @@ async def forward_feedback_request(self, synapse: FeedbackRequest) -> FeedbackRe
             self.redis_client.set(
                 redis_key,
                 new_synapse.model_dump_json(),
-                ex=86400  # expire after 24 hours
+                ex=86400,  # expire after 24 hours
             )
             logger.info(f"Stored feedback request {synapse.request_id}")

@@ -81,7 +80,9 @@ async def forward_feedback_request(self, synapse: FeedbackRequest) -> FeedbackRe
             traceback.print_exc()
             return synapse

-    async def forward_task_result_request(self, synapse: TaskResultRequest) -> TaskResultRequest | None:
+    async def forward_task_result_request(
+        self, synapse: TaskResultRequest
+    ) -> TaskResultRequest | None:
         try:
             logger.info(f"Received TaskResultRequest for task id: {synapse.task_id}")
             if not synapse or not synapse.task_id:
@@ -91,9 +92,9 @@ async def forward_task_result_request(self, synapse: TaskResultRequest) -> TaskR
             # Simulate different response behaviors
             behavior = self._get_response_behavior()

-            if behavior in ['no_response', 'timeout']:
+            if behavior in ["no_response", "timeout"]:
                 logger.debug(f"Simulating {behavior} for task {synapse.task_id}")
-                if behavior == 'timeout':
+                if behavior == "timeout":
                     await asyncio.sleep(30)
                 return None

@@ -113,17 +114,17 @@ async def forward_task_result_request(self, synapse: TaskResultRequest) -> TaskR
             for criteria_type in feedback_request.criteria_types:
                 result = Result(
                     type=criteria_type.type,
-                    value=self._generate_scores(feedback_request.ground_truth)
+                    value=self._generate_scores(feedback_request.ground_truth),
                 )

                 task_result = TaskResult(
                     id=get_new_uuid(),
-                    status='COMPLETED',
+                    status="COMPLETED",
                     created_at=current_time,
                     updated_at=current_time,
                     result_data=[result],
                     worker_id=get_new_uuid(),
-                    task_id=synapse.task_id
+                    task_id=synapse.task_id,
                 )
                 task_results.append(task_result)

@@ -144,7 +145,7 @@ def _get_response_behavior(self) -> str:
         """Determine the response behavior based on configured probabilities."""
         return random.choices(
             list(self.response_behaviors.keys()),
-            weights=list(self.response_behaviors.values())
+            weights=list(self.response_behaviors.values()),
         )[0]

     def _generate_scores(self, ground_truth: dict) -> dict:
diff --git a/simulator/validator.py b/simulator/validator.py
index 75ade66c..3982aa9c 100644
--- a/simulator/validator.py
+++ b/simulator/validator.py
@@ -48,8 +48,12 @@ def block(self):
         except BrokenPipeError:
             self._block_check_attempts += 1
             if self._block_check_attempts >= self.MAX_BLOCK_CHECK_ATTEMPTS:
-                logger.error("Multiple failed attempts to get block number, attempting reconnection")
-                if asyncio.get_event_loop().run_until_complete(self._try_reconnect_subtensor()):
+                logger.error(
+                    "Multiple failed attempts to get block number, attempting reconnection"
+                )
+                if asyncio.get_event_loop().run_until_complete(
+                    self._try_reconnect_subtensor()
+                ):
                     return self.block

             return self._last_block if self._last_block is not None else 0
@@ -60,8 +64,8 @@ def block(self):
     def check_registered(self):
         new_subtensor = bt.subtensor(self.subtensor.config)
         if not new_subtensor.is_hotkey_registered(
-                netuid=self.config.netuid,
-                hotkey_ss58=self.wallet.hotkey.ss58_address,
+            netuid=self.config.netuid,
+            hotkey_ss58=self.wallet.hotkey.ss58_address,
         ):
             logger.error(
                 f"Wallet: {self.wallet} is not registered on netuid {self.config.netuid}."
@@ -70,9 +74,9 @@ def check_registered(self):
             exit()

     async def send_request(
-            self,
-            synapse: FeedbackRequest | None = None,
-            external_user: bool = False,
+        self,
+        synapse: FeedbackRequest | None = None,
+        external_user: bool = False,
     ):
         start = get_epoch_time()
         # typically the request may come from an external source; however,
@@ -89,7 +93,7 @@ async def send_request(
             self.metagraph.axons[uid]
             for uid in sel_miner_uids
             if self.metagraph.axons[uid].hotkey.casefold()
-                != self.wallet.hotkey.ss58_address.casefold()
+            != self.wallet.hotkey.ss58_address.casefold()
         ]
         if not len(axons):
             logger.warning("🤷 No axons to query ... skipping")
@@ -135,7 +139,7 @@ async def send_request(
                 prompt=data.prompt,
                 completion_responses=data.responses,
                 expire_at=expire_at,
-                ground_truth=data.ground_truth  # Added ground truth
+                ground_truth=data.ground_truth,  # Added ground truth
             )
         elif external_user:
             obfuscated_model_to_model = self.obfuscate_model_names(
@@ -217,24 +221,24 @@ async def send_request(

     @staticmethod
     async def _send_shuffled_requests(
-            dendrite: bt.dendrite, axons: List[bt.AxonInfo], synapse: FeedbackRequest
+        dendrite: bt.dendrite, axons: List[bt.AxonInfo], synapse: FeedbackRequest
     ) -> list[FeedbackRequest]:
         """Send the same request to all miners without shuffling the order.
-            WARNING: This should only be used for testing/debugging as it could allow miners to game the system.
+        WARNING: This should only be used for testing/debugging as it could allow miners to game the system.

-            Args:
-                dendrite (bt.dendrite): Communication channel to send requests
-                axons (List[bt.AxonInfo]): List of miner endpoints
-                synapse (FeedbackRequest): The feedback request to send
+        Args:
+            dendrite (bt.dendrite): Communication channel to send requests
+            axons (List[bt.AxonInfo]): List of miner endpoints
+            synapse (FeedbackRequest): The feedback request to send

-            Returns:
-                list[FeedbackRequest]: List of miner responses
-            """
+        Returns:
+            list[FeedbackRequest]: List of miner responses
+        """
         all_responses = []
         batch_size = 10

         for i in range(0, len(axons), batch_size):
-            batch_axons = axons[i: i + batch_size]
+            batch_axons = axons[i : i + batch_size]
             tasks = []

             for axon in batch_axons:
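Taken together, the series keys everything off a single flag: entrypoints.sh turns the SIMULATION environment variable into `--simulation`, and `get_config().simulation` then decides which classes `ObjectManager` hands back. A minimal in-repo sketch of that dispatch (assumes it runs from the repository root; the wallet/network flags a real neuron's constructor needs are elided):

```python
# Sketch: exercise the dispatch added to commons/objects.py by this series.
# Invoke as: python sketch.py --simulation --simulation_bad_miner
from commons.objects import ObjectManager

miner = ObjectManager.get_miner()  # MinerSim when --simulation is set, else Miner
print(type(miner).__name__)
```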