diff --git a/.env.miner.example b/.env.miner.example
index 919c020d..1cf8b5f6 100644
--- a/.env.miner.example
+++ b/.env.miner.example
@@ -17,6 +17,7 @@ SUBTENSOR_ENDPOINT=wss://entrypoint-finney.opentensor.ai:443
 # NETUID=98
 # SUBTENSOR_NETWORK=test
 # SUBTENSOR_ENDPOINT=ws://testnet-lite:9944
+# VALIDATOR_MIN_STAKE=20000

 # Task related config
 # this a maximum of 4 workers may submit responses for a single task
diff --git a/.env.validator.example b/.env.validator.example
index 332e8a41..7b41ec36 100644
--- a/.env.validator.example
+++ b/.env.validator.example
@@ -1,5 +1,6 @@
 WALLET_COLDKEY=
 WALLET_HOTKEY=
+DATASET_SERVICE_BASE_URL=https://dojo-validator-api.tensorplex.ai

 # Mainnet related config
 NETUID=52
diff --git a/.github/workflows/docker_build.yaml b/.github/workflows/docker_build.yaml
index 2effd247..ddda3a98 100644
--- a/.github/workflows/docker_build.yaml
+++ b/.github/workflows/docker_build.yaml
@@ -6,6 +6,7 @@ on:
       - dev
       - staging
       - main
+      - simulator

 jobs:
   docker_publish:
@@ -28,7 +29,7 @@ jobs:
           echo "BRANCH_NAME=$SANITIZED_BRANCH_NAME" >> $GITHUB_ENV

       - name: Build and Push Docker Image with Branch Tag
-        if: github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main'
+        if: github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/simulator'
         uses: macbre/push-to-ghcr@master
         with:
           image_name: ${{ github.repository }}
diff --git a/.gitignore b/.gitignore
index 8e193826..0205291e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -188,3 +188,6 @@ testing/

 # prisma
 database/prisma/
+
+# scores data
+scores/*.pt
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 75d61281..ad634176 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,27 @@
+## [1.4.2](https://github.com/tensorplex-labs/dojo/compare/v1.4.1...v1.4.2) (2024-11-22)
+
+### Performance Improvements
+
+* add retries for dojo api calls ([461214a](https://github.com/tensorplex-labs/dojo/commit/461214aef4b51f42574d920effeaa7cbb58f5a92))
+* extend delay ([97a9b04](https://github.com/tensorplex-labs/dojo/commit/97a9b0417106c4e3cfc11501c9408f15574daf23))
+* extend delay ([69e1974](https://github.com/tensorplex-labs/dojo/commit/69e19742f23fd1a2ac81f197f4e2f07fe9018049))
+* removed unused completions in miner's response to optimize network traffic ([e76ce98](https://github.com/tensorplex-labs/dojo/commit/e76ce98dd2c6e7ef3f52b70225cd0f0e07abf996))
+
+## [1.4.1](https://github.com/tensorplex-labs/dojo/compare/v1.4.0...v1.4.1) (2024-11-07)
+
+### Bug Fixes
+
+* add assert check ([f849130](https://github.com/tensorplex-labs/dojo/commit/f849130670abdc8614c302dbb73a1af053aa00ed))
+* add unit test for reward_cubic ([7a8483a](https://github.com/tensorplex-labs/dojo/commit/7a8483ae11569287fdb2474615781c3fa458de35))
+* change nans to -1 instead of 0 ([f15815c](https://github.com/tensorplex-labs/dojo/commit/f15815c9613935857cc7c649cb3dcb8d185c6bd9))
+* potential scoring issue ([335b40b](https://github.com/tensorplex-labs/dojo/commit/335b40b2493a67fdc19aa5366daac9f0e0031c01))
+* remove error log ([e0b1a85](https://github.com/tensorplex-labs/dojo/commit/e0b1a8570457765a395d995b94d936c0e37f3618))
+* scoring shapes ([dc15bb9](https://github.com/tensorplex-labs/dojo/commit/dc15bb93cd9b61bb1219c4766fbc8938e110c1b3))
+
+### Performance Improvements
+
+* refactor logging, and clean synapase_history ([8e36b5c](https://github.com/tensorplex-labs/dojo/commit/8e36b5c6694909506a04f002389fc106b6a4f87d))
+
 ## [1.4.0](https://github.com/tensorplex-labs/dojo/compare/v1.3.3...v1.4.0) (2024-10-31)

 ### Features
diff --git a/Makefile b/Makefile
index 8156bdbd..b98ba784 100644
--- a/Makefile
+++ b/Makefile
@@ -74,6 +74,9 @@ miner-worker-api:
 dojo-cli:
 	docker compose --env-file .env.miner -f docker-compose.miner.yaml run --rm dojo-cli

+extract-dataset:
+	docker compose -f docker-compose.validator.yaml run --rm --remove-orphans extract-dataset
+
 # ---------------------------------------------------------------------------- #
 #                            CORE SERVICE LOGGING                               #
 # ---------------------------------------------------------------------------- #
diff --git a/README.md b/README.md
index b6d6c047..f707712e 100644
--- a/README.md
+++ b/README.md
@@ -48,6 +48,7 @@
   - [Option 2: Decentralised Method](#option-2-decentralised-method)
   - [Setup Subscription Key for Labellers on UI to connect to Dojo Subnet for scoring](#setup-subscription-key-for-labellers-on-ui-to-connect-to-dojo-subnet-for-scoring)
 - [Validating](#validating)
+  - [Data Collection](#data-collection)
 - [Auto-updater](#auto-updater)
 - [Dojo CLI](#dojo-cli)
 - [For Dojo developerss](#for-dojo-developerss)
@@ -278,6 +279,7 @@ DOJO_API_KEY= # blank for now
 WALLET_COLDKEY=# the name of the coldkey
 WALLET_HOTKEY=# the name of the hotkey
 AXON_PORT=8888 # port to serve requests over the public network for validators to call
+VALIDATOR_MIN_STAKE=20000 # minimum stake required for validators; defaults to 20000 TAO (use this to bypass the blacklist function in testnet)

 # Task related config
 TASK_MAX_RESULT=4 # this means that each miner can have up to 4 workers fill in responses
 ```
@@ -416,6 +418,7 @@ cp .env.validator.example .env.validator

 WALLET_COLDKEY=# the name of the coldkey
 WALLET_HOTKEY=# the name of the hotkey
+DATASET_SERVICE_BASE_URL=https://dojo-validator-api.tensorplex.ai

 # head to https://wandb.ai/authorize to get your API key
 WANDB_API_KEY=""
@@ -448,6 +451,15 @@ make validator

 To start with autoupdate for validators (**strongly recommended**), see the [Auto-updater](#auto-updater) section.
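For clarity on the `VALIDATOR_MIN_STAKE` lines added to the env examples above: the value is read from the environment at import time by the miner (see the `dojo/__init__.py` hunk later in this patch). A minimal sketch of that override behavior, using only the standard library:

```python
# Sketch of the env-var override introduced in dojo/__init__.py: the value from
# .env.miner (a string), when present, replaces the 20000 TAO default that the
# miner's blacklist check compares validator stake against.
import os

VALIDATOR_MIN_STAKE = int(os.getenv("VALIDATOR_MIN_STAKE", "20000"))
print(VALIDATOR_MIN_STAKE)  # 20000 unless overridden, e.g. set lower on testnet
```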
+## Data Collection
+
+To export all data that has been collected from the validator, ensure that the environment variables are set up properly as described in [validator-setup](#validating), then run the following:
+
+```bash
+make validator-pull
+make extract-dataset
+```
+
 # Auto-updater

 > [!WARNING]
diff --git a/commons/objects.py b/commons/objects.py
index 114c49e1..bccce52f 100644
--- a/commons/objects.py
+++ b/commons/objects.py
@@ -9,18 +9,30 @@ class ObjectManager:
     @classmethod
     def get_miner(cls):
-        from neurons.miner import Miner
+        if get_config().simulation:
+            from simulator.miner import MinerSim

-        if cls._miner is None:
-            cls._miner = Miner()
+            if cls._miner is None:
+                cls._miner = MinerSim()
+        else:
+            from neurons.miner import Miner
+
+            if cls._miner is None:
+                cls._miner = Miner()
         return cls._miner

     @classmethod
     def get_validator(cls):
-        from neurons.validator import Validator
+        if get_config().simulation:
+            from simulator.validator import ValidatorSim
+
+            if cls._validator is None:
+                cls._validator = ValidatorSim()
+        else:
+            from neurons.validator import Validator

-        if cls._validator is None:
-            cls._validator = Validator()
+            if cls._validator is None:
+                cls._validator = Validator()
         return cls._validator

     @classmethod
diff --git a/commons/score_storage.py b/commons/score_storage.py
new file mode 100644
index 00000000..ec8d8c77
--- /dev/null
+++ b/commons/score_storage.py
@@ -0,0 +1,89 @@
+import json
+from pathlib import Path
+
+import torch
+from bittensor.btlogging import logging as logger
+
+from database.client import connect_db, disconnect_db
+from database.prisma.models import Score_Model
+
+
+class ScoreStorage:
+    """Handles persistence of validator scores"""
+
+    SCORES_DIR = Path("scores")
+    SCORES_FILE = SCORES_DIR / "miner_scores.pt"
+
+    @classmethod
+    async def migrate_from_db(cls) -> bool:
+        """One-time migration of scores from database to .pt file
+        Returns:
+            bool: True if migration successful or file already exists, False if migration failed
+        """
+        try:
+            if cls.SCORES_FILE.exists():
+                logger.info("Scores file already exists, skipping migration")
+                return True
+
+            # Connect to database first
+            await connect_db()
+
+            try:
+                # Get scores from database
+                score_record = await Score_Model.prisma().find_first(
+                    order={"created_at": "desc"}
+                )
+                if not score_record:
+                    logger.warning("No scores found in database to migrate")
+                    return True  # Not an error, just no scores yet
+
+                scores = torch.tensor(json.loads(score_record.score))
+
+                # Create scores directory if it doesn't exist
+                cls.SCORES_DIR.mkdir(exist_ok=True)
+
+                # Save scores to .pt file
+                torch.save(scores, cls.SCORES_FILE)
+                logger.success(f"Successfully migrated scores to {cls.SCORES_FILE}")
+
+                # Verify the migration
+                loaded_scores = torch.load(cls.SCORES_FILE)
+                if torch.equal(scores, loaded_scores):
+                    logger.success("Migration verification successful - scores match")
+                    return True
+                else:
+                    logger.error("Migration verification failed - scores do not match")
+                    return False
+
+            finally:
+                await disconnect_db()
+
+        except Exception as e:
+            logger.error(f"Failed to migrate scores: {e}")
+            return False
+
+    @classmethod
+    async def save(cls, scores: torch.Tensor) -> None:
+        """Save validator scores to .pt file"""
+        try:
+            cls.SCORES_DIR.mkdir(exist_ok=True)
+            torch.save(scores, cls.SCORES_FILE)
+            logger.success("Successfully saved validator scores to file")
+        except Exception as e:
+            logger.error(f"Failed to save validator scores: {e}")
+            raise
+
+    @classmethod
+    async def load(cls) -> torch.Tensor | None:
"""Load validator scores from .pt file""" + try: + if not cls.SCORES_FILE.exists(): + logger.warning("No validator scores file found") + return None + + scores = torch.load(cls.SCORES_FILE) + logger.success("Successfully loaded validator scores from file") + return scores + except Exception as e: + logger.error(f"Failed to load validator scores: {e}") + return None diff --git a/commons/scoring.py b/commons/scoring.py index adbed2bb..c541a0ef 100644 --- a/commons/scoring.py +++ b/commons/scoring.py @@ -79,8 +79,11 @@ def _reward_cubic( # shape: (num_miners,) # number range [-1, 1] x = F.cosine_similarity( - torch.from_numpy(miner_outputs), torch.from_numpy(ground_truth), dim=1 + torch.from_numpy(miner_outputs.copy()), + torch.from_numpy(ground_truth.copy()), + dim=1, ).numpy() + # Convert nans to -1 to send it to the bottom x = np.where(np.isnan(x), -1, x) @@ -411,13 +414,17 @@ def ground_truth_score_V1( ) miner_outputs = miner_outputs_normalised - logger.debug(f"scoring: raw miner outputs with nans\n{miner_outputs}") # use minmax scale to ensure ground truth is in the range [0, 1] ground_truth_arr = minmax_scale( np.array([rank for _, rank in cid_with_rank_sorted]) ).numpy() - logger.debug(f"scoring: ground truth\n{ground_truth_arr}") + + # reverse order here, because the lowest rank is the best + # e.g. ranks: ('cid1', 0), ('cid2', 1), ('cid3', 2), ('cid4', 3) + # after minmax scale: [0, 0.33, 0.667, 1] + # but we want the reverse, so: [1, 0.667, 0.33, 0], since cid1 is the best + ground_truth_arr = ground_truth_arr[::-1] logger.info(f"scoring: Miner outputs\n{miner_outputs}") logger.info(f"scoring: Ground truth\n{ground_truth_arr}") @@ -449,7 +456,7 @@ def ground_truth_score_V1( logger.debug(f"scoring: error calculating segment sums: {e}") pass - return torch.from_numpy(cubic_reward) + return torch.from_numpy(cubic_reward.copy()) @staticmethod def cmp_ground_truth( @@ -634,8 +641,8 @@ def calculate_score( ground_truth = gt_score[i] # NOTE: just use ground truth for now - hotkey_to_final_score[r.axon.hotkey] = ground_truth / len( - criteria_types + hotkey_to_final_score[r.axon.hotkey] = float( + ground_truth / len(criteria_types) ) criteria_to_miner_scores[criteria.type] = Score( diff --git a/docker-compose.validator.yaml b/docker-compose.validator.yaml index 02598809..29bfef01 100644 --- a/docker-compose.validator.yaml +++ b/docker-compose.validator.yaml @@ -131,3 +131,26 @@ services: prisma-setup-vali: condition: service_completed_successfully logging: *default-logging + + dataset-service: + image: ghcr.io/tensorplex-labs/dojo:dataset + env_file: + - .env.validator + ports: + - "127.0.0.1:9999:9999" + command: ["dataset-service"] + logging: *default-logging + + extract-dataset: + image: ghcr.io/tensorplex-labs/dojo:dataset + env_file: + - .env.validator + command: ["extract-dataset"] + networks: + - dojo-validator + volumes: + - ./:/app + - ./.env.validator:/app/.env + - prisma-binary:/root/prisma-python + - $HOME/.bittensor:/root/.bittensor + logging: *default-logging diff --git a/docker/Dockerfile.dataset b/docker/Dockerfile.dataset new file mode 100644 index 00000000..5cc80942 --- /dev/null +++ b/docker/Dockerfile.dataset @@ -0,0 +1,35 @@ +FROM python:3.11-slim-bookworm + +WORKDIR /app + +ENV PATH="/root/.cargo/bin/:$PATH" +ENV UV_SYSTEM_PYTHON=true +ENV NVM_DIR=/root/.nvm +ENV NODE_VERSION=v20.11.1 +ENV NODE_PATH=$NVM_DIR/versions/node/$NODE_VERSION/lib/node_modules +ENV PATH=$NVM_DIR/versions/node/$NODE_VERSION/bin:$PATH + +RUN apt-get update \ + && apt-get install -y 
+    build-essential curl git ca-certificates \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv
+COPY . .
+
+ARG TARGETPLATFORM
+
+RUN echo "Building for TARGETPLATFORM: $TARGETPLATFORM"
+
+RUN git config --global --add safe.directory /app
+
+# jank because pytorch has different versions for cpu for darwin VS linux, see pyproject.toml for specifics
+# RUN if [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
+#     uv pip install --no-cache -e .[dataset] --find-links https://download.pytorch.org/whl/torch_stable.html; \
+#     else \
+#     uv pip install --no-cache -e .[dataset]; \
+#     fi
+RUN uv pip install --no-cache -e ".[dataset]" --find-links https://download.pytorch.org/whl/torch_stable.html;
+
+ENTRYPOINT ["./entrypoints.sh"]
diff --git a/dojo/__init__.py b/dojo/__init__.py
index 11625674..99eef378 100644
--- a/dojo/__init__.py
+++ b/dojo/__init__.py
@@ -30,7 +30,7 @@ def get_latest_git_tag():
     )


-VALIDATOR_MIN_STAKE = 20000
+VALIDATOR_MIN_STAKE = int(os.getenv("VALIDATOR_MIN_STAKE", "20000"))
 TASK_DEADLINE = 6 * 60 * 60

 # Define the time intervals for various tasks.
@@ -44,7 +44,7 @@ def get_latest_git_tag():

 if get_config().fast_mode:
     print("Running in fast mode for testing purposes...")
-    VALIDATOR_MIN_STAKE = 20000
+    VALIDATOR_MIN_STAKE = int(os.getenv("VALIDATOR_MIN_STAKE", "20000"))
     TASK_DEADLINE = 180
     VALIDATOR_RUN = 60
     VALIDATOR_HEARTBEAT = 15
diff --git a/dojo/utils/config.py b/dojo/utils/config.py
index a0022d54..2c399997 100644
--- a/dojo/utils/config.py
+++ b/dojo/utils/config.py
@@ -178,6 +178,18 @@ def add_args(parser):
         help="Whether to run in fast mode, for developers to test locally.",
     )

+    parser.add_argument(
+        "--simulation",
+        action="store_true",
+        help="Whether to run the validator in simulation mode",
+    )
+
+    parser.add_argument(
+        "--simulation_bad_miner",
+        action="store_true",
+        help="Set miner simulation to a bad one",
+    )
+
     epoch_length = 100
     known_args, _ = parser.parse_known_args()
     if known_args := vars(known_args):
diff --git a/entrypoints.sh b/entrypoints.sh
index 7f64dfe7..8a2e2fea 100755
--- a/entrypoints.sh
+++ b/entrypoints.sh
@@ -21,6 +21,17 @@ if [ "$1" = 'miner' ]; then
     echo "SUBTENSOR_ENDPOINT: ${SUBTENSOR_ENDPOINT}"
     echo "NETUID: ${NETUID}"

+    EXTRA_ARGS=""
+    if [ "${SIMULATION}" = "true" ]; then
+        EXTRA_ARGS="${EXTRA_ARGS} --simulation"
+    fi
+    if [ "${FAST_MODE}" = "true" ]; then
+        EXTRA_ARGS="${EXTRA_ARGS} --fast_mode"
+    fi
+    if [ "${SIMULATION_BAD_MINER}" = "true" ]; then
+        EXTRA_ARGS="${EXTRA_ARGS} --simulation_bad_miner"
+    fi
+
     python main_miner.py \
         --netuid ${NETUID} \
         --subtensor.network ${SUBTENSOR_NETWORK} \
@@ -29,7 +40,8 @@ if [ "$1" = 'miner' ]; then
         --wallet.name ${WALLET_COLDKEY} \
         --wallet.hotkey ${WALLET_HOTKEY} \
         --axon.port ${AXON_PORT} \
-        --neuron.type miner
+        --neuron.type miner \
+        ${EXTRA_ARGS}
 fi

 # If the first argument is 'validator', run the validator script
@@ -43,6 +55,14 @@ if [ "$1" = 'validator' ]; then
     echo "NETUID: ${NETUID}"
     echo "WANDB_PROJECT_NAME: ${WANDB_PROJECT_NAME}"

+    EXTRA_ARGS=""
+    if [ "${SIMULATION}" = "true" ]; then
+        EXTRA_ARGS="${EXTRA_ARGS} --simulation"
+    fi
+    if [ "${FAST_MODE}" = "true" ]; then
+        EXTRA_ARGS="${EXTRA_ARGS} --fast_mode"
+    fi
+
     python main_validator.py \
         --netuid ${NETUID} \
         --subtensor.network ${SUBTENSOR_NETWORK} \
@@ -51,5 +71,29 @@ if [ "$1" = 'validator' ]; then
         --wallet.name ${WALLET_COLDKEY} \
         --wallet.hotkey ${WALLET_HOTKEY} \
         --neuron.type validator \
-        --wandb.project_name ${WANDB_PROJECT_NAME}
+        --wandb.project_name ${WANDB_PROJECT_NAME} \
+        ${EXTRA_ARGS}
+fi
+
+if [ "$1" = 'extract-dataset' ]; then
+    echo "Environment variables:"
+    echo "WALLET_HOTKEY: ${WALLET_HOTKEY}"
+    echo "DATABASE_URL: ${DATABASE_URL}"
+    echo "DATASET_SERVICE_BASE_URL: ${DATASET_SERVICE_BASE_URL}"
+    echo "WALLET_COLDKEY: ${WALLET_COLDKEY}"
+    python scripts/extract_dataset.py \
+        --wallet.name ${WALLET_COLDKEY} \
+        --wallet.hotkey ${WALLET_HOTKEY}
+fi
+
+if [ "$1" = 'dataset-service' ]; then
+    echo "Environment variables:"
+    echo "PORT: ${PORT}"
+    echo "S3_BUCKET_NAME: ${S3_BUCKET_NAME}"
+    echo "AWS_REGION: ${AWS_REGION}"
+    echo "MAX_CHUNK_SIZE_MB: ${MAX_CHUNK_SIZE_MB}"
+    python entrypoints/dataset_service.py \
+        --netuid 52 \
+        --subtensor.network finney
 fi
diff --git a/entrypoints/dataset_service.py b/entrypoints/dataset_service.py
new file mode 100644
index 00000000..d712af32
--- /dev/null
+++ b/entrypoints/dataset_service.py
@@ -0,0 +1,176 @@
+import asyncio
+import os
+from typing import List
+
+import aioboto3
+import aiofiles
+import bittensor as bt
+import httpx
+import uvicorn
+from bittensor.btlogging import logging as logger
+from fastapi import FastAPI, File, Form, HTTPException, UploadFile
+from fastapi.middleware.cors import CORSMiddleware
+from substrateinterface import Keypair
+
+from commons.objects import ObjectManager
+from dojo import VALIDATOR_MIN_STAKE
+
+app = FastAPI(title="Dataset Upload Service")
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+config = ObjectManager.get_config()
+subtensor = bt.subtensor(config=config)
+metagraph = subtensor.metagraph(netuid=52, lite=True)
+AWS_REGION = os.getenv("AWS_REGION")
+BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
+MAX_CHUNK_SIZE_MB = int(os.getenv("MAX_CHUNK_SIZE_MB", 50))
+
+
+def verify_hotkey_in_metagraph(hotkey: str) -> bool:
+    return hotkey in metagraph.hotkeys
+
+
+def verify_signature(hotkey: str, signature: str, message: str) -> bool:
+    keypair = Keypair(ss58_address=hotkey, ss58_format=42)
+    if not keypair.verify(data=message, signature=signature):
+        logger.error(f"Invalid signature for address={hotkey}")
+        return False
+
+    logger.success(f"Signature verified, signed by {hotkey}")
+    return True
+
+
+def check_stake(hotkey: str) -> bool:
+    uid = -1
+    try:
+        uid = metagraph.hotkeys.index(hotkey)
+    except ValueError:
+        logger.error(f"Hotkey {hotkey} not found in metagraph")
+        return False
+
+    # Check if stake meets minimum threshold
+    stake = metagraph.S[uid].item()
+
+    if stake < VALIDATOR_MIN_STAKE:
+        logger.error(
+            f"Insufficient stake for hotkey {hotkey}: {stake} < {VALIDATOR_MIN_STAKE}"
+        )
+        return False
+
+    logger.info(f"Stake check passed for {hotkey} with stake {stake}")
+    return True
+
+
+@app.post("/upload_dataset")
+async def upload_dataset(
+    hotkey: str = Form(...),
+    signature: str = Form(...),
+    message: str = Form(...),
+    files: List[UploadFile] = File(...),
+):
+    try:
+        if not signature.startswith("0x"):
+            raise HTTPException(
+                status_code=401, detail="Invalid signature format, must be hex."
+            )
+
+        if not verify_signature(hotkey, signature, message):
+            logger.error(f"Invalid signature for address={hotkey}")
+            raise HTTPException(status_code=401, detail="Invalid signature.")
+
+        if not verify_hotkey_in_metagraph(hotkey):
+            logger.error(f"Hotkey {hotkey} not found in metagraph")
+            raise HTTPException(
+                status_code=401, detail="Hotkey not found in metagraph."
+            )
+
+        if not check_stake(hotkey):
+            logger.error(f"Insufficient stake for hotkey {hotkey}")
+            raise HTTPException(
+                status_code=401, detail="Insufficient stake for hotkey."
+            )
+
+        session = aioboto3.Session(region_name=AWS_REGION)
+        async with session.resource("s3") as s3:
+            bucket = await s3.Bucket(BUCKET_NAME)
+            for file in files:
+                content = await file.read()
+                file_size = len(content)
+                if file_size > MAX_CHUNK_SIZE_MB * 1024 * 1024:  # MAX_CHUNK_SIZE_MB in bytes
+                    raise HTTPException(
+                        status_code=413,
+                        detail=f"File too large. Maximum size is {MAX_CHUNK_SIZE_MB}MB",
+                    )
+
+                filename = f"hotkey_{hotkey}_{file.filename}"
+
+                await bucket.put_object(
+                    Key=filename,
+                    Body=content,
+                )
+    except Exception as e:
+        logger.error(f"Error uploading dataset: {e}")
+        raise HTTPException(status_code=500, detail=f"Error uploading dataset: {e}")
+
+    return {
+        "success": True,
+        "message": "Files uploaded successfully",
+        "filenames": [file.filename for file in files],
+    }
+
+
+async def server():
+    config = uvicorn.Config(app, host="0.0.0.0", port=9999)
+    server = uvicorn.Server(config)
+    await server.serve()
+
+
+async def test_endpoint():
+    # Create test data
+    test_data = {
+        "hotkey": "asdfg",
+        "signature": "0xasdfg",
+        "message": "On 2024-12-02 18:15:23.663947 +08 Tensorplex is awesome",
+    }
+    # Create a temporary test file
+    test_filename = "dataset_20241202.jsonl"
+
+    # Build form data similar to how dojo.py does it
+    files = []
+
+    # Add file to form data if it exists
+    if os.path.exists(test_filename):
+        async with aiofiles.open(test_filename, "rb") as f:
+            file_content = await f.read()
+            files.append(("files", (test_filename, file_content, "application/json")))
+    else:
+        raise FileNotFoundError(f"Test file {test_filename} not found")
+
+    # Make request using httpx (port 9999 matches the uvicorn config above)
+    async with httpx.AsyncClient() as client:
+        response = await client.post(
+            "http://localhost:9999/upload_dataset",
+            data={
+                "hotkey": test_data["hotkey"],
+                "signature": test_data["signature"],
+                "message": test_data["message"],
+            },
+            files=files,
+            timeout=30.0,
+        )
+        print(f"Status: {response.status_code}")
+        print(f"Response: {response.json()}")
+
+
+if __name__ == "__main__":
+    import sys
+
+    if "--test" in sys.argv:
+        asyncio.run(test_endpoint())
+    else:
+        asyncio.run(server())
diff --git a/neurons/validator.py b/neurons/validator.py
index 3e601692..5f78b315 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -34,6 +34,7 @@
 from commons.obfuscation.obfuscation_utils import obfuscate_html_and_js
 from commons.objects import ObjectManager
 from commons.orm import ORM
+from commons.score_storage import ScoreStorage
 from commons.scoring import Scoring
 from commons.utils import (
     _terminal_plot,
@@ -44,7 +45,6 @@
     set_expire_time,
     ttl_get_block,
 )
-from database.client import connect_db
 from dojo import __spec_version__
 from dojo.protocol import (
     CompletionResponse,
@@ -106,10 +106,17 @@ def __init__(self):
         self.scores: torch.Tensor = torch.zeros(
             len(self.metagraph.hotkeys), dtype=torch.float32
         )
-        # manually always register and always sync metagraph when application starts
         self.check_registered()

-        self.executor = ThreadPoolExecutor(max_workers=2)
+        # Run score migration before loading state
+        migration_success = self.loop.run_until_complete(ScoreStorage.migrate_from_db())
+        if not migration_success:
+            logger.error(
+                "Score migration failed - cannot continue without valid scores"
+            )
+            raise RuntimeError("Score migration failed - validator cannot start")
+
+        self.executor = ThreadPoolExecutor(max_workers=2)
         self.load_state()

         init_wandb(config=self.config, my_uid=self.uid, wallet=self.wallet)
@@ -125,7 +132,7 @@ async def send_scores(self, synapse: ScoringResult, hotkeys: List[str]):
         )

         await self.dendrite.forward(
-            axons=axons, synapse=synapse, deserialize=False, timeout=12
+            axons=axons, synapse=synapse, deserialize=False, timeout=30
         )

     def obfuscate_model_names(
@@ -418,7 +425,8 @@ async def save_state(
             logger.warning("Scores are all zeros, but saving anyway!")
             # raise EmptyScores("Skipping save as scores are all empty")

-            await ORM.create_or_update_validator_score(self.scores)
+            # await ORM.create_or_update_validator_score(self.scores)
+            await ScoreStorage.save(self.scores)
             logger.success(f"📦 Saved validator state with scores: {self.scores}")
         except EmptyScores as e:
             logger.debug(f"No need to to save validator state: {e}")
@@ -427,8 +435,7 @@ async def _load_state(self):
         try:
-            await connect_db()
-            scores = await ORM.get_validator_score()
+            scores = await ScoreStorage.load()

             if scores is None:
                 num_processed_tasks = await ORM.get_num_processed_tasks()
@@ -497,7 +504,10 @@ async def _get_task_results_from_miner(
             # Send the request via Dendrite and get the response
             response: list[TaskResultRequest] = await self.dendrite.forward(  # type: ignore
-                axons=[miner_axon], synapse=task_synapse, deserialize=False
+                axons=[miner_axon],
+                synapse=task_synapse,
+                deserialize=False,
+                timeout=30,
             )

             if response and response[0]:
@@ -724,14 +734,10 @@ async def update_score_and_send_feedback(self):
             validator_hotkeys: List[str] = self._get_validator_hotkeys()

             # Grab tasks that were expired TASK_DEADLINE duration ago
-            expire_from = (
-                datetime_as_utc(datetime.now(timezone.utc))
-                - timedelta(seconds=dojo.TASK_DEADLINE)
-                - timedelta(hours=2)
-            )
-            expire_to = datetime_as_utc(datetime.now(timezone.utc)) - timedelta(
-                seconds=dojo.TASK_DEADLINE
-            )
+            expire_from = datetime_as_utc(datetime.now(timezone.utc)) - timedelta(
+                hours=2
+            )
+            expire_to = datetime_as_utc(datetime.now(timezone.utc))
             logger.debug(
                 f"Updating with expire_from: {expire_from} and expire_to: {expire_to}"
             )
@@ -1300,7 +1306,7 @@ async def _log_wandb(
         )

         score_data = {
-            "scores_by_hotkey": hotkey_to_score,
+            "scores_by_hotkey": [hotkey_to_score],
             "mean": {
                 "consensus": mean_weighted_consensus_scores,
                 "ground_truth": mean_weighted_gt_scores,
diff --git a/pyproject.toml b/pyproject.toml
index bb8b092e..0d842d9c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -61,6 +61,7 @@ dependencies = [
 [project.optional-dependencies]
 dev = ["commitizen", "curlify2", "pytest", "ruff"]
 test = ["pytest", "nox"]
+dataset = ["aioboto3", "aiofiles", "python-multipart"]

 [project.scripts]
 dojo = "dojo_cli:main"
diff --git a/scripts/extract_dataset.py b/scripts/extract_dataset.py
new file mode 100644
index 00000000..29a95f18
--- /dev/null
+++ b/scripts/extract_dataset.py
@@ -0,0 +1,327 @@
+import asyncio
+import os
+from datetime import datetime
+from typing import AsyncGenerator, List
+
+import aiofiles
+import bittensor as bt
+import httpx
+import numpy as np
+from bittensor.btlogging import logging as logger
+from pydantic import BaseModel, model_serializer
+
+from commons.exceptions import (
+    NoNewExpiredTasksYet,
+)
+from commons.objects import ObjectManager
+from database.client import connect_db, disconnect_db
+from database.mappers import (
+    map_feedback_request_model_to_feedback_request,
+)
+from database.prisma.models import (
+    Feedback_Request_Model,
+)
+from database.prisma.types import (
+    Feedback_Request_ModelInclude,
+    Feedback_Request_ModelWhereInput,
+)
+from dojo import TASK_DEADLINE
+from dojo.protocol import (
+    CompletionResponses,
+    DendriteQueryResponse,
+)
+from dojo.utils.config import source_dotenv
+
+source_dotenv()
+
+DATASET_SERVICE_BASE_URL = os.getenv("DATASET_SERVICE_BASE_URL")
+MAX_CHUNK_SIZE_MB = int(os.getenv("MAX_CHUNK_SIZE_MB", 50))
+
+if DATASET_SERVICE_BASE_URL is None:
+    raise ValueError("DATASET_SERVICE_BASE_URL must be set")
+if MAX_CHUNK_SIZE_MB is None:
+    raise ValueError("MAX_CHUNK_SIZE_MB must be set")
+
+
+# represents a row in the jsonl dataset
+class Row(BaseModel):
+    prompt: str
+    completions: list[CompletionResponses]
+    # shape (num_miners, num_completions)
+    raw_scores: list[list[float]]
+    # shape (num_completions)
+    mean_scores: list[float]
+
+    class Config:
+        arbitrary_types_allowed = True
+
+    @model_serializer
+    def serialize_model(self):
+        return {
+            "prompt": self.prompt,
+            "completions": self.completions,
+            "raw_scores": self.raw_scores,
+            "mean_scores": self.mean_scores,
+        }
+
+
+async def build_jsonl(filename: str):
+    with open(filename, "w") as file:
+        batch_size = 10
+        task_count = 0
+        async for task_batch, has_more_batches in get_processed_tasks(batch_size):
+            if not has_more_batches and not task_batch:
+                break
+
+            for task in task_batch:
+                # Extract prompt from validator request
+                prompt = task.request.prompt
+
+                # Extract completions from miner responses
+                completions = task.request.completion_responses
+
+                raw_scores = []
+                for miner_response in task.miner_responses:
+                    miner_ratings = [
+                        c.score for c in miner_response.completion_responses
+                    ]
+                    if any(rating is None for rating in miner_ratings):
+                        continue
+                    raw_scores.append(miner_ratings)
+
+                # shape (num_miners, num_completions)
+                raw_scores_vec = np.array(raw_scores)
+                logger.info(f"raw_scores_vec shape: {raw_scores_vec.shape}")
+                logger.info(f"raw_scores_vec: {raw_scores_vec}")
+
+                if raw_scores_vec.size > 0:
+                    # average across miners (axis 0) to get one mean score per
+                    # completion, matching Row's declared mean_scores shape
+                    mean_scores = raw_scores_vec.mean(axis=0)
+                    logger.info(f"mean_scores shape: {mean_scores.shape}")
+                    jsonl_row = Row(
+                        prompt=prompt,
+                        completions=completions,
+                        raw_scores=raw_scores,
+                        mean_scores=mean_scores.tolist(),
+                    )
+                else:
+                    jsonl_row = Row(
+                        prompt=prompt,
+                        completions=completions,
+                        raw_scores=[],
+                        mean_scores=[],
+                    )
+
+                # Write the entry as a JSON line
+                file.write(jsonl_row.model_dump_json() + "\n")
+
+            task_count += len(task_batch)
+            logger.info(f"Scraped task count: {task_count}")
+
+
+async def get_processed_tasks(
+    batch_size: int = 10,
+) -> AsyncGenerator[tuple[List[DendriteQueryResponse], bool], None]:
+    """Yields batches of processed Feedback_Request_Model records along with a boolean flag indicating the presence of additional batches.
+
+    This function retrieves tasks that have been fully processed. The batch size can be specified to control the number of tasks returned in each batch.
+
+    Args:
+        batch_size (int, optional): The number of tasks to include in each batch. Defaults to 10.
+
+    Raises:
+        NoNewExpiredTasksYet: Raised if no processed tasks are available for retrieval.
+
+    Yields:
+        AsyncGenerator[tuple[List[DendriteQueryResponse], bool], None]: An asynchronous generator yielding a tuple containing a list of DendriteQueryResponse objects and a boolean indicating if more batches are available.
+ """ + + # find all validator requests first + include_query = Feedback_Request_ModelInclude( + { + "completions": True, + "criteria_types": True, + "ground_truths": True, + "parent_request": True, + } + ) + + vali_where_query = Feedback_Request_ModelWhereInput( + { + "parent_id": None, # no parent means it's a validator request + # only check for tasks that are completely done + "is_processed": {"equals": True}, + } + ) + + # count first total including non + task_count = await Feedback_Request_Model.prisma().count( + where=vali_where_query, + ) + + logger.info(f"Count of processed tasks: {task_count}") + + if not task_count: + raise NoNewExpiredTasksYet( + f"No expired tasks found for processing, please wait for tasks to pass the task deadline of {TASK_DEADLINE} seconds." + ) + + for i in range(0, task_count, batch_size): + # find all unprocesed validator requests + validator_requests = await Feedback_Request_Model.prisma().find_many( + include=include_query, + where=vali_where_query, + order={"created_at": "desc"}, + skip=i, + take=batch_size, + ) + + # find all miner responses + processed_vali_request_ids = [r.id for r in validator_requests] + miner_responses = await Feedback_Request_Model.prisma().find_many( + include=include_query, + where={ + "parent_id": {"in": processed_vali_request_ids}, + "is_processed": {"equals": True}, + }, + order={"created_at": "desc"}, + ) + + # NOTE: technically a DendriteQueryResponse represents a task + tasks: list[DendriteQueryResponse] = [] + for validator_request in validator_requests: + vali_request = map_feedback_request_model_to_feedback_request( + validator_request + ) + + m_responses = list( + map( + lambda x: map_feedback_request_model_to_feedback_request( + x, is_miner=True + ), + [m for m in miner_responses if m.parent_id == validator_request.id], + ) + ) + + tasks.append( + DendriteQueryResponse(request=vali_request, miner_responses=m_responses) + ) + + # yield responses, so caller can do something + has_more_batches = True + yield tasks, has_more_batches + + yield [], False + + +async def upload(hotkey: str, signature: str, message: str, filename: str): + if not signature.startswith("0x"): + signature = f"0x{signature}" + + # Build form data similar to how dojo.py does it + form_body = { + "hotkey": hotkey, + "signature": signature, + "message": message, + } + # Add file to form data if it exists + if os.path.exists(filename): + chunks = await chunk_file(filename, MAX_CHUNK_SIZE_MB) + + # Make request using httpx + async with httpx.AsyncClient() as client: + for chunk_filename, chunk_content in chunks: + # Append to files list with correct format + files = [("files", (chunk_filename, chunk_content, "application/json"))] + response = await client.post( + f"{DATASET_SERVICE_BASE_URL}/upload_dataset", + data=form_body, + files=files, + timeout=60.0, + ) + logger.info(f"Status: {response.status_code}") + response_json = response.json() + logger.info(f"Response: {response_json}") + is_success = response.status_code == 200 and response_json.get( + "success" + ) + if not is_success: + raise Exception(f"Failed to upload file {chunk_filename}") + await asyncio.sleep(1) + + +async def chunk_file(filename: str, chunk_size_mb: int = 50): + chunk_size = chunk_size_mb * 1024 * 1024 # Convert MB to bytes + + if os.path.exists(filename): + async with aiofiles.open(filename) as f: # Open in text mode + chunks = [] + current_chunk = [] + current_chunk_size = 0 + + # ensure that when we chunk, we don't split across lines + async for line in f: + line_size = 
len(line.encode("utf-8")) # Get size of line in bytes + if current_chunk_size + line_size > chunk_size: + # Use consistent format + base, ext = os.path.splitext(filename) + chunk_filename = f"{base}_part{len(chunks) + 1}{ext}" + chunks.append((chunk_filename, "".join(current_chunk))) + current_chunk = [] + current_chunk_size = 0 + + current_chunk.append(line) + current_chunk_size += line_size + + # Use same format for last chunk + if current_chunk: + base, ext = os.path.splitext(filename) + chunk_filename = f"{base}_part{len(chunks) + 1}{ext}" + chunks.append((chunk_filename, "".join(current_chunk))) + + return chunks + else: + raise FileNotFoundError(f"Test file {filename} not found") + + +async def main(): + await connect_db() + config = ObjectManager.get_config() + wallet = bt.wallet(config=config) + hotkey = wallet.hotkey.ss58_address + message = f"Uploading dataset for validator with hotkey: {hotkey}" + signature = wallet.hotkey.sign(message).hex() # Convert signature to hex string + + # Create filename with current date + current_date = datetime.now().strftime("%Y%m%d") + filename = f"dataset_{current_date}.jsonl" + # Check if file already exists + if os.path.exists(filename): + logger.warning(f"File {filename} already exists, skipping scrape db step") + else: + await build_jsonl(filename) + + try: + upload_success = await upload(hotkey, signature, message, filename) + if upload_success: + logger.info("Upload successful! Removing local dataset file.") + os.remove(filename) + except Exception as e: + logger.error(f"Error occurred while trying to upload dataset: {e}") + finally: + await disconnect_db() + + +async def _test_chunking(): + filename = "dummy_dataset.jsonl" + chunks = await chunk_file(filename, MAX_CHUNK_SIZE_MB) + logger.info(f"number of chunks: {len(chunks)}") + for i, (chunk_filename, chunk_content) in enumerate(chunks, 1): + logger.info(f"\nSaving chunk {i} to {chunk_filename}") + async with aiofiles.open(chunk_filename, "w") as f: + await f.write(chunk_content) + logger.info(f"Saved chunk {i} ({len(chunk_content)} bytes)") + + +if __name__ == "__main__": + asyncio.run(main()) + # asyncio.run(_test_chunking()) diff --git a/scripts/inspect_scores.py b/scripts/inspect_scores.py new file mode 100644 index 00000000..fb04577b --- /dev/null +++ b/scripts/inspect_scores.py @@ -0,0 +1,78 @@ +import argparse +from typing import List + +import torch +from tabulate import tabulate +from termcolor import colored + + +def format_score_table(scores: torch.Tensor) -> List[List[str]]: + """Format scores into a table with 10 columns""" + table_data = [] + row = [] + for i, score in enumerate(scores): + score_str = f"{score.item():.4f}" + # Color non-zero scores + if score.item() > 0: + score_str = colored(score_str, "green") + row.append([i, score_str]) + + # Use 10 columns for better screen fit + if len(row) == 10 or i == len(scores) - 1: + table_data.append(row) + row = [] + return table_data + + +def inspect_scores( + file_path: str = "scores/validator_scores.pt", show_all: bool = False +): + try: + scores = torch.load(file_path) + + # Print Summary + print(colored("\n=== Scores Summary ===", "blue", attrs=["bold"])) + print(f"Total UIDs: {len(scores)}") + print(f"Data type: {scores.dtype}") + print(f"Device: {scores.device}") + + # Print Statistics + print(colored("\n=== Statistics ===", "blue", attrs=["bold"])) + print(f"Mean score: {scores.mean().item():.4f}") + print(f"Min score: {scores.min().item():.4f}") + print(f"Max score: {scores.max().item():.4f}") + print( + f"Non-zero 
+            f"({(torch.count_nonzero(scores).item()/len(scores)*100):.1f}%)"
+        )
+
+        # Print Top Scores
+        top_k = 10  # Show top 10
+        values, indices = torch.topk(scores, k=min(top_k, len(scores)))
+        print(colored("\n=== Top 10 Scores ===", "blue", attrs=["bold"]))
+        top_scores = [
+            [f"UID {idx}", f"{val.item():.4f}"] for idx, val in zip(indices, values)
+        ]
+        print(tabulate(top_scores, headers=["UID", "Score"], tablefmt="simple"))
+
+        if show_all:
+            print(colored("\n=== All Scores ===", "blue", attrs=["bold"]))
+            table_data = format_score_table(scores)
+            for row in table_data:
+                # headers = [f"UID {i[0]}" for i in row]
+                values = [f"UID {i[0]} - {i[1]}" for i in row]
+                print(tabulate([values], tablefmt="simple"))
+
+        print("\nNote: Green values indicate non-zero scores")
+
+    except FileNotFoundError:
+        print(colored(f"Score file not found at {file_path}", "red"))
+    except Exception as e:
+        print(colored(f"Error reading scores: {e}", "red"))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--all", action="store_true", help="Show all scores")
+    args = parser.parse_args()
+    inspect_scores(show_all=args.all)
diff --git a/simulator/__init__.py b/simulator/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/simulator/miner.py b/simulator/miner.py
new file mode 100644
index 00000000..3c1aedee
--- /dev/null
+++ b/simulator/miner.py
@@ -0,0 +1,171 @@
+import json
+import os
+import random
+import traceback
+from datetime import datetime, timezone
+
+import redis
+from bittensor.btlogging import logging as logger
+
+from commons.utils import get_new_uuid
+from dojo.protocol import FeedbackRequest, Result, TaskResult, TaskResultRequest
+from dojo.utils.config import get_config
+from neurons.miner import Miner
+
+
+class MinerSim(Miner):
+    def __init__(self):
+        super().__init__()
+        try:
+            # Initialize Redis connection
+            host = os.getenv("REDIS_HOST", "localhost")
+            port = int(os.getenv("REDIS_PORT", 6379))
+            self.redis_client = redis.Redis(
+                host=host, port=port, db=0, decode_responses=True
+            )
+            logger.info("Redis connection established")
+
+            self._configure_simulation()
+
+            self.is_bad_miner = get_config().simulation_bad_miner
+            logger.info(f"Miner role set to: {'bad' if self.is_bad_miner else 'good'}")
+
+            logger.info("Starting Miner Simulator")
+        except Exception as e:
+            logger.error(f"Failed to connect to Redis: {e}")
+            raise
+
+    def _configure_simulation(self):
+        """Configure simulation parameters with environment variables or defaults."""
+        self.response_behaviors = {
+            "normal": float(os.getenv("SIM_NORMAL_RESP_PROB", 0.8)),
+            "no_response": float(os.getenv("SIM_NO_RESP_PROB", 0.1)),
+            "timeout": float(os.getenv("SIM_TIMEOUT_PROB", 0.1)),
+        }
+
+    async def forward_feedback_request(
+        self, synapse: FeedbackRequest
+    ) -> FeedbackRequest:
+        try:
+            # Validate that synapse, dendrite, dendrite.hotkey, and response are not None
+            if not synapse or not synapse.dendrite or not synapse.dendrite.hotkey:
+                logger.error("Invalid synapse: dendrite or dendrite.hotkey is None.")
+                return synapse
+
+            if not synapse.completion_responses:
+                logger.error("Invalid synapse: response field is None.")
+                return synapse
+
+            # Empty out completion response since not needed in simulator
+            new_synapse = synapse.model_copy(deep=True)
+            new_synapse.completion_responses = []
+
+            synapse.dojo_task_id = synapse.request_id
+            self.hotkey_to_request[synapse.dendrite.hotkey] = synapse
+
+            redis_key = f"feedback:{synapse.request_id}"
f"feedback:{synapse.request_id}" + self.redis_client.set( + redis_key, + new_synapse.model_dump_json(), + ex=86400, # expire after 24 hours + ) + logger.info(f"Stored feedback request {synapse.request_id}") + + synapse.ground_truth = {} + return synapse + + except Exception as e: + logger.error(f"Error handling FeedbackRequest: {e}") + traceback.print_exc() + return synapse + + async def forward_task_result_request( + self, synapse: TaskResultRequest + ) -> TaskResultRequest | None: + try: + logger.info(f"Received TaskResultRequest for task id: {synapse.task_id}") + if not synapse or not synapse.task_id: + logger.error("Invalid TaskResultRequest: missing task_id") + return None + + # Simulate different response behaviors + # behavior = self._get_response_behavior() + + # if behavior in ['no_response', 'timeout']: + # logger.debug(f"Simulating {behavior} for task {synapse.task_id}") + # if behavior == 'timeout': + # await asyncio.sleep(30) + # return None + + redis_key = f"feedback:{synapse.task_id}" + request_data = self.redis_client.get(redis_key) + + request_dict = json.loads(request_data) if request_data else None + feedback_request = FeedbackRequest(**request_dict) if request_dict else None + + if not feedback_request: + logger.debug(f"No task result found for task id: {synapse.task_id}") + return None + + current_time = datetime.now(timezone.utc).isoformat() + + task_results = [] + for criteria_type in feedback_request.criteria_types: + result = Result( + type=criteria_type.type, + value=self._generate_scores(feedback_request.ground_truth), + ) + + task_result = TaskResult( + id=get_new_uuid(), + status="COMPLETED", + created_at=current_time, + updated_at=current_time, + result_data=[result], + worker_id=get_new_uuid(), + task_id=synapse.task_id, + ) + task_results.append(task_result) + + synapse.task_results = task_results + logger.info(f"TaskResultRequest: {synapse}") + + self.redis_client.delete(redis_key) + logger.debug(f"Processed task result for task {synapse.task_id}") + + return synapse + + except Exception as e: + traceback.print_exc() + logger.error(f"Error handling TaskResultRequest: {e}") + return None + + def _get_response_behavior(self) -> str: + """Determine the response behavior based on configured probabilities.""" + return random.choices( + list(self.response_behaviors.keys()), + weights=list(self.response_behaviors.values()), + )[0] + + def _generate_scores(self, ground_truth: dict) -> dict: + scores = {} + max_rank = max(ground_truth.values()) + + for k, v in ground_truth.items(): + base_weight = int(10 - (v * (10 / max_rank))) + if self.is_bad_miner: + deviation = random.randint(-5, 5) + else: + deviation = random.randint(-2, 2) + random_score = max(0, min(9, base_weight + deviation)) + score = int((random_score / (10 - 1)) * (100 - 1) + 1) + scores[k] = score + return scores + + # def __del__(self): + # """Cleanup Redis connection on object destruction""" + # try: + # self.redis_client.close() + # logger.info("Redis connection closed") + # except: + # pass diff --git a/simulator/validator.py b/simulator/validator.py new file mode 100644 index 00000000..08418769 --- /dev/null +++ b/simulator/validator.py @@ -0,0 +1,281 @@ +import asyncio +import traceback +from typing import List + +import aiohttp +import bittensor as bt +from bittensor.btlogging import logging as logger +from tenacity import RetryError + +import dojo +from commons.dataset.synthetic import SyntheticAPI +from commons.orm import ORM +from commons.utils import get_epoch_time, get_new_uuid, 
+from dojo.protocol import (
+    DendriteQueryResponse,
+    FeedbackRequest,
+    MultiScoreCriteria,
+    TaskType,
+)
+from neurons.validator import Validator
+
+
+class ValidatorSim(Validator):
+    def __init__(self):
+        self._last_block = None
+        self._block_check_attempts = 0
+        self.MAX_BLOCK_CHECK_ATTEMPTS = 3
+        self._connection_lock = asyncio.Lock()
+
+        super().__init__()
+        logger.info("Starting Validator Simulator")
+
+    async def _try_reconnect_subtensor(self):
+        self._block_check_attempts += 1
+        if self._block_check_attempts >= self.MAX_BLOCK_CHECK_ATTEMPTS:
+            logger.error(
+                f"Failed to reconnect after {self.MAX_BLOCK_CHECK_ATTEMPTS} attempts"
+            )
+            return False
+
+        try:
+            logger.info(
+                f"Attempting to reconnect to subtensor (attempt {self._block_check_attempts}/{self.MAX_BLOCK_CHECK_ATTEMPTS})..."
+            )
+            if hasattr(self.subtensor.substrate, "websocket"):
+                self.subtensor.substrate.websocket.close()
+
+            self.subtensor = bt.subtensor(self.subtensor.config)
+            await asyncio.sleep(1)
+            return True
+        except Exception as e:
+            logger.error(f"Failed to reconnect to subtensor: {e}")
+            return await self._try_reconnect_subtensor()
+
+    async def _ensure_subtensor_connection(self):
+        async with self._connection_lock:
+            try:
+                self.subtensor.get_current_block()
+                self._block_check_attempts = 0
+                return True
+            except (BrokenPipeError, ConnectionError):
+                logger.warning("Connection lost, attempting immediate reconnection")
+                return await self._try_reconnect_subtensor()
+            except Exception as e:
+                logger.error(f"Unexpected error checking connection: {e}")
+                return False
+
+    @property
+    def block(self):
+        try:
+            if not asyncio.get_event_loop().run_until_complete(
+                self._ensure_subtensor_connection()
+            ):
+                logger.warning(
+                    "Subtensor connection failed - returning last known block"
+                )
+                return self._last_block if self._last_block is not None else 0
+
+            self._last_block = ttl_get_block(self.subtensor)
+            self._block_check_attempts = 0
+            return self._last_block
+        except Exception as e:
+            logger.error(f"Error getting block number: {e}")
+            return self._last_block if self._last_block is not None else 0
+
+    async def sync(self):
+        has_connection = await self._ensure_subtensor_connection()
+        if not has_connection:
+            logger.warning("Subtensor connection failed - continuing with partial sync")
+
+        await super().sync()
+
+    async def send_request(
+        self,
+        synapse: FeedbackRequest | None = None,
+        external_user: bool = False,
+    ):
+        start = get_epoch_time()
+        # typically the request may come from an external source however,
+        # initially will seed it with some data for miners to get started
+        if len(self._active_miner_uids) == 0:
+            logger.info("No active miners to send request to... skipping")
+            return
+
+        request_id = get_new_uuid()
+        # sel_miner_uids = await self.get_miner_uids(external_user, request_id)
+        sel_miner_uids = sorted(list(self._active_miner_uids))
+
+        axons = [
+            self.metagraph.axons[uid]
+            for uid in sel_miner_uids
+            if self.metagraph.axons[uid].hotkey.casefold()
+            != self.wallet.hotkey.ss58_address.casefold()
+        ]
+        if not len(axons):
+            logger.warning("🤷 No axons to query ... skipping")
skipping") + return + + obfuscated_model_to_model = {} + + if synapse is None: + try: + data = await SyntheticAPI.get_qa() + except RetryError as e: + logger.error( + f"Exhausted all retry attempts for synthetic data generation: {e}" + ) + return + except ValueError as e: + logger.error(f"Invalid response from synthetic data API: {e}") + return + except aiohttp.ClientError as e: + logger.error(f"Network error when calling synthetic data API: {e}") + return + except Exception as e: + logger.error(f"Unexpected error during synthetic data generation: {e}") + logger.debug(f"Traceback: {traceback.format_exc()}") + return + + if not data: + logger.error("No data returned from synthetic data API") + return + + obfuscated_model_to_model = self.obfuscate_model_names(data.responses) + expire_at = set_expire_time(dojo.TASK_DEADLINE) + synapse = FeedbackRequest( + request_id=request_id, + task_type=str(TaskType.CODE_GENERATION), + criteria_types=[ + MultiScoreCriteria( + options=list(obfuscated_model_to_model.keys()), + min=1.0, + max=100.0, + ), + ], + prompt=data.prompt, + completion_responses=data.responses, + expire_at=expire_at, + ground_truth=data.ground_truth, # Added ground truth!!!!! + ) + elif external_user: + obfuscated_model_to_model = self.obfuscate_model_names( + synapse.completion_responses + ) + + logger.info( + f"⬆️ Sending feedback request for request id: {synapse.request_id}, miners uids:{sel_miner_uids} with expire_at: {synapse.expire_at}" + ) + + miner_responses: List[FeedbackRequest] = await self._send_shuffled_requests( + self.dendrite, axons, synapse + ) + + valid_miner_responses: List[FeedbackRequest] = [] + try: + for miner_response in miner_responses: + miner_hotkey = ( + miner_response.axon.hotkey if miner_response.axon else "??" + ) + # map obfuscated model names back to the original model names + real_model_ids = [] + + for i, completion in enumerate(miner_response.completion_responses): + found_model_id = obfuscated_model_to_model.get( + completion.model, None + ) + real_model_ids.append(found_model_id) + if found_model_id: + miner_response.completion_responses[i].model = found_model_id + synapse.completion_responses[i].model = found_model_id + + if any(c is None for c in real_model_ids): + logger.warning("Failed to map obfuscated model to original model") + continue + + if miner_response.dojo_task_id is None: + logger.debug(f"Miner {miner_hotkey} must provide the dojo task id") + continue + + # update the miner response with the real model ids + valid_miner_responses.append(miner_response) + except Exception as e: + logger.error(f"Failed to map obfuscated model to original model: {e}") + pass + + logger.info(f"⬇️ Received {len(valid_miner_responses)} valid responses") + if valid_miner_responses is None or len(valid_miner_responses) == 0: + logger.info("No valid miner responses to process... 
skipping") + return + + # include the ground_truth to keep in data manager + synapse.ground_truth = data.ground_truth + synapse.dendrite.hotkey = self.wallet.hotkey.ss58_address + response_data = DendriteQueryResponse( + request=synapse, + miner_responses=valid_miner_responses, + ) + + logger.debug("Attempting to saving dendrite response") + vali_request_model = await ORM.save_task( + validator_request=synapse, + miner_responses=valid_miner_responses, + ground_truth=data.ground_truth, + ) + + if vali_request_model is None: + logger.error("Failed to save dendrite response") + return + + # saving response + logger.success( + f"Saved dendrite response for request id: {response_data.request.request_id}" + ) + logger.info( + f"Sending request to miners & processing took {get_epoch_time() - start}" + ) + return + + @staticmethod + async def _send_shuffled_requests( + dendrite: bt.dendrite, axons: List[bt.AxonInfo], synapse: FeedbackRequest + ) -> list[FeedbackRequest]: + """Send the same request to all miners without shuffling the order. + WARNING: This should only be used for testing/debugging as it could allow miners to game the system. + + Args: + dendrite (bt.dendrite): Communication channel to send requests + axons (List[bt.AxonInfo]): List of miner endpoints + synapse (FeedbackRequest): The feedback request to send + + Returns: + list[FeedbackRequest]: List of miner responses + """ + all_responses = [] + batch_size = 10 + + for i in range(0, len(axons), batch_size): + batch_axons = axons[i : i + batch_size] + tasks = [] + + for axon in batch_axons: + tasks.append( + dendrite.forward( + axons=[axon], + synapse=synapse, + deserialize=False, + timeout=60, + ) + ) + + batch_responses = await asyncio.gather(*tasks) + flat_batch_responses = [ + response for sublist in batch_responses for response in sublist + ] + all_responses.extend(flat_batch_responses) + + logger.info( + f"Processed batch {i // batch_size + 1} of {(len(axons) - 1) // batch_size + 1}" + ) + + return all_responses