From b6eb6489509879c302b1cd23f609d4410e4e4a75 Mon Sep 17 00:00:00 2001
From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com>
Date: Wed, 27 Nov 2024 18:22:20 +0800
Subject: [PATCH 1/7] feat: add validator api dataset service, extract dataset
script
build: add deps for dataset service
build: add docker compose services & entrypoints
---
Makefile | 3 +
docker-compose.validator.yaml | 23 +++
docker/Dockerfile.dataset | 35 ++++
entrypoints.sh | 23 +++
entrypoints/dataset_service.py | 156 ++++++++++++++++
pyproject.toml | 19 +-
scripts/extract_dataset.py | 327 +++++++++++++++++++++++++++++++++
test_dataset_endpoint/app.py | 80 ++++++++
8 files changed, 657 insertions(+), 9 deletions(-)
create mode 100644 docker/Dockerfile.dataset
create mode 100644 entrypoints/dataset_service.py
create mode 100644 scripts/extract_dataset.py
create mode 100644 test_dataset_endpoint/app.py
diff --git a/Makefile b/Makefile
index 8156bdbd..b98ba784 100644
--- a/Makefile
+++ b/Makefile
@@ -74,6 +74,9 @@ miner-worker-api:
dojo-cli:
docker compose --env-file .env.miner -f docker-compose.miner.yaml run --rm dojo-cli
+# Run the one-off `extract-dataset` compose service from the validator compose file;
+# `--rm` removes the container once the run finishes.
+extract-dataset:
+ docker compose -f docker-compose.validator.yaml run --rm --remove-orphans extract-dataset
+
# ---------------------------------------------------------------------------- #
# CORE SERVICE LOGGING #
# ---------------------------------------------------------------------------- #
diff --git a/docker-compose.validator.yaml b/docker-compose.validator.yaml
index 02598809..29bfef01 100644
--- a/docker-compose.validator.yaml
+++ b/docker-compose.validator.yaml
@@ -131,3 +131,26 @@ services:
prisma-setup-vali:
condition: service_completed_successfully
logging: *default-logging
+
+ # HTTP service that receives dataset uploads; `command` selects the
+ # "dataset-service" branch of entrypoints.sh.
+ dataset-service:
+ image: ghcr.io/tensorplex-labs/dojo:dataset
+ env_file:
+ - .env.validator
+ ports:
+ # published on the host's loopback interface only
+ - "127.0.0.1:9999:9999"
+ command: ["dataset-service"]
+ logging: *default-logging
+
+ # One-off job (see the `extract-dataset` Makefile target) that runs
+ # scripts/extract_dataset.py via the "extract-dataset" branch of entrypoints.sh.
+ extract-dataset:
+ image: ghcr.io/tensorplex-labs/dojo:dataset
+ env_file:
+ - .env.validator
+ command: ["extract-dataset"]
+ networks:
+ - dojo-validator
+ volumes:
+ # the repository checkout is mounted over /app, the image's WORKDIR
+ - ./:/app
+ - ./.env.validator:/app/.env
+ - prisma-binary:/root/prisma-python
+ # wallet files from the host, needed to sign the upload message
+ - $HOME/.bittensor:/root/.bittensor
+ logging: *default-logging
diff --git a/docker/Dockerfile.dataset b/docker/Dockerfile.dataset
new file mode 100644
index 00000000..5cc80942
--- /dev/null
+++ b/docker/Dockerfile.dataset
@@ -0,0 +1,35 @@
+# Image shared by the `dataset-service` and `extract-dataset` compose services.
+FROM python:3.11-slim-bookworm
+
+WORKDIR /app
+
+# NOTE(review): nothing in this Dockerfile installs cargo, nvm or node, so the
+# cargo/NVM/NODE entries below look unused here - confirm before relying on them.
+ENV PATH="/root/.cargo/bin/:$PATH"
+# make `uv pip install` target the system interpreter instead of a virtualenv
+ENV UV_SYSTEM_PYTHON=true
+ENV NVM_DIR=/root/.nvm
+ENV NODE_VERSION=v20.11.1
+ENV NODE_PATH=$NVM_DIR/versions/node/$NODE_VERSION/lib/node_modules
+ENV PATH=$NVM_DIR/versions/node/$NODE_VERSION/bin:$PATH
+
+# build tooling plus curl/git; the apt lists are removed in the same layer to keep it small
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends \
+ build-essential curl git ca-certificates \
+ && apt-get clean \
+ && rm -rf /var/lib/apt/lists/*
+
+# NOTE(review): `uv:latest` is unpinned, so rebuilds may pick up a different uv version.
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv
+COPY . .
+
+ARG TARGETPLATFORM
+
+RUN echo "Building for TARGETPLATFORM: $TARGETPLATFORM"
+
+# the copied checkout may be owned by a different user; mark it safe for git
+RUN git config --global --add safe.directory /app
+
+# PyTorch publishes different CPU wheels for darwin and linux (see pyproject.toml).
+# The per-platform install below is kept for reference; the unconditional install
+# that follows is what actually runs.
+# RUN if [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
+# uv pip install --no-cache -e .[dataset] --find-links https://download.pytorch.org/whl/torch_stable.html; \
+# else \
+# uv pip install --no-cache -e .[dataset]; \
+# fi
+# install the project in editable mode together with the `dataset` optional dependencies
+RUN uv pip install --no-cache -e ".[dataset]" --find-links https://download.pytorch.org/whl/torch_stable.html;
+
+ENTRYPOINT ["./entrypoints.sh"]
diff --git a/entrypoints.sh b/entrypoints.sh
index 7f64dfe7..b4a8f637 100755
--- a/entrypoints.sh
+++ b/entrypoints.sh
@@ -53,3 +53,26 @@ if [ "$1" = 'validator' ]; then
--neuron.type validator \
--wandb.project_name ${WANDB_PROJECT_NAME}
fi
+
+if [ "$1" = 'extract-dataset' ]; then
+    # Show the settings this run depends on. DATABASE_URL may embed credentials,
+    # so only report whether it is set instead of printing its value to the logs.
+    echo "Environment variables:"
+    echo "WALLET_COLDKEY: ${WALLET_COLDKEY}"
+    echo "WALLET_HOTKEY: ${WALLET_HOTKEY}"
+    echo "DATABASE_URL: ${DATABASE_URL:+<set>}"
+    echo "DATASET_SERVICE_BASE_URL: ${DATASET_SERVICE_BASE_URL}"
+    # Quote the expansions so a wallet name containing spaces stays a single
+    # argument and an empty value is passed as "" rather than silently dropped.
+    python scripts/extract_dataset.py \
+        --wallet.name "${WALLET_COLDKEY}" \
+        --wallet.hotkey "${WALLET_HOTKEY}"
+fi
+
+# Start the dataset upload API (entrypoints/dataset_service.py).
+if [ "$1" = 'dataset-service' ]; then
+ # print the non-secret settings the service reads from its environment
+ echo "Environment variables:"
+ echo "PORT: ${PORT}"
+ echo "S3_BUCKET_NAME: ${S3_BUCKET_NAME}"
+ echo "AWS_REGION: ${AWS_REGION}"
+ echo "MAX_CHUNK_SIZE_MB: ${MAX_CHUNK_SIZE_MB}"
+ # NOTE(review): netuid and network are fixed here rather than taken from the
+ # environment - confirm that is intended for non-mainnet deployments.
+ python entrypoints/dataset_service.py \
+ --netuid 52 \
+ --subtensor.network finney
+fi
diff --git a/entrypoints/dataset_service.py b/entrypoints/dataset_service.py
new file mode 100644
index 00000000..7a932a07
--- /dev/null
+++ b/entrypoints/dataset_service.py
@@ -0,0 +1,156 @@
+import asyncio
+import os
+from typing import List
+
+import aioboto3
+import aiofiles
+import bittensor as bt
+import httpx
+import uvicorn
+from bittensor.btlogging import logging as logger
+from fastapi import FastAPI, File, Form, HTTPException, UploadFile
+from fastapi.middleware.cors import CORSMiddleware
+from substrateinterface import Keypair
+
+from commons.objects import ObjectManager
+from dojo import VALIDATOR_MIN_STAKE
+
+# FastAPI application exposing the dataset upload endpoint.
+app = FastAPI(title="Dataset Upload Service")
+# NOTE(review): every origin, method and header is allowed, with credentials -
+# confirm this permissive CORS policy is intended for this service.
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+config = ObjectManager.get_config()
+subtensor = bt.subtensor(config=config)
+# The metagraph is fetched once, at import time; nothing in this module refreshes
+# it, so hotkey and stake checks see the state as of service start-up.
+# NOTE(review): netuid is hard-coded to 52 rather than read from `config`.
+metagraph = subtensor.metagraph(netuid=52, lite=True)
+# S3 destination settings; both are None when the variables are unset
+AWS_REGION = os.getenv("AWS_REGION")
+BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
+# per-file upload limit in megabytes (defaults to 50)
+MAX_CHUNK_SIZE_MB = int(os.getenv("MAX_CHUNK_SIZE_MB", 50))
+
+
+def verify_signature(hotkey: str, signature: str, message: str) -> bool:
+ keypair = Keypair(ss58_address=hotkey, ss58_format=42)
+ if not keypair.verify(data=message, signature=signature):
+ # `keypair` holds only the public side (built from the ss58 address), so a
+ # False result means `signature` over `message` was not produced by `hotkey`;
+ # report it as False instead of raising
+ logger.error(f"Invalid signature for address={hotkey}")
+ return False
+
+ # reached only when the signature verified against the hotkey
+ logger.success(f"Signature verified, signed by {hotkey}")
+ return True
+
+
+def check_stake(hotkey: str) -> bool:
+    """Tell whether ``hotkey`` is in the metagraph with at least the minimum stake.
+
+    Args:
+        hotkey: ss58 address to look up in the module-level ``metagraph``.
+
+    Returns:
+        False when the hotkey is not in ``metagraph.hotkeys`` or its stake is
+        below ``VALIDATOR_MIN_STAKE``; True otherwise.
+    """
+    try:
+        neuron_uid = metagraph.hotkeys.index(hotkey)
+    except ValueError:
+        logger.error(f"Hotkey {hotkey} not found in metagraph")
+        return False
+
+    # stake of the neuron at that uid, converted to a plain Python number
+    stake = metagraph.S[neuron_uid].item()
+
+    if not (stake < VALIDATOR_MIN_STAKE):
+        logger.info(f"Stake check passed for {hotkey} with stake {stake}")
+        return True
+
+    logger.error(
+        f"Insufficient stake for hotkey {hotkey}: {stake} < {VALIDATOR_MIN_STAKE}"
+    )
+    return False
+
+
+@app.post("/upload_dataset")
+async def upload_dataset(
+    hotkey: str = Form(...),
+    signature: str = Form(...),
+    message: str = Form(...),
+    files: List[UploadFile] = File(...),
+):
+    """Store each uploaded file in the configured S3 bucket.
+
+    Args:
+        hotkey: ss58 address of the uploader.
+        signature: hex signature; must start with ``0x``.
+        message: the message that was signed.
+        files: one or more files, each at most ``MAX_CHUNK_SIZE_MB`` megabytes.
+
+    Returns:
+        A dict with ``success``, ``message`` and the uploaded ``filenames``.
+
+    Raises:
+        HTTPException: 401 for a malformed signature, 413 for an oversized
+            file, 500 for any unexpected failure while uploading.
+    """
+    try:
+        if not signature.startswith("0x"):
+            raise HTTPException(
+                status_code=401, detail="Invalid signature format, must be hex."
+            )
+
+        session = aioboto3.Session(region_name=AWS_REGION)
+        async with session.resource("s3") as s3:
+            bucket = await s3.Bucket(BUCKET_NAME)
+            for file in files:
+                content = await file.read()
+                file_size = len(content)
+                # the limit is configured in MB; compare in bytes
+                if file_size > MAX_CHUNK_SIZE_MB * 1024 * 1024:
+                    raise HTTPException(
+                        status_code=413,
+                        detail=f"File too large. Maximum size is {MAX_CHUNK_SIZE_MB}MB",
+                    )
+
+                filename = f"{file.filename}"
+
+                await bucket.put_object(
+                    Key=filename,
+                    Body=content,
+                )
+    except HTTPException:
+        # HTTPException is itself an Exception: without this clause the handler
+        # below would rewrite the deliberate 4xx responses into a 500.
+        raise
+    except Exception as e:
+        logger.error(f"Error uploading dataset: {e}")
+        raise HTTPException(status_code=500, detail=f"Error uploading dataset: {e}")
+
+    return {
+        "success": True,
+        "message": "Files uploaded successfully",
+        "filenames": [file.filename for file in files],
+    }
+
+
+async def server():
+    """Serve ``app`` with uvicorn on 0.0.0.0:9999 until the server stops."""
+    uvicorn_server = uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=9999))
+    await uvicorn_server.serve()
+
+
+async def test_endpoint():
+    """Manual smoke test: POST a local dataset file to a running service.
+
+    Expects ``dataset_20241202.jsonl`` in the working directory and the service
+    listening locally. The hotkey and signature are placeholders, so this only
+    exercises the request plumbing.
+
+    Raises:
+        FileNotFoundError: if the test file does not exist.
+    """
+    test_data = {
+        "hotkey": "asdfg",
+        "signature": "0xasdfg",
+        "message": "On 2024-12-02 18:15:23.663947 +08 Tensorplex is awesome",
+    }
+    test_filename = "dataset_20241202.jsonl"
+
+    if not os.path.exists(test_filename):
+        raise FileNotFoundError(f"Test file {test_filename} not found")
+
+    async with aiofiles.open(test_filename, "rb") as f:
+        file_content = await f.read()
+    # multipart field name must match the endpoint's `files` parameter
+    files = [("files", (test_filename, file_content, "application/json"))]
+
+    async with httpx.AsyncClient() as client:
+        response = await client.post(
+            # `server()` binds port 9999; the previous URL pointed at 8000
+            "http://localhost:9999/upload_dataset",
+            data=test_data,
+            files=files,
+            timeout=30.0,
+        )
+        print(f"Status: {response.status_code}")
+        print(f"Response: {response.json()}")
+
+
+if __name__ == "__main__":
+    import sys
+
+    # `--test` sends one sample upload; without it the API server is started
+    entrypoint = test_endpoint if "--test" in sys.argv else server
+    asyncio.run(entrypoint())
diff --git a/pyproject.toml b/pyproject.toml
index bb8b092e..00ee0f51 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,11 +8,11 @@ name = "dojo"
description = "dojo subnet"
readme = "README.md"
authors = [
- {name = "jarvis8x7b"},
- {name = "karootplx"},
- {name = "codebender37"}
+ { name = "jarvis8x7b" },
+ { name = "karootplx" },
+ { name = "codebender37" }
]
-license = {text = "MIT"}
+license = { text = "MIT" }
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
@@ -61,6 +61,7 @@ dependencies = [
[project.optional-dependencies]
dev = ["commitizen", "curlify2", "pytest", "ruff"]
test = ["pytest", "nox"]
+dataset = ["aioboto3", "aiofiles", "python-multipart"]
[project.scripts]
dojo = "dojo_cli:main"
@@ -143,15 +144,15 @@ select = [
"E4", # import-related errors (e.g., unused imports, import order)
"E7", # statement-related errors (e.g., multiple statements on one line)
"E9", # runtime errors (e.g., syntax errors, undefined names)
- "F", # pyflakes errors (e.g., unused variables, undefined names)
+ "F", # pyflakes errors (e.g., unused variables, undefined names)
"UP", # pyupgrade rules (suggests newer python syntax)
- "I" # isort rules (sorts and organizes imports)
+ "I" # isort rules (sorts and organizes imports)
]
ignore = [
"UP006", # Preserve 'typing.Tuple' instead of 'tuple'
"UP035", # Preserve 'typing.Dict' instead of 'dict'
- "C901", # Ignore McCabe complexity (if you use flake8 complexity checks)
- "E203" # Ignore whitespace before ':', conflicts with Black] # Ignore specific pyupgrade rules that prevent the project from running
+ "C901", # Ignore McCabe complexity (if you use flake8 complexity checks)
+ "E203" # Ignore whitespace before ':', conflicts with Black] # Ignore specific pyupgrade rules that prevent the project from running
]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
@@ -162,7 +163,7 @@ unfixable = []
known-third-party = ["wandb"]
[tool.setuptools]
-packages = {find = {}}
+packages = { find = {} }
include-package-data = true
# allows us to use CLI even though it is a standalone script
py-modules = ["dojo_cli"]
diff --git a/scripts/extract_dataset.py b/scripts/extract_dataset.py
new file mode 100644
index 00000000..ba43bfc0
--- /dev/null
+++ b/scripts/extract_dataset.py
@@ -0,0 +1,327 @@
+import asyncio
+import os
+from datetime import datetime
+from typing import AsyncGenerator, List
+
+import aiofiles
+import bittensor as bt
+import httpx
+import numpy as np
+from bittensor.btlogging import logging as logger
+from pydantic import BaseModel, model_serializer
+
+from commons.exceptions import (
+ NoNewExpiredTasksYet,
+)
+from commons.objects import ObjectManager
+from database.client import connect_db, disconnect_db
+from database.mappers import (
+ map_feedback_request_model_to_feedback_request,
+)
+from database.prisma.models import (
+ Feedback_Request_Model,
+)
+from database.prisma.types import (
+ Feedback_Request_ModelInclude,
+ Feedback_Request_ModelWhereInput,
+)
+from dojo import TASK_DEADLINE
+from dojo.protocol import (
+ CompletionResponses,
+ DendriteQueryResponse,
+)
+from dojo.utils.config import source_dotenv
+
+# presumably loads variables from the dotenv file into the environment - see dojo.utils.config
+source_dotenv()
+
+DATASET_SERVICE_BASE_URL = os.getenv("DATASET_SERVICE_BASE_URL")
+# int() either returns an int or raises, so this value can never be None and
+# needs no separate "must be set" check
+MAX_CHUNK_SIZE_MB = int(os.getenv("MAX_CHUNK_SIZE_MB", 50))
+
+# an empty value (e.g. `DATASET_SERVICE_BASE_URL=` in the env file) is as
+# unusable as a missing one, so reject both up front
+if not DATASET_SERVICE_BASE_URL:
+    raise ValueError("DATASET_SERVICE_BASE_URL must be set")
+
+
+# represents a row in the jsonl dataset
+class Row(BaseModel):
+ prompt: str
+ completions: list[CompletionResponses]
+ # shape (num_completions, num_miners)
+ raw_scores: list[list[float]]
+ # shape (num_completions)
+ mean_scores: list[float]
+
+ class Config:
+ # allow field types that pydantic has no built-in validation for
+ arbitrary_types_allowed = True
+
+ @model_serializer
+ def serialize_model(self):
+ """Return the four dataset fields as a plain dict; registered as the model's serializer."""
+ return {
+ "prompt": self.prompt,
+ "completions": self.completions,
+ "raw_scores": self.raw_scores,
+ "mean_scores": self.mean_scores,
+ }
+
+
+async def build_jsonl(filename: str):
+    """Write every processed task to ``filename`` as one JSON object per line.
+
+    Each line is a serialized ``Row`` holding the prompt, the validator
+    request's completions, the per-miner ratings and the per-completion mean.
+
+    Args:
+        filename: path of the JSONL file to create (overwritten if present).
+
+    Raises:
+        Whatever ``get_processed_tasks`` raises; the partially written file is
+        removed first so a later run does not mistake it for a finished dataset.
+    """
+    batch_size = 10
+    task_count = 0
+    completed = False
+    try:
+        with open(filename, "w") as file:
+            async for task_batch, has_more_batches in get_processed_tasks(batch_size):
+                # the generator ends with an empty batch flagged "no more"
+                if not has_more_batches and not task_batch:
+                    break
+
+                for task in task_batch:
+                    # prompt and completions both come from the validator request
+                    prompt = task.request.prompt
+                    completions = task.request.completion_responses
+
+                    # one list of ratings per miner; a miner with any missing
+                    # score is left out entirely
+                    raw_scores = []
+                    for miner_response in task.miner_responses:
+                        miner_ratings = [
+                            c.score for c in miner_response.completion_responses
+                        ]
+                        if any(rating is None for rating in miner_ratings):
+                            continue
+                        raw_scores.append(miner_ratings)
+
+                    # shape (num_miners, num_completions)
+                    raw_scores_vec = np.array(raw_scores)
+                    logger.info(f"raw_scores_vec shape: {raw_scores_vec.shape}")
+                    logger.info(f"raw_scores_vec: {raw_scores_vec}")
+
+                    if raw_scores_vec.size > 0:
+                        # average across miners (axis 0) so there is one mean
+                        # per completion; axis 1 would give one mean per miner
+                        mean_scores = raw_scores_vec.mean(axis=0)
+                        logger.info(f"mean_scores shape: {mean_scores.shape}")
+                        jsonl_row = Row(
+                            prompt=prompt,
+                            completions=completions,
+                            raw_scores=raw_scores,
+                            mean_scores=mean_scores.tolist(),
+                        )
+                    else:
+                        jsonl_row = Row(
+                            prompt=prompt,
+                            completions=completions,
+                            raw_scores=[],
+                            mean_scores=[],
+                        )
+
+                    # Write the entry as a JSON line
+                    file.write(jsonl_row.model_dump_json() + "\n")
+
+                task_count += len(task_batch)
+                logger.info(f"Scraped task count: {task_count}")
+        completed = True
+    finally:
+        # runs after the file is closed; drop a half-written dataset on failure
+        if not completed and os.path.exists(filename):
+            os.remove(filename)
+
+
+async def get_processed_tasks(
+ batch_size: int = 10,
+) -> AsyncGenerator[tuple[List[DendriteQueryResponse], bool], None]:
+ """Yields batches of processed Feedback_Request_Model records along with a boolean flag indicating the presence of additional batches.
+
+ This function retrieves tasks that have been fully processed. The batch size can be specified to control the number of tasks returned in each batch.
+
+ Every batch produced from the database is yielded with the flag set to True; a final empty batch with the flag set to False marks the end.
+
+ Args:
+ batch_size (int, optional): The number of tasks to include in each batch. Defaults to 10.
+
+ Raises:
+ NoNewExpiredTasksYet: Raised if no processed tasks are available for retrieval.
+
+ Yields:
+ AsyncGenerator[tuple[List[DendriteQueryResponse], bool], None]: An asynchronous generator yielding a tuple containing a list of DendriteQueryResponse objects and a boolean indicating if more batches are available.
+ """
+
+ # relations to load with every request row
+ include_query = Feedback_Request_ModelInclude(
+ {
+ "completions": True,
+ "criteria_types": True,
+ "ground_truths": True,
+ "parent_request": True,
+ }
+ )
+
+ vali_where_query = Feedback_Request_ModelWhereInput(
+ {
+ "parent_id": None, # no parent means it's a validator request
+ # only check for tasks that are completely done
+ "is_processed": {"equals": True},
+ }
+ )
+
+ # count the processed validator requests first, to know how many pages to fetch
+ task_count = await Feedback_Request_Model.prisma().count(
+ where=vali_where_query,
+ )
+
+ logger.info(f"Count of processed tasks: {task_count}")
+
+ if not task_count:
+ raise NoNewExpiredTasksYet(
+ f"No expired tasks found for processing, please wait for tasks to pass the task deadline of {TASK_DEADLINE} seconds."
+ )
+
+ for i in range(0, task_count, batch_size):
+ # fetch one page of processed validator requests, newest first
+ validator_requests = await Feedback_Request_Model.prisma().find_many(
+ include=include_query,
+ where=vali_where_query,
+ order={"created_at": "desc"},
+ skip=i,
+ take=batch_size,
+ )
+
+ # find all processed miner responses belonging to this page of requests
+ processed_vali_request_ids = [r.id for r in validator_requests]
+ miner_responses = await Feedback_Request_Model.prisma().find_many(
+ include=include_query,
+ where={
+ "parent_id": {"in": processed_vali_request_ids},
+ "is_processed": {"equals": True},
+ },
+ order={"created_at": "desc"},
+ )
+
+ # NOTE: technically a DendriteQueryResponse represents a task
+ tasks: list[DendriteQueryResponse] = []
+ for validator_request in validator_requests:
+ vali_request = map_feedback_request_model_to_feedback_request(
+ validator_request
+ )
+
+ # keep only the miner responses whose parent is this validator request
+ m_responses = list(
+ map(
+ lambda x: map_feedback_request_model_to_feedback_request(
+ x, is_miner=True
+ ),
+ [m for m in miner_responses if m.parent_id == validator_request.id],
+ )
+ )
+
+ tasks.append(
+ DendriteQueryResponse(request=vali_request, miner_responses=m_responses)
+ )
+
+ # yield responses, so caller can do something
+ has_more_batches = True
+ yield tasks, has_more_batches
+
+ # sentinel: an empty batch flagged False tells the caller iteration is over
+ yield [], False
+
+
+async def upload(hotkey: str, signature: str, message: str, filename: str) -> bool:
+    """Upload ``filename`` to the dataset service, one line-aligned chunk at a time.
+
+    Args:
+        hotkey: ss58 address of the uploading validator.
+        signature: signature over ``message``; a ``0x`` prefix is added if missing.
+        message: the signed message, sent alongside the signature.
+        filename: path of the JSONL file to upload.
+
+    Returns:
+        True once every chunk has been accepted by the service, False when
+        ``filename`` does not exist. Previously nothing was returned, so callers
+        testing the result could never observe a successful upload.
+
+    Raises:
+        Exception: if the service answers a chunk with a non-200 status or
+            without a truthy ``success`` field.
+    """
+    if not signature.startswith("0x"):
+        signature = f"0x{signature}"
+
+    # form fields sent with every chunk
+    form_body = {
+        "hotkey": hotkey,
+        "signature": signature,
+        "message": message,
+    }
+
+    if not os.path.exists(filename):
+        logger.warning(f"File {filename} not found, nothing to upload")
+        return False
+
+    chunks = await chunk_file(filename, MAX_CHUNK_SIZE_MB)
+
+    async with httpx.AsyncClient() as client:
+        for chunk_filename, chunk_content in chunks:
+            # multipart field name must match the endpoint's `files` parameter
+            files = [("files", (chunk_filename, chunk_content, "application/json"))]
+            response = await client.post(
+                f"{DATASET_SERVICE_BASE_URL}/upload_dataset",
+                data=form_body,
+                files=files,
+                timeout=60.0,
+            )
+            logger.info(f"Status: {response.status_code}")
+            response_json = response.json()
+            logger.info(f"Response: {response_json}")
+            is_success = response.status_code == 200 and response_json.get(
+                "success"
+            )
+            if not is_success:
+                raise Exception(f"Failed to upload file {chunk_filename}")
+            # short pause between chunks
+            await asyncio.sleep(1)
+
+    return True
+
+
+async def chunk_file(filename: str, chunk_size_mb: int = 50):
+    """Split a text file into chunks made of whole lines.
+
+    Lines are never split. A chunk is closed as soon as adding the next line
+    would push it past ``chunk_size_mb`` megabytes (measured in UTF-8 bytes).
+    A single line larger than the limit becomes a chunk of its own, which is
+    then over the limit.
+
+    Args:
+        filename: path of the file to split.
+        chunk_size_mb: target maximum chunk size in megabytes.
+
+    Returns:
+        A list of ``(chunk_filename, chunk_text)`` tuples, named
+        ``<base>_part<N><ext>`` with N starting at 1. Empty for an empty file.
+
+    Raises:
+        FileNotFoundError: if ``filename`` does not exist.
+    """
+    if not os.path.exists(filename):
+        raise FileNotFoundError(f"Test file {filename} not found")
+
+    chunk_size = chunk_size_mb * 1024 * 1024  # Convert MB to bytes
+    base, ext = os.path.splitext(filename)
+
+    chunks = []
+    current_chunk = []
+    current_chunk_size = 0
+
+    async with aiofiles.open(filename) as f:  # text mode, so iteration is per line
+        async for line in f:
+            line_size = len(line.encode("utf-8"))  # size of the line in bytes
+            # Only close a chunk that has content: without the `current_chunk`
+            # guard an over-sized first line would emit an empty chunk.
+            if current_chunk and current_chunk_size + line_size > chunk_size:
+                chunk_filename = f"{base}_part{len(chunks) + 1}{ext}"
+                chunks.append((chunk_filename, "".join(current_chunk)))
+                current_chunk = []
+                current_chunk_size = 0
+
+            current_chunk.append(line)
+            current_chunk_size += line_size
+
+    # whatever is left over forms the last chunk
+    if current_chunk:
+        chunk_filename = f"{base}_part{len(chunks) + 1}{ext}"
+        chunks.append((chunk_filename, "".join(current_chunk)))
+
+    return chunks
+
+
+async def main():
+    """Build today's dataset file (unless it already exists) and upload it.
+
+    The upload message is signed with the wallet hotkey taken from the parsed
+    config. The local file is removed only when ``upload`` reports success.
+    Upload errors are logged; errors while building the file propagate. The
+    database connection is closed in every case.
+    """
+    await connect_db()
+    try:
+        config = ObjectManager.get_config()
+        wallet = bt.wallet(config=config)
+        hotkey = wallet.hotkey.ss58_address
+        message = f"Uploading dataset for validator with hotkey: {hotkey}"
+        signature = wallet.hotkey.sign(message).hex()  # Convert signature to hex string
+
+        # Create filename with current date
+        current_date = datetime.now().strftime("%Y%m%d")
+        filename = f"dataset_{current_date}.jsonl"
+        # Check if file already exists
+        if os.path.exists(filename):
+            logger.warning(f"File {filename} already exists, skipping scrape db step")
+        else:
+            await build_jsonl(filename)
+
+        try:
+            upload_success = await upload(hotkey, signature, message, filename)
+            if upload_success:
+                logger.info("Upload successful! Removing local dataset file.")
+                os.remove(filename)
+        except Exception as e:
+            logger.error(f"Error occurred while trying to upload dataset: {e}")
+    finally:
+        # the outer try covers wallet setup and the DB scrape as well, so the
+        # connection opened above is released even when those steps fail
+        await disconnect_db()
+
+
+async def _test_chunking():
+    """Manual check: chunk ``dummy_dataset.jsonl`` and write every chunk to its own file."""
+    source_path = "dummy_dataset.jsonl"
+    parts = await chunk_file(source_path, MAX_CHUNK_SIZE_MB)
+    logger.info(f"number of chunks: {len(parts)}")
+
+    part_no = 0
+    for part_path, part_text in parts:
+        part_no += 1  # chunks are reported 1-based
+        logger.info(f"\nSaving chunk {part_no} to {part_path}")
+        async with aiofiles.open(part_path, "w") as out:
+            await out.write(part_text)
+        logger.info(f"Saved chunk {part_no} ({len(part_text)} bytes)")
+
+
+if __name__ == "__main__":
+ # default entry point: build the dataset file and upload it
+ asyncio.run(main())
+ # manual alternative that only exercises chunk_file on dummy_dataset.jsonl:
+ # asyncio.run(_test_chunking())
diff --git a/test_dataset_endpoint/app.py b/test_dataset_endpoint/app.py
new file mode 100644
index 00000000..6719da71
--- /dev/null
+++ b/test_dataset_endpoint/app.py
@@ -0,0 +1,80 @@
+import asyncio
+from fastapi import Form, File, UploadFile, Request, FastAPI
+from typing import List
+from fastapi.responses import HTMLResponse
+from fastapi.templating import Jinja2Templates
+import aiofiles
+import os
+import httpx
+
+import uvicorn
+app = FastAPI()
+templates = Jinja2Templates(directory="templates")
+
+
+@app.post("/submit")
+def submit(
+ name: str = Form(...),
+ point: float = Form(...),
+ is_accepted: bool = Form(...),
+ files: List[UploadFile] = File(...),
+):
+ return {
+ "JSON Payload": {"name": name, "point": point, "is_accepted": is_accepted},
+ "Filenames": [file.filename for file in files],
+ }
+
+
+@app.get("/", response_class=HTMLResponse)
+async def main(request: Request):
+ return templates.TemplateResponse("index.html", {"request": request})
+
+
+
+async def server():
+ config = uvicorn.Config(app, host="0.0.0.0", port=8000)
+ server = uvicorn.Server(config)
+ await server.serve()
+
+async def test_endpoint():
+ # Create test data
+ test_data = {
+ 'name': 'test_user',
+ 'point': 95.5,
+ 'is_accepted': True
+ }
+
+ # Create a temporary test file
+ test_filename = "dataset_20241202.jsonl"
+
+ # Build form data similar to how dojo.py does it
+ form_body = {
+ 'name': ('', test_data['name']),
+ 'point': ('', str(test_data['point'])),
+ 'is_accepted': ('', str(test_data['is_accepted']))
+ }
+
+ # Add file to form data if it exists
+ if os.path.exists(test_filename):
+ async with aiofiles.open(test_filename, 'rb') as f:
+ file_content = await f.read()
+ form_body['files'] = (test_filename, file_content)
+ else:
+ raise FileNotFoundError(f"Test file {test_filename} not found")
+
+ # Make request using httpx
+ async with httpx.AsyncClient() as client:
+ response = await client.post(
+ 'http://localhost:8000/submit',
+ files=form_body,
+ timeout=15.0
+ )
+ print(f"Status: {response.status_code}")
+ print(f"Response: {response.json()}")
+
+if __name__ == "__main__":
+ import sys
+ if "--test" in sys.argv:
+ asyncio.run(test_endpoint())
+ else:
+ asyncio.run(server())
From 3749c129c1e802a6ab030a781bfd0dfd163fea31 Mon Sep 17 00:00:00 2001
From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com>
Date: Tue, 3 Dec 2024 17:06:08 +0800
Subject: [PATCH 2/7] chore: remove test script
---
test_dataset_endpoint/app.py | 80 ------------------------------------
1 file changed, 80 deletions(-)
delete mode 100644 test_dataset_endpoint/app.py
diff --git a/test_dataset_endpoint/app.py b/test_dataset_endpoint/app.py
deleted file mode 100644
index 6719da71..00000000
--- a/test_dataset_endpoint/app.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import asyncio
-from fastapi import Form, File, UploadFile, Request, FastAPI
-from typing import List
-from fastapi.responses import HTMLResponse
-from fastapi.templating import Jinja2Templates
-import aiofiles
-import os
-import httpx
-
-import uvicorn
-app = FastAPI()
-templates = Jinja2Templates(directory="templates")
-
-
-@app.post("/submit")
-def submit(
- name: str = Form(...),
- point: float = Form(...),
- is_accepted: bool = Form(...),
- files: List[UploadFile] = File(...),
-):
- return {
- "JSON Payload": {"name": name, "point": point, "is_accepted": is_accepted},
- "Filenames": [file.filename for file in files],
- }
-
-
-@app.get("/", response_class=HTMLResponse)
-async def main(request: Request):
- return templates.TemplateResponse("index.html", {"request": request})
-
-
-
-async def server():
- config = uvicorn.Config(app, host="0.0.0.0", port=8000)
- server = uvicorn.Server(config)
- await server.serve()
-
-async def test_endpoint():
- # Create test data
- test_data = {
- 'name': 'test_user',
- 'point': 95.5,
- 'is_accepted': True
- }
-
- # Create a temporary test file
- test_filename = "dataset_20241202.jsonl"
-
- # Build form data similar to how dojo.py does it
- form_body = {
- 'name': ('', test_data['name']),
- 'point': ('', str(test_data['point'])),
- 'is_accepted': ('', str(test_data['is_accepted']))
- }
-
- # Add file to form data if it exists
- if os.path.exists(test_filename):
- async with aiofiles.open(test_filename, 'rb') as f:
- file_content = await f.read()
- form_body['files'] = (test_filename, file_content)
- else:
- raise FileNotFoundError(f"Test file {test_filename} not found")
-
- # Make request using httpx
- async with httpx.AsyncClient() as client:
- response = await client.post(
- 'http://localhost:8000/submit',
- files=form_body,
- timeout=15.0
- )
- print(f"Status: {response.status_code}")
- print(f"Response: {response.json()}")
-
-if __name__ == "__main__":
- import sys
- if "--test" in sys.argv:
- asyncio.run(test_endpoint())
- else:
- asyncio.run(server())
From e1430a5a9d355d649a80eb1bc6921532a6fd3880 Mon Sep 17 00:00:00 2001
From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com>
Date: Tue, 3 Dec 2024 17:17:50 +0800
Subject: [PATCH 3/7] chore: fix comment
---
scripts/extract_dataset.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/scripts/extract_dataset.py b/scripts/extract_dataset.py
index ba43bfc0..29a95f18 100644
--- a/scripts/extract_dataset.py
+++ b/scripts/extract_dataset.py
@@ -47,7 +47,7 @@
class Row(BaseModel):
prompt: str
completions: list[CompletionResponses]
- # shape (num_completions, num_miners)
+ # shape (num_miners, num_completions)
raw_scores: list[list[float]]
# shape (num_completions)
mean_scores: list[float]
From 2fdb443fca683eae42bca3b81419c9956853b3e2 Mon Sep 17 00:00:00 2001
From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com>
Date: Wed, 4 Dec 2024 15:51:41 +0800
Subject: [PATCH 4/7] chore: update readme and env example
---
.env.validator.example | 1 +
README.md | 11 +++++++++++
2 files changed, 12 insertions(+)
diff --git a/.env.validator.example b/.env.validator.example
index 332e8a41..7b41ec36 100644
--- a/.env.validator.example
+++ b/.env.validator.example
@@ -1,5 +1,6 @@
WALLET_COLDKEY=
WALLET_HOTKEY=
+DATASET_SERVICE_BASE_URL=https://dojo-validator-api.tensorplex.ai
# Mainnet related config
NETUID=52
diff --git a/README.md b/README.md
index b6d6c047..3ebd4010 100644
--- a/README.md
+++ b/README.md
@@ -48,6 +48,7 @@
- [Option 2: Decentralised Method](#option-2-decentralised-method)
- [Setup Subscription Key for Labellers on UI to connect to Dojo Subnet for scoring](#setup-subscription-key-for-labellers-on-ui-to-connect-to-dojo-subnet-for-scoring)
- [Validating](#validating)
+ - [Data Collection](#data-collection)
- [Auto-updater](#auto-updater)
- [Dojo CLI](#dojo-cli)
- [For Dojo developerss](#for-dojo-developerss)
@@ -416,6 +417,7 @@ cp .env.validator.example .env.validator
WALLET_COLDKEY=# the name of the coldkey
WALLET_HOTKEY=# the name of the hotkey
+DATASET_SERVICE_BASE_URL=https://dojo-validator-api.tensorplex.ai
# head to https://wandb.ai/authorize to get your API key
WANDB_API_KEY=""
@@ -448,6 +450,15 @@ make validator
To start with autoupdate for validators (**strongly recommended**), see the [Auto-updater](#auto-updater) section.
+## Data Collection
+
+To export all data collected by the validator, make sure your environment variables are set up as described in [validator-setup](#validating), then run the following:
+
+```bash
+make validator-pull
+make extract-dataset
+```
+
# Auto-updater
> [!WARNING]
From 70ce852cbf8f509fb6f21c6269dec38924840dbf Mon Sep 17 00:00:00 2001
From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com>
Date: Wed, 4 Dec 2024 16:05:40 +0800
Subject: [PATCH 5/7] fix: add auth
---
entrypoints/dataset_service.py | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/entrypoints/dataset_service.py b/entrypoints/dataset_service.py
index 7a932a07..9952fd70 100644
--- a/entrypoints/dataset_service.py
+++ b/entrypoints/dataset_service.py
@@ -31,6 +31,10 @@
MAX_CHUNK_SIZE_MB = int(os.getenv("MAX_CHUNK_SIZE_MB", 50))
+def verify_hotkey_in_metagraph(hotkey: str) -> bool:
+    """Return whether ``hotkey`` appears in the module-level metagraph's hotkey list."""
+    registered_hotkeys = metagraph.hotkeys
+    return hotkey in registered_hotkeys
+
+
def verify_signature(hotkey: str, signature: str, message: str) -> bool:
keypair = Keypair(ss58_address=hotkey, ss58_format=42)
if not keypair.verify(data=message, signature=signature):
@@ -75,6 +79,22 @@ async def upload_dataset(
status_code=401, detail="Invalid signature format, must be hex."
)
+ # gate 1: the signature over `message` must verify against `hotkey`
+ if not verify_signature(hotkey, signature, message):
+ logger.error(f"Invalid signature for address={hotkey}")
+ raise HTTPException(status_code=401, detail="Invalid signature.")
+
+ # gate 2: the hotkey must be present in the metagraph
+ if not verify_hotkey_in_metagraph(hotkey):
+ logger.error(f"Hotkey {hotkey} not found in metagraph")
+ raise HTTPException(
+ status_code=401, detail="Hotkey not found in metagraph."
+ )
+
+ # gate 3: the hotkey's stake must reach VALIDATOR_MIN_STAKE
+ if not check_stake(hotkey):
+ logger.error(f"Insufficient stake for hotkey {hotkey}")
+ raise HTTPException(
+ status_code=401, detail="Insufficient stake for hotkey."
+ )
+
+
session = aioboto3.Session(region_name=AWS_REGION)
async with session.resource("s3") as s3:
bucket = await s3.Bucket(BUCKET_NAME)
From b08d64ce555a1db0cbf44815870897d161d5aa9e Mon Sep 17 00:00:00 2001
From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com>
Date: Wed, 4 Dec 2024 17:42:07 +0800
Subject: [PATCH 6/7] refactor: add hotkey to filename
---
entrypoints/dataset_service.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/entrypoints/dataset_service.py b/entrypoints/dataset_service.py
index 9952fd70..d712af32 100644
--- a/entrypoints/dataset_service.py
+++ b/entrypoints/dataset_service.py
@@ -107,7 +107,7 @@ async def upload_dataset(
detail=f"File too large. Maximum size is {MAX_CHUNK_SIZE_MB}MB",
)
- filename = f"{file.filename}"
+ filename = f"hotkey_{hotkey}_{file.filename}"
await bucket.put_object(
Key=filename,
From 1c0860327c81df22a61ccaa3687ae52c1b550c46 Mon Sep 17 00:00:00 2001
From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com>
Date: Wed, 4 Dec 2024 19:56:30 +0800
Subject: [PATCH 7/7] chore: format toml
---
pyproject.toml | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/pyproject.toml b/pyproject.toml
index 00ee0f51..0d842d9c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,11 +8,11 @@ name = "dojo"
description = "dojo subnet"
readme = "README.md"
authors = [
- { name = "jarvis8x7b" },
- { name = "karootplx" },
- { name = "codebender37" }
+ {name = "jarvis8x7b"},
+ {name = "karootplx"},
+ {name = "codebender37"}
]
-license = { text = "MIT" }
+license = {text = "MIT"}
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
@@ -144,15 +144,15 @@ select = [
"E4", # import-related errors (e.g., unused imports, import order)
"E7", # statement-related errors (e.g., multiple statements on one line)
"E9", # runtime errors (e.g., syntax errors, undefined names)
- "F", # pyflakes errors (e.g., unused variables, undefined names)
+ "F", # pyflakes errors (e.g., unused variables, undefined names)
"UP", # pyupgrade rules (suggests newer python syntax)
- "I" # isort rules (sorts and organizes imports)
+ "I" # isort rules (sorts and organizes imports)
]
ignore = [
"UP006", # Preserve 'typing.Tuple' instead of 'tuple'
"UP035", # Preserve 'typing.Dict' instead of 'dict'
- "C901", # Ignore McCabe complexity (if you use flake8 complexity checks)
- "E203" # Ignore whitespace before ':', conflicts with Black] # Ignore specific pyupgrade rules that prevent the project from running
+ "C901", # Ignore McCabe complexity (if you use flake8 complexity checks)
+ "E203" # Ignore whitespace before ':', conflicts with Black] # Ignore specific pyupgrade rules that prevent the project from running
]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
@@ -163,7 +163,7 @@ unfixable = []
known-third-party = ["wandb"]
[tool.setuptools]
-packages = { find = {} }
+packages = {find = {}}
include-package-data = true
# allows us to use CLI even though it is a standalone script
py-modules = ["dojo_cli"]