Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate score data from db to .pt file #80

Merged
merged 8 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.miner.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ SUBTENSOR_ENDPOINT=wss://entrypoint-finney.opentensor.ai:443
# NETUID=98
# SUBTENSOR_NETWORK=test
# SUBTENSOR_ENDPOINT=ws://testnet-lite:9944
# VALIDATOR_MIN_STAKE=20000

# Task related config
# this means a maximum of 4 workers may submit responses for a single task
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,6 @@ testing/

# prisma
database/prisma/

# scores data
scores/*.pt
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ DOJO_API_KEY= # blank for now
WALLET_COLDKEY=# the name of the coldkey
WALLET_HOTKEY=# the name of the hotkey
AXON_PORT=8888 # port to serve requests over the public network for validators to call
VALIDATOR_MIN_STAKE=20000 # minimum stake required for validators; default is 20000 TAO (use this to bypass the blacklist function in testnet)
# Task related config
TASK_MAX_RESULT=4 # this means that each miner can have up to 4 workers fill in responses
```
Expand Down
4 changes: 4 additions & 0 deletions commons/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ class ObjectManager:
def get_miner(cls):
if get_config().simulation:
from simulator.miner import MinerSim

if cls._miner is None:
cls._miner = MinerSim()
else:
from neurons.miner import Miner

if cls._miner is None:
cls._miner = Miner()
return cls._miner
Expand All @@ -23,10 +25,12 @@ def get_miner(cls):
def get_validator(cls):
if get_config().simulation:
from simulator.validator import ValidatorSim

if cls._validator is None:
cls._validator = ValidatorSim()
else:
from neurons.validator import Validator

if cls._validator is None:
cls._validator = Validator()
return cls._validator
Expand Down
89 changes: 89 additions & 0 deletions commons/score_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import json
from pathlib import Path

import torch
from bittensor.btlogging import logging as logger

from database.client import connect_db, disconnect_db
from database.prisma.models import Score_Model


class ScoreStorage:
    """Handles persistence of validator scores as a ``.pt`` tensor file."""

    # Directory and file where validator scores are persisted on disk.
    SCORES_DIR = Path("scores")
    SCORES_FILE = SCORES_DIR / "miner_scores.pt"

    @classmethod
    async def migrate_from_db(cls) -> bool:
        """One-time migration of scores from database to .pt file

        Returns:
            bool: True if migration successful or file already exists, False if migration failed
        """
        try:
            if cls.SCORES_FILE.exists():
                logger.info("Scores file already exists, skipping migration")
                return True

            # Connect to database first
            await connect_db()

            try:
                # Get scores from database; the most recent snapshot wins.
                score_record = await Score_Model.prisma().find_first(
                    order={"created_at": "desc"}
                )
                if not score_record:
                    logger.warning("No scores found in database to migrate")
                    return True  # Not an error, just no scores yet

                scores = torch.tensor(json.loads(score_record.score))

                # Create scores directory (and any missing parents) if needed
                cls.SCORES_DIR.mkdir(parents=True, exist_ok=True)

                # Save scores to .pt file
                torch.save(scores, cls.SCORES_FILE)
                logger.success(f"Successfully migrated scores to {cls.SCORES_FILE}")

                # Verify the migration by reading the file back.
                # weights_only=True: the file only ever holds a plain tensor,
                # and this blocks arbitrary code execution via pickle.
                loaded_scores = torch.load(cls.SCORES_FILE, weights_only=True)
                if torch.equal(scores, loaded_scores):
                    logger.success("Migration verification successful - scores match")
                    return True
                else:
                    logger.error("Migration verification failed - scores do not match")
                    return False

            finally:
                await disconnect_db()

        except Exception as e:
            logger.error(f"Failed to migrate scores: {e}")
            return False

    @classmethod
    async def save(cls, scores: torch.Tensor) -> None:
        """Save validator scores to .pt file

        Raises:
            Exception: re-raised after logging so callers can react to a
                failed persist instead of silently losing scores.
        """
        try:
            # parents=True so a missing parent directory never aborts a save.
            cls.SCORES_DIR.mkdir(parents=True, exist_ok=True)
            torch.save(scores, cls.SCORES_FILE)
            logger.success("Successfully saved validator scores to file")
        except Exception as e:
            logger.error(f"Failed to save validator scores: {e}")
            raise

    @classmethod
    async def load(cls) -> torch.Tensor | None:
        """Load validator scores from .pt file

        Returns:
            The saved scores tensor, or None when the file is missing or
            unreadable (callers treat None as "no scores yet").
        """
        try:
            if not cls.SCORES_FILE.exists():
                logger.warning("No validator scores file found")
                return None

            # weights_only=True: restrict unpickling to tensor data only.
            scores = torch.load(cls.SCORES_FILE, weights_only=True)
            logger.success("Successfully loaded validator scores from file")
            return scores
        except Exception as e:
            logger.error(f"Failed to load validator scores: {e}")
            return None
4 changes: 2 additions & 2 deletions dojo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def get_latest_git_tag():
)


VALIDATOR_MIN_STAKE = 20000
VALIDATOR_MIN_STAKE = int(os.getenv("VALIDATOR_MIN_STAKE", "20000"))
TASK_DEADLINE = 6 * 60 * 60

# Define the time intervals for various tasks.
Expand All @@ -44,7 +44,7 @@ def get_latest_git_tag():

if get_config().fast_mode:
print("Running in fast mode for testing purposes...")
VALIDATOR_MIN_STAKE = 20000
VALIDATOR_MIN_STAKE = int(os.getenv("VALIDATOR_MIN_STAKE", "20000"))
TASK_DEADLINE = 180
VALIDATOR_RUN = 60
VALIDATOR_HEARTBEAT = 15
Expand Down
2 changes: 1 addition & 1 deletion entrypoints.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ if [ "$1" = 'validator' ]; then
--neuron.type validator \
--wandb.project_name ${WANDB_PROJECT_NAME} \
${EXTRA_ARGS}
fi
fi
19 changes: 13 additions & 6 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from commons.obfuscation.obfuscation_utils import obfuscate_html_and_js
from commons.objects import ObjectManager
from commons.orm import ORM
from commons.score_storage import ScoreStorage
from commons.scoring import Scoring
from commons.utils import (
_terminal_plot,
Expand All @@ -45,7 +46,6 @@
set_expire_time,
ttl_get_block,
)
from database.client import connect_db
from dojo import __spec_version__
from dojo.protocol import (
CompletionResponses,
Expand Down Expand Up @@ -103,10 +103,17 @@ def __init__(self):
self.scores: torch.Tensor = torch.zeros(
len(self.metagraph.hotkeys), dtype=torch.float32
)
# manually always register and always sync metagraph when application starts
self.check_registered()
self.executor = ThreadPoolExecutor(max_workers=2)

# Run score migration before loading state
migration_success = self.loop.run_until_complete(ScoreStorage.migrate_from_db())
if not migration_success:
logger.error(
"Score migration failed - cannot continue without valid scores"
)
raise RuntimeError("Score migration failed - validator cannot start")

self.executor = ThreadPoolExecutor(max_workers=2)
self.load_state()

init_wandb(config=self.config, my_uid=self.uid, wallet=self.wallet)
Expand Down Expand Up @@ -713,7 +720,8 @@ async def save_state(
logger.warning("Scores are all zeros, but saving anyway!")
# raise EmptyScores("Skipping save as scores are all empty")

await ORM.create_or_update_validator_score(self.scores)
# await ORM.create_or_update_validator_score(self.scores)
await ScoreStorage.save(self.scores)
logger.success(f"📦 Saved validator state with scores: {self.scores}")
except EmptyScores as e:
logger.debug(f"No need to to save validator state: {e}")
Expand All @@ -722,8 +730,7 @@ async def save_state(

async def _load_state(self):
try:
await connect_db()
scores = await ORM.get_validator_score()
scores = await ScoreStorage.load()

if scores is None:
num_processed_tasks = await ORM.get_num_processed_tasks()
Expand Down
78 changes: 78 additions & 0 deletions scripts/inspect_scores.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import argparse
from typing import List

import torch
from tabulate import tabulate
from termcolor import colored


def format_score_table(scores: torch.Tensor) -> List[List[str]]:
    """Format scores into a table with 10 columns"""
    rows: List[List[str]] = []
    current_row: List[str] = []
    total = len(scores)

    for uid, score in enumerate(scores):
        value = score.item()
        cell = f"{value:.4f}"
        # Non-zero scores are highlighted in green.
        if value > 0:
            cell = colored(cell, "green")
        current_row.append([uid, cell])

        # Flush a row after every 10 entries, and once more at the end
        # so a trailing partial row is not dropped.
        if len(current_row) == 10 or uid == total - 1:
            rows.append(current_row)
            current_row = []

    return rows


def inspect_scores(
    file_path: str = "scores/miner_scores.pt", show_all: bool = False
):
    """Pretty-print a summary, statistics and top scores from a saved scores file.

    Args:
        file_path: Path to the ``.pt`` tensor file. Defaults to the location
            ``ScoreStorage`` writes to (``scores/miner_scores.pt``) — the
            previous default (``validator_scores.pt``) never matched it.
        show_all: When True, additionally print every UID's score.
    """
    try:
        # weights_only=True: the file only ever holds a plain tensor, and this
        # blocks arbitrary code execution via pickle on a tampered file.
        scores = torch.load(file_path, weights_only=True)

        # Guard the empty tensor: mean()/topk and the percentage division
        # below would otherwise produce nan or raise.
        if len(scores) == 0:
            print(colored("Scores file is empty - nothing to display", "red"))
            return

        # Print Summary
        print(colored("\n=== Scores Summary ===", "blue", attrs=["bold"]))
        print(f"Total UIDs: {len(scores)}")
        print(f"Data type: {scores.dtype}")
        print(f"Device: {scores.device}")

        # Print Statistics
        print(colored("\n=== Statistics ===", "blue", attrs=["bold"]))
        print(f"Mean score: {scores.mean().item():.4f}")
        print(f"Min score: {scores.min().item():.4f}")
        print(f"Max score: {scores.max().item():.4f}")
        print(
            f"Non-zero UIDs: {torch.count_nonzero(scores).item()} "
            f"({(torch.count_nonzero(scores).item()/len(scores)*100):.1f}%)"
        )

        # Print Top Scores
        top_k = 10  # Show top 10
        values, indices = torch.topk(scores, k=min(top_k, len(scores)))
        print(colored("\n=== Top 10 Scores ===", "blue", attrs=["bold"]))
        top_scores = [
            [f"UID {idx}", f"{val.item():.4f}"] for idx, val in zip(indices, values)
        ]
        print(tabulate(top_scores, headers=["UID", "Score"], tablefmt="simple"))

        if show_all:
            print(colored("\n=== All Scores ===", "blue", attrs=["bold"]))
            table_data = format_score_table(scores)
            for row in table_data:
                values = [f"UID {i[0]} - {i[1]}" for i in row]
                print(tabulate([values], tablefmt="simple"))

        print("\nNote: Green values indicate non-zero scores")

    except FileNotFoundError:
        print(colored(f"Score file not found at {file_path}", "red"))
    except Exception as e:
        print(colored(f"Error reading scores: {e}", "red"))


if __name__ == "__main__":
    # CLI entry point: only flag is --all, which dumps every UID's score.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--all", action="store_true", help="Show all scores")
    cli_args = arg_parser.parse_args()
    inspect_scores(show_all=cli_args.all)
33 changes: 17 additions & 16 deletions simulator/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ def __init__(self):
host = os.getenv("REDIS_HOST", "localhost")
port = int(os.getenv("REDIS_PORT", 6379))
self.redis_client = redis.Redis(
host=host,
port=port,
db=0,
decode_responses=True
host=host, port=port, db=0, decode_responses=True
)
logger.info("Redis connection established")

Expand All @@ -42,12 +39,14 @@ def __init__(self):
def _configure_simulation(self):
"""Configure simulation parameters with environment variables or defaults."""
self.response_behaviors = {
'normal': float(os.getenv("SIM_NORMAL_RESP_PROB", 0.8)),
'no_response': float(os.getenv("SIM_NO_RESP_PROB", 0.1)),
'timeout': float(os.getenv("SIM_TIMEOUT_PROB", 0.1))
"normal": float(os.getenv("SIM_NORMAL_RESP_PROB", 0.8)),
"no_response": float(os.getenv("SIM_NO_RESP_PROB", 0.1)),
"timeout": float(os.getenv("SIM_TIMEOUT_PROB", 0.1)),
}

async def forward_feedback_request(self, synapse: FeedbackRequest) -> FeedbackRequest:
async def forward_feedback_request(
self, synapse: FeedbackRequest
) -> FeedbackRequest:
try:
# Validate that synapse, dendrite, dendrite.hotkey, and response are not None
if not synapse or not synapse.dendrite or not synapse.dendrite.hotkey:
Expand All @@ -69,7 +68,7 @@ async def forward_feedback_request(self, synapse: FeedbackRequest) -> FeedbackRe
self.redis_client.set(
redis_key,
new_synapse.model_dump_json(),
ex=86400 # expire after 24 hours
ex=86400, # expire after 24 hours
)
logger.info(f"Stored feedback request {synapse.request_id}")

Expand All @@ -81,7 +80,9 @@ async def forward_feedback_request(self, synapse: FeedbackRequest) -> FeedbackRe
traceback.print_exc()
return synapse

async def forward_task_result_request(self, synapse: TaskResultRequest) -> TaskResultRequest | None:
async def forward_task_result_request(
self, synapse: TaskResultRequest
) -> TaskResultRequest | None:
try:
logger.info(f"Received TaskResultRequest for task id: {synapse.task_id}")
if not synapse or not synapse.task_id:
Expand All @@ -91,9 +92,9 @@ async def forward_task_result_request(self, synapse: TaskResultRequest) -> TaskR
# Simulate different response behaviors
behavior = self._get_response_behavior()

if behavior in ['no_response', 'timeout']:
if behavior in ["no_response", "timeout"]:
logger.debug(f"Simulating {behavior} for task {synapse.task_id}")
if behavior == 'timeout':
if behavior == "timeout":
await asyncio.sleep(30)
return None

Expand All @@ -113,17 +114,17 @@ async def forward_task_result_request(self, synapse: TaskResultRequest) -> TaskR
for criteria_type in feedback_request.criteria_types:
result = Result(
type=criteria_type.type,
value=self._generate_scores(feedback_request.ground_truth)
value=self._generate_scores(feedback_request.ground_truth),
)

task_result = TaskResult(
id=get_new_uuid(),
status='COMPLETED',
status="COMPLETED",
created_at=current_time,
updated_at=current_time,
result_data=[result],
worker_id=get_new_uuid(),
task_id=synapse.task_id
task_id=synapse.task_id,
)
task_results.append(task_result)

Expand All @@ -144,7 +145,7 @@ def _get_response_behavior(self) -> str:
"""Determine the response behavior based on configured probabilities."""
return random.choices(
list(self.response_behaviors.keys()),
weights=list(self.response_behaviors.values())
weights=list(self.response_behaviors.values()),
)[0]

def _generate_scores(self, ground_truth: dict) -> dict:
Expand Down
Loading