diff --git a/.env.api.example b/.env.api.example index d2a046d7..eaf06982 100644 --- a/.env.api.example +++ b/.env.api.example @@ -1,3 +1,5 @@ -API_PORT = "8005" -API_HOST = "0.0.0.0" -# SCORING_KEY = "YOUR_SCORING_API_KEY_GOES_HERE" +API_PORT = "42170" # Port for the API server +API_HOST = "0.0.0.0" # Host for the API server +SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file) +SCORE_ORGANICS = True # Whether to score organics +VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring \ No newline at end of file diff --git a/.gitignore b/.gitignore index 0f23e623..3655df69 100644 --- a/.gitignore +++ b/.gitignore @@ -183,3 +183,4 @@ wandb .vscode api_keys.json prompting/api/api_keys.json +weights.csv diff --git a/api.config.js b/api.config.js new file mode 100644 index 00000000..b0308f84 --- /dev/null +++ b/api.config.js @@ -0,0 +1,12 @@ +module.exports = { + apps: [ + { + name: 'api_server', + script: 'poetry', + interpreter: 'none', + args: ['run', 'python', 'validator_api/api.py'], + min_uptime: '5m', + max_restarts: 5 + } + ] +}; diff --git a/api_keys.json b/api_keys.json index 0967ef42..9e26dfee 100644 --- a/api_keys.json +++ b/api_keys.json @@ -1 +1 @@ -{} +{} \ No newline at end of file diff --git a/neurons/validator.py b/neurons/validator.py index a0c9c782..e5a6f6ac 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -8,7 +8,8 @@ import multiprocessing as mp import time -from loguru import logger +import loguru +import torch from prompting.api.api import start_scoring_api from prompting.llms.model_manager import model_scheduler @@ -20,48 +21,116 @@ from prompting.weight_setting.weight_setter import weight_setter from shared.profiling import profiler +# Add a handler to write logs to a file +loguru.logger.add("logfile.log", rotation="1000 MB", retention="10 days", level="DEBUG") +from loguru import logger + +torch.multiprocessing.set_start_method("spawn", force=True) + NEURON_SAMPLE_SIZE = 100 +def create_loop_process(task_queue, scoring_queue, reward_events): + async def spawn_loops(task_queue, scoring_queue, reward_events): + logger.info("Starting Profiler...") + asyncio.create_task(profiler.print_stats(), name="Profiler"), + logger.info("Starting ModelScheduler...") + asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler"), + logger.info("Starting TaskScorer...") + asyncio.create_task(task_scorer.start(scoring_queue, reward_events), name="TaskScorer"), + logger.info("Starting WeightSetter...") + asyncio.create_task(weight_setter.start(reward_events)) + + # Main monitoring loop + start = time.time() + + logger.info("Starting Main Monitoring Loop...") + while True: + await asyncio.sleep(5) + current_time = time.time() + time_diff = current_time - start + start = current_time + + # Check if all tasks are still running + logger.debug(f"Running {time_diff:.2f} seconds") + logger.debug(f"Number of tasks in Task Queue: {len(task_queue)}") + logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}") + logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}") + + asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events)) + + +def start_api(): + async def start(): + await start_scoring_api() + while True: + await asyncio.sleep(10) + logger.debug("Running API...") + + asyncio.run(start()) + + +def create_task_loop(task_queue, scoring_queue): + async def start(task_queue, scoring_queue): + logger.info("Starting 
AvailabilityCheckingLoop...") + asyncio.create_task(availability_checking_loop.start()) + + logger.info("Starting TaskSender...") + asyncio.create_task(task_sender.start(task_queue, scoring_queue)) + + logger.info("Starting TaskLoop...") + asyncio.create_task(task_loop.start(task_queue, scoring_queue)) + while True: + await asyncio.sleep(10) + logger.debug("Running task loop...") + + asyncio.run(start(task_queue, scoring_queue)) + + async def main(): # will start checking the availability of miners at regular intervals, needed for API and Validator - asyncio.create_task(availability_checking_loop.start()) - - if shared_settings.DEPLOY_SCORING_API: - # Use multiprocessing to bypass API blocking issue. - api_process = mp.Process(target=lambda: asyncio.run(start_scoring_api())) - api_process.start() - - GPUInfo.log_gpu_info() - # start profiling - asyncio.create_task(profiler.print_stats()) - - # start rotating LLM models - asyncio.create_task(model_scheduler.start()) - - # start creating tasks - asyncio.create_task(task_loop.start()) - - # will start checking the availability of miners at regular intervals - asyncio.create_task(availability_checking_loop.start()) - - # start sending tasks to miners - asyncio.create_task(task_sender.start()) - - # sets weights at regular intervals (synchronised between all validators) - asyncio.create_task(weight_setter.start()) - - # start scoring tasks in separate loop - asyncio.create_task(task_scorer.start()) - # # TODO: Think about whether we want to store the task queue locally in case of a crash - # # TODO: Possibly run task scorer & model scheduler with a lock so I don't unload a model whilst it's generating - # # TODO: Make weight setting happen as specific intervals as we load/unload models - start = time.time() - await asyncio.sleep(60) - while True: - await asyncio.sleep(5) - time_diff = -start + (start := time.time()) - logger.debug(f"Running {time_diff:.2f} seconds") + with torch.multiprocessing.Manager() as manager: + reward_events = manager.list() + scoring_queue = manager.list() + task_queue = manager.list() + + # Create process pool for managed processes + processes = [] + + try: + # # Start checking the availability of miners at regular intervals + + if shared_settings.DEPLOY_SCORING_API: + # Use multiprocessing to bypass API blocking issue + api_process = mp.Process(target=start_api, name="API_Process") + api_process.start() + processes.append(api_process) + + loop_process = mp.Process( + target=create_loop_process, args=(task_queue, scoring_queue, reward_events), name="LoopProcess" + ) + task_loop_process = mp.Process( + target=create_task_loop, args=(task_queue, scoring_queue), name="TaskLoopProcess" + ) + loop_process.start() + task_loop_process.start() + processes.append(loop_process) + processes.append(task_loop_process) + GPUInfo.log_gpu_info() + + while True: + await asyncio.sleep(10) + logger.debug("Running...") + + except Exception as e: + logger.error(f"Main loop error: {e}") + raise + finally: + # Clean up processes + for process in processes: + if process.is_alive(): + process.terminate() + process.join() # The main function parses the configuration and runs the validator. 
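Note on the refactor above: the new `main()` drops the module-level queues from `prompting.mutable_globals` and instead hands `manager.list()` proxies to each child process, with every child running its own asyncio loop over those shared lists and the parent terminating any still-alive children on exit. A minimal, self-contained sketch of that pattern follows; the names (`run_worker`, `task-{i}`) are illustrative, not taken from the PR, and only the standard library is assumed.

# Sketch of the shared-queue / child-process pattern used in neurons/validator.py.
import asyncio
import multiprocessing as mp
import time


def run_worker(task_queue, scoring_queue):
    # Each child process runs its own asyncio event loop over the shared list proxies.
    async def worker():
        while True:
            if task_queue:
                scoring_queue.append(task_queue.pop(0))
            await asyncio.sleep(1)

    asyncio.run(worker())


def main():
    with mp.Manager() as manager:
        task_queue = manager.list()      # shared between parent and children
        scoring_queue = manager.list()

        worker = mp.Process(target=run_worker, args=(task_queue, scoring_queue), name="Worker")
        worker.start()
        try:
            for i in range(3):
                task_queue.append(f"task-{i}")
                time.sleep(2)
                print(f"tasks={len(task_queue)} scored={len(scoring_queue)}")
        finally:
            # Mirror the cleanup in the validator's main(): terminate children still alive.
            if worker.is_alive():
                worker.terminate()
                worker.join()


if __name__ == "__main__":
    main()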
diff --git a/poetry.lock b/poetry.lock index ca9c9733..07fc3c60 100644 --- a/poetry.lock +++ b/poetry.lock @@ -968,13 +968,13 @@ files = [ [[package]] name = "click" -version = "8.1.7" +version = "8.1.8" description = "Composable command line interface toolkit" optional = false python-versions = ">=3.7" files = [ - {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, - {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, + {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"}, + {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"}, ] [package.dependencies] @@ -1953,13 +1953,13 @@ test = ["flaky", "ipyparallel", "pre-commit", "pytest (>=7.0)", "pytest-asyncio [[package]] name = "ipython" -version = "8.30.0" +version = "8.31.0" description = "IPython: Productive Interactive Computing" optional = false python-versions = ">=3.10" files = [ - {file = "ipython-8.30.0-py3-none-any.whl", hash = "sha256:85ec56a7e20f6c38fce7727dcca699ae4ffc85985aa7b23635a8008f918ae321"}, - {file = "ipython-8.30.0.tar.gz", hash = "sha256:cb0a405a306d2995a5cbb9901894d240784a9f341394c6ba3f4fe8c6eb89ff6e"}, + {file = "ipython-8.31.0-py3-none-any.whl", hash = "sha256:46ec58f8d3d076a61d128fe517a51eb730e3aaf0c184ea8c17d16e366660c6a6"}, + {file = "ipython-8.31.0.tar.gz", hash = "sha256:b6a2274606bec6166405ff05e54932ed6e5cfecaca1fc05f2cacde7bb074d70b"}, ] [package.dependencies] @@ -2024,13 +2024,13 @@ testing = ["Django", "attrs", "colorama", "docopt", "pytest (<9.0.0)"] [[package]] name = "jinja2" -version = "3.1.4" +version = "3.1.5" description = "A very fast and expressive template engine." optional = false python-versions = ">=3.7" files = [ - {file = "jinja2-3.1.4-py3-none-any.whl", hash = "sha256:bc5dd2abb727a5319567b7a813e6a2e7318c39f4f487cfe6c89c6f9c7d25197d"}, - {file = "jinja2-3.1.4.tar.gz", hash = "sha256:4a3aee7acbbe7303aede8e9648d13b8bf88a429282aa6122a993f0ac800cb369"}, + {file = "jinja2-3.1.5-py3-none-any.whl", hash = "sha256:aba0f4dc9ed8013c424088f68a5c226f7d6097ed89b246d7749c2ec4175c6adb"}, + {file = "jinja2-3.1.5.tar.gz", hash = "sha256:8fefff8dc3034e27bb80d67c671eb8a9bc424c0ef4c0826edbff304cceff43bb"}, ] [package.dependencies] @@ -3679,32 +3679,32 @@ files = [ [[package]] name = "psutil" -version = "6.1.0" +version = "6.1.1" description = "Cross-platform lib for process and system monitoring in Python." 
optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" files = [ - {file = "psutil-6.1.0-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:ff34df86226c0227c52f38b919213157588a678d049688eded74c76c8ba4a5d0"}, - {file = "psutil-6.1.0-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:c0e0c00aa18ca2d3b2b991643b799a15fc8f0563d2ebb6040f64ce8dc027b942"}, - {file = "psutil-6.1.0-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:000d1d1ebd634b4efb383f4034437384e44a6d455260aaee2eca1e9c1b55f047"}, - {file = "psutil-6.1.0-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:5cd2bcdc75b452ba2e10f0e8ecc0b57b827dd5d7aaffbc6821b2a9a242823a76"}, - {file = "psutil-6.1.0-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:045f00a43c737f960d273a83973b2511430d61f283a44c96bf13a6e829ba8fdc"}, - {file = "psutil-6.1.0-cp27-none-win32.whl", hash = "sha256:9118f27452b70bb1d9ab3198c1f626c2499384935aaf55388211ad982611407e"}, - {file = "psutil-6.1.0-cp27-none-win_amd64.whl", hash = "sha256:a8506f6119cff7015678e2bce904a4da21025cc70ad283a53b099e7620061d85"}, - {file = "psutil-6.1.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:6e2dcd475ce8b80522e51d923d10c7871e45f20918e027ab682f94f1c6351688"}, - {file = "psutil-6.1.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:0895b8414afafc526712c498bd9de2b063deaac4021a3b3c34566283464aff8e"}, - {file = "psutil-6.1.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9dcbfce5d89f1d1f2546a2090f4fcf87c7f669d1d90aacb7d7582addece9fb38"}, - {file = "psutil-6.1.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:498c6979f9c6637ebc3a73b3f87f9eb1ec24e1ce53a7c5173b8508981614a90b"}, - {file = "psutil-6.1.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d905186d647b16755a800e7263d43df08b790d709d575105d419f8b6ef65423a"}, - {file = "psutil-6.1.0-cp36-cp36m-win32.whl", hash = "sha256:6d3fbbc8d23fcdcb500d2c9f94e07b1342df8ed71b948a2649b5cb060a7c94ca"}, - {file = "psutil-6.1.0-cp36-cp36m-win_amd64.whl", hash = "sha256:1209036fbd0421afde505a4879dee3b2fd7b1e14fee81c0069807adcbbcca747"}, - {file = "psutil-6.1.0-cp37-abi3-win32.whl", hash = "sha256:1ad45a1f5d0b608253b11508f80940985d1d0c8f6111b5cb637533a0e6ddc13e"}, - {file = "psutil-6.1.0-cp37-abi3-win_amd64.whl", hash = "sha256:a8fb3752b491d246034fa4d279ff076501588ce8cbcdbb62c32fd7a377d996be"}, - {file = "psutil-6.1.0.tar.gz", hash = "sha256:353815f59a7f64cdaca1c0307ee13558a0512f6db064e92fe833784f08539c7a"}, + {file = "psutil-6.1.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:9ccc4316f24409159897799b83004cb1e24f9819b0dcf9c0b68bdcb6cefee6a8"}, + {file = "psutil-6.1.1-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:ca9609c77ea3b8481ab005da74ed894035936223422dc591d6772b147421f777"}, + {file = "psutil-6.1.1-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:8df0178ba8a9e5bc84fed9cfa61d54601b371fbec5c8eebad27575f1e105c0d4"}, + {file = "psutil-6.1.1-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:1924e659d6c19c647e763e78670a05dbb7feaf44a0e9c94bf9e14dfc6ba50468"}, + {file = "psutil-6.1.1-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:018aeae2af92d943fdf1da6b58665124897cfc94faa2ca92098838f83e1b1bca"}, + {file = "psutil-6.1.1-cp27-none-win32.whl", hash = "sha256:6d4281f5bbca041e2292be3380ec56a9413b790579b8e593b1784499d0005dac"}, + {file = "psutil-6.1.1-cp27-none-win_amd64.whl", hash = 
"sha256:c777eb75bb33c47377c9af68f30e9f11bc78e0f07fbf907be4a5d70b2fe5f030"}, + {file = "psutil-6.1.1-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:fc0ed7fe2231a444fc219b9c42d0376e0a9a1a72f16c5cfa0f68d19f1a0663e8"}, + {file = "psutil-6.1.1-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:0bdd4eab935276290ad3cb718e9809412895ca6b5b334f5a9111ee6d9aff9377"}, + {file = "psutil-6.1.1-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b6e06c20c05fe95a3d7302d74e7097756d4ba1247975ad6905441ae1b5b66003"}, + {file = "psutil-6.1.1-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:97f7cb9921fbec4904f522d972f0c0e1f4fabbdd4e0287813b21215074a0f160"}, + {file = "psutil-6.1.1-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33431e84fee02bc84ea36d9e2c4a6d395d479c9dd9bba2376c1f6ee8f3a4e0b3"}, + {file = "psutil-6.1.1-cp36-cp36m-win32.whl", hash = "sha256:384636b1a64b47814437d1173be1427a7c83681b17a450bfc309a1953e329603"}, + {file = "psutil-6.1.1-cp36-cp36m-win_amd64.whl", hash = "sha256:8be07491f6ebe1a693f17d4f11e69d0dc1811fa082736500f649f79df7735303"}, + {file = "psutil-6.1.1-cp37-abi3-win32.whl", hash = "sha256:eaa912e0b11848c4d9279a93d7e2783df352b082f40111e078388701fd479e53"}, + {file = "psutil-6.1.1-cp37-abi3-win_amd64.whl", hash = "sha256:f35cfccb065fff93529d2afb4a2e89e363fe63ca1e4a5da22b603a85833c2649"}, + {file = "psutil-6.1.1.tar.gz", hash = "sha256:cf8496728c18f2d0b45198f06895be52f36611711746b7f30c464b422b50e2f5"}, ] [package.extras] -dev = ["black", "check-manifest", "coverage", "packaging", "pylint", "pyperf", "pypinfo", "pytest-cov", "requests", "rstcheck", "ruff", "sphinx", "sphinx_rtd_theme", "toml-sort", "twine", "virtualenv", "wheel"] +dev = ["abi3audit", "black", "check-manifest", "coverage", "packaging", "pylint", "pyperf", "pypinfo", "pytest-cov", "requests", "rstcheck", "ruff", "sphinx", "sphinx_rtd_theme", "toml-sort", "twine", "virtualenv", "vulture", "wheel"] test = ["pytest", "pytest-xdist", "setuptools"] [[package]] @@ -5937,13 +5937,13 @@ devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3) [[package]] name = "urllib3" -version = "2.2.3" +version = "2.3.0" description = "HTTP library with thread-safe connection pooling, file post, and more." 
optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ - {file = "urllib3-2.2.3-py3-none-any.whl", hash = "sha256:ca899ca043dcb1bafa3e262d73aa25c465bfb49e0bd9dd5d59f1d0acba2f8fac"}, - {file = "urllib3-2.2.3.tar.gz", hash = "sha256:e7d814a81dad81e6caf2ec9fdedb284ecc9c73076b62654547cc64ccdcae26e9"}, + {file = "urllib3-2.3.0-py3-none-any.whl", hash = "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df"}, + {file = "urllib3-2.3.0.tar.gz", hash = "sha256:f8c5449b3cf0861679ce7e0503c7b44b5ec981bec0d1d3795a07f1ba96f0204d"}, ] [package.extras] diff --git a/prompting/api/api.py b/prompting/api/api.py index a3038581..63678ab7 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -18,7 +18,7 @@ def health(): async def start_scoring_api(): - logger.info("Starting API...") + logger.info(f"Starting Scoring API on https://0.0.0.0:{shared_settings.SCORING_API_PORT}") uvicorn.run( "prompting.api.api:app", host="0.0.0.0", port=shared_settings.SCORING_API_PORT, loop="asyncio", reload=False ) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index a2492db2..ee18ade1 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -1,7 +1,8 @@ import uuid from typing import Any -from fastapi import APIRouter, Request +from fastapi import APIRouter, Depends, Header, HTTPException, Request +from loguru import logger from prompting.llms.model_zoo import ModelZoo from prompting.rewards.scoring import task_scorer @@ -14,10 +15,25 @@ router = APIRouter() +def validate_scoring_key(api_key: str = Header(...)): + if api_key != shared_settings.SCORING_KEY: + raise HTTPException(status_code=403, detail="Invalid API key") + + @router.post("/scoring") -async def score_response(request: Request): # , api_key_data: dict = Depends(validate_api_key)): +async def score_response(request: Request, api_key_data: dict = Depends(validate_scoring_key)): + model = None payload: dict[str, Any] = await request.json() body = payload.get("body") + + try: + if body.get("model") is not None: + model = ModelZoo.get_model_by_id(body.get("model")) + except Exception: + logger.warning( + f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring." 
+ ) + return uid = int(payload.get("uid")) chunks = payload.get("chunks") llm_model = ModelZoo.get_model_by_id(model) if (model := body.get("model")) else None @@ -39,3 +55,4 @@ async def score_response(request: Request): # , api_key_data: dict = Depends(va step=-1, task_id=str(uuid.uuid4()), ) + logger.info("Organic task appended to scoring queue") diff --git a/prompting/llms/hf_llm.py b/prompting/llms/hf_llm.py index ad5d54c8..934071e3 100644 --- a/prompting/llms/hf_llm.py +++ b/prompting/llms/hf_llm.py @@ -15,8 +15,8 @@ def __init__(self, model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4 Initialize Hugging Face model with reproducible settings and optimizations """ # Create a random seed for reproducibility - self.seed = random.randint(0, 1_000_000) - self.set_random_seeds(self.seed) + # self.seed = random.randint(0, 1_000_000) + # self.set_random_seeds(self.seed) self.model: PreTrainedModel = AutoModelForCausalLM.from_pretrained( model_id, torch_dtype=torch.float16, @@ -65,9 +65,12 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed= )[0] logger.debug( - f"PROMPT: {messages}\n\nRESPONSES: {results}\n\n" - f"SAMPLING PARAMS: {params}\n\n" - f"TIME FOR RESPONSE: {timer.elapsed_time}" + f"""REPRODUCIBLEHF WAS QUERIED: + PROMPT: {messages}\n\n + RESPONSES: {results}\n\n + SAMPLING PARAMS: {params}\n\n + SEED: {seed}\n\n + TIME FOR RESPONSE: {timer.elapsed_time}""" ) return results if len(results) > 1 else results[0] diff --git a/prompting/llms/model_manager.py b/prompting/llms/model_manager.py index b5d236c8..b0f516af 100644 --- a/prompting/llms/model_manager.py +++ b/prompting/llms/model_manager.py @@ -9,7 +9,6 @@ from prompting.llms.hf_llm import ReproducibleHF from prompting.llms.model_zoo import ModelConfig, ModelZoo from prompting.llms.utils import GPUInfo -from prompting.mutable_globals import scoring_queue from shared.loop_runner import AsyncLoopRunner from shared.settings import shared_settings @@ -158,6 +157,11 @@ def generate( class AsyncModelScheduler(AsyncLoopRunner): llm_model_manager: ModelManager interval: int = 14400 + scoring_queue: list | None = None + + async def start(self, scoring_queue: list): + self.scoring_queue = scoring_queue + return await super().start() async def initialise_loop(self): model_manager.load_always_active_models() @@ -165,7 +169,7 @@ async def initialise_loop(self): async def run_step(self): """This method is called periodically according to the interval.""" # try to load the model belonging to the oldest task in the queue - selected_model = scoring_queue[0].task.llm_model if scoring_queue else None + selected_model = self.scoring_queue[0].task.llm_model if self.scoring_queue else None if not selected_model: selected_model = ModelZoo.get_random(max_ram=self.llm_model_manager.total_ram) logger.info(f"Loading model {selected_model.llm_model_id} for {self.interval} seconds.") @@ -174,7 +178,7 @@ async def run_step(self): logger.info(f"Model {selected_model.llm_model_id} is already loaded.") return - logger.debug(f"Active models: {model_manager.active_models.keys()}") + logger.debug(f"Active models: {self.llm_model_manager.active_models.keys()}") # Load the selected model loop = asyncio.get_running_loop() await loop.run_in_executor(None, self.llm_model_manager.load_model, selected_model) diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 7a1a6285..1ba9d3cd 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -5,7 +5,6 @@ from loguru import logger from
pydantic import ConfigDict -from prompting import mutable_globals from prompting.llms.model_manager import model_manager, model_scheduler from prompting.tasks.base_task import BaseTextTask from prompting.tasks.task_registry import TaskRegistry @@ -33,9 +32,16 @@ class TaskScorer(AsyncLoopRunner): is_running: bool = False thread: threading.Thread = None interval: int = 10 + scoring_queue: list | None = None + reward_events: list | None = None model_config = ConfigDict(arbitrary_types_allowed=True) + async def start(self, scoring_queue, reward_events): + self.scoring_queue = scoring_queue + self.reward_events = reward_events + return await super().start() + def add_to_queue( self, task: BaseTextTask, @@ -45,7 +51,7 @@ def add_to_queue( step: int, task_id: str, ) -> None: - mutable_globals.scoring_queue.append( + self.scoring_queue.append( ScoringConfig( task=task, response=response, @@ -55,26 +61,24 @@ def add_to_queue( task_id=task_id, ) ) - logger.debug( - f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(mutable_globals.scoring_queue)}" - ) + logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") async def run_step(self) -> RewardLoggingEvent: await asyncio.sleep(0.1) # Only score responses for which the model is loaded scorable = [ scoring_config - for scoring_config in mutable_globals.scoring_queue + for scoring_config in self.scoring_queue if (scoring_config.task.llm_model in model_manager.active_models.keys()) or (scoring_config.task.llm_model is None) ] if len(scorable) == 0: logger.debug("Nothing to score. Skipping scoring step.") # Run a model_scheduler step to load a new model as there are no more tasks to be scored - if len(mutable_globals.scoring_queue) > 0: + if len(self.scoring_queue) > 0: await model_scheduler.run_step() return - mutable_globals.scoring_queue.remove(scorable[0]) + self.scoring_queue.remove(scorable[0]) scoring_config: ScoringConfig = scorable.pop(0) # here we generate the actual reference @@ -94,7 +98,7 @@ async def run_step(self) -> RewardLoggingEvent: model_id=scoring_config.task.llm_model, task=scoring_config.task, ) - mutable_globals.reward_events.append(reward_events) + self.reward_events.append(reward_events) logger.debug( f"REFERENCE: {scoring_config.task.reference}\n\n||||RESPONSES: {scoring_config.response.completions}" ) @@ -111,6 +115,7 @@ async def run_step(self) -> RewardLoggingEvent: block=scoring_config.block, step=scoring_config.step, task_id=scoring_config.task_id, + task_dict=scoring_config.task.model_dump(), ) ) logger.info("Adding scores to rewards_and_uids") diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index 404a912e..9434df94 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -51,7 +51,7 @@ class BaseTextTask(BaseTask): reference: str | None = None llm_model: ModelConfig = None llm_model_id: str = None - seed: str = None + seed: int = Field(default_factory=lambda: random.randint(0, 1000000), allow_mutation=False) query_system_prompt: ClassVar[str | None] = None reference_system_prompt: ClassVar[str | None] = None augmentation_system_prompt: ClassVar[str | None] = None @@ -63,7 +63,6 @@ class BaseTextTask(BaseTask): def get_model_id_and_seed(self) -> "BaseTextTask": if self.llm_model: self.llm_model_id = self.llm_model.llm_model_id if self.llm_model else None - self.seed = random.randint(0, 1000000) return self def make_query(self, dataset_entry: DatasetEntry, **kwargs) -> str: @@ -90,7 +89,7 @@ def 
generate_query( """Generates a query to be used for generating the challenge""" logger.info("🤖 Generating query...") llm_messages = [LLMMessage(role="system", content=self.query_system_prompt)] if self.query_system_prompt else [] - llm_messages += [LLMMessage(role="user", content=message) for message in messages] + llm_messages.extend([LLMMessage(role="user", content=message) for message in messages]) self.query = LLMWrapper.chat_complete(messages=LLMMessages(*llm_messages)) diff --git a/prompting/tasks/inference.py b/prompting/tasks/inference.py index 84295ee9..07f1100c 100644 --- a/prompting/tasks/inference.py +++ b/prompting/tasks/inference.py @@ -38,7 +38,7 @@ class InferenceTask(BaseTextTask): reference: str | None = None llm_model: ModelConfig | None = None llm_model_id: ModelConfig | None = random.choice(ModelZoo.models_configs).llm_model_id - seed: int = Field(default_factory=lambda: random.randint(0, 1_000_000)) + seed: int = Field(default_factory=lambda: random.randint(0, 1_000_000), allow_mutation=False) sampling_params: dict[str, float] = shared_settings.SAMPLING_PARAMS @model_validator(mode="after") diff --git a/prompting/tasks/task_creation.py b/prompting/tasks/task_creation.py index c215a75e..21b1d754 100644 --- a/prompting/tasks/task_creation.py +++ b/prompting/tasks/task_creation.py @@ -5,7 +5,6 @@ from pydantic import ConfigDict from prompting.miner_availability.miner_availability import miner_availabilities -from prompting.mutable_globals import scoring_queue, task_queue from prompting.tasks.task_registry import TaskRegistry from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent from shared.loop_runner import AsyncLoopRunner @@ -18,14 +17,20 @@ class TaskLoop(AsyncLoopRunner): is_running: bool = False thread: threading.Thread = None interval: int = 10 - + task_queue: list | None = [] + scoring_queue: list | None = [] model_config = ConfigDict(arbitrary_types_allowed=True) + async def start(self, task_queue, scoring_queue): + self.task_queue = task_queue + self.scoring_queue = scoring_queue + await super().start() + async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: - if len(task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD: + if len(self.task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD: logger.debug("Task queue is full. Skipping task generation.") return None - if len(scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: + if len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: logger.debug("Scoring queue is full. 
Skipping task generation.") return None await asyncio.sleep(0.1) @@ -55,7 +60,9 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: if not task.query: logger.debug(f"Generating query for task: {task.__class__.__name__}.") task.make_query(dataset_entry=dataset_entry) - task_queue.append(task) + + logger.debug(f"Appending task: {task.__class__.__name__} to task queue.") + self.task_queue.append(task) except Exception as ex: logger.exception(ex) return None diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py index 7288e4f6..659ed814 100644 --- a/prompting/tasks/task_sending.py +++ b/prompting/tasks/task_sending.py @@ -3,19 +3,19 @@ import time from typing import List +import bittensor as bt from loguru import logger -from prompting import mutable_globals from prompting.miner_availability.miner_availability import miner_availabilities -from prompting.mutable_globals import scoring_queue -from prompting.rewards.scoring import task_scorer + +# from prompting.rewards.scoring import task_scorer +from prompting.rewards.scoring import ScoringConfig from prompting.tasks.base_task import BaseTextTask from prompting.tasks.inference import InferenceTask from shared.dendrite import DendriteResponseEvent, SynapseStreamResult from shared.epistula import query_miners from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent from shared.loop_runner import AsyncLoopRunner -from shared.misc import ttl_get_block from shared.settings import shared_settings from shared.timer import Timer @@ -61,6 +61,8 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: log_stream_results(stream_results) + logger.debug("🔍 Creating response event") + response_event = DendriteResponseEvent( stream_results=stream_results, uids=uids, @@ -68,6 +70,7 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: shared_settings.INFERENCE_TIMEOUT if isinstance(task, InferenceTask) else shared_settings.NEURON_TIMEOUT ), ) + logger.debug("🔍 Response event created") return response_event @@ -76,10 +79,25 @@ class TaskSender(AsyncLoopRunner): _lock: asyncio.Lock = asyncio.Lock() time_of_block_sync: float | None = None + task_queue: list | None = None + scoring_queue: list | None = None + subtensor: bt.Subtensor | None = None + + class Config: + arbitrary_types_allowed = True + + async def start(self, task_queue, scoring_queue): + self.task_queue = task_queue + self.scoring_queue = scoring_queue + + # shared_settings is not initialised inside this process, meaning it cannot access any non-constants from here + self.subtensor = bt.subtensor(network=shared_settings.SUBTENSOR_NETWORK) + return await super().start() + @property def block(self): self.time_of_block_sync = time.time() - return ttl_get_block() + return self.subtensor.get_current_block() @property def estimate_block(self): @@ -101,9 +119,7 @@ def estimate_block(self): return estimated_block - async def run_step( - self, k: int = shared_settings.ORGANIC_SAMPLE_SIZE, timeout: float = shared_settings.NEURON_TIMEOUT - ) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: + async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: """Executes a single step of the agent, which consists of: - Getting a list of uids to query - Querying the network @@ -118,16 +134,16 @@ async def run_step( timeout (float): The timeout for the queries. exclude (list, optional): The list of uids to exclude from the query. Defaults to []. 
""" - while len(scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: + while len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: logger.debug("Scoring queue is full. Waiting 1 second...") await asyncio.sleep(1) - while len(mutable_globals.task_queue) == 0: + while len(self.task_queue) == 0: logger.warning("No tasks in queue. Waiting 1 second...") await asyncio.sleep(1) try: # get task from the task queue - mutable_globals.task_queue: list[BaseTextTask] - task = mutable_globals.task_queue.pop(0) + self.task_queue: list[BaseTextTask] + task = self.task_queue.pop(0) # send the task to the miners and collect the responses with Timer() as timer: @@ -135,17 +151,22 @@ async def run_step( if response_event is None: logger.warning("No response event collected. This should not be happening.") return - logger.debug(f"Collected responses in {timer.final_time:.2f} seconds") - # scoring_manager will score the responses as and when the correct model is loaded - task_scorer.add_to_queue( + logger.debug("🔍 Estimating block") + estimated_block = self.estimate_block + logger.debug("🔍 Creating scoring config") + + scoring_config = ScoringConfig( task=task, response=response_event, dataset_entry=task.dataset_entry, - block=self.estimate_block, + block=estimated_block, step=self.step, task_id=task.task_id, ) + logger.debug(f"Collected responses in {timer.final_time:.2f} seconds") + self.scoring_queue.append(scoring_config) + logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") # Log the step event. return ValidatorLoggingEvent( diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 9db78cc6..d1e1a9de 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -6,7 +6,7 @@ import pandas as pd from loguru import logger -from prompting import __spec_version__, mutable_globals +from prompting import __spec_version__ from prompting.llms.model_zoo import ModelZoo from prompting.rewards.reward import WeightedRewardEvent from prompting.tasks.inference import InferenceTask @@ -17,17 +17,8 @@ from shared.settings import shared_settings FILENAME = "validator_weights.npz" - -try: - with np.load(FILENAME) as data: - PAST_WEIGHTS = [data[key] for key in data.files] - logger.debug(f"Loaded Past Weights: {PAST_WEIGHTS}") -except FileNotFoundError: - logger.info("No weights file found - this is expected on a new validator, starting with empty weights") - PAST_WEIGHTS = [] -except Exception as ex: - logger.error(f"Couldn't load weights from file: {ex}") WEIGHTS_HISTORY_LENGTH = 24 +PAST_WEIGHTS: list[np.ndarray] = [] def apply_reward_func(raw_rewards: np.ndarray, p=0.5): @@ -52,7 +43,9 @@ def save_weights(weights: list[np.ndarray]): np.savez_compressed(FILENAME, *weights) -def set_weights(weights: np.ndarray, step: int = 0): +def set_weights( + weights: np.ndarray, step: int = 0, subtensor: bt.Subtensor | None = None, metagraph: bt.Metagraph | None = None +): """ Sets the validator weights to the metagraph hotkeys based on the scores it has received from the miners. The weights determine the trust and incentive level the validator assigns to miner nodes on the network. 
""" @@ -80,8 +73,8 @@ def set_weights(weights: np.ndarray, step: int = 0): uids=shared_settings.METAGRAPH.uids, weights=averaged_weights, netuid=shared_settings.NETUID, - subtensor=shared_settings.SUBTENSOR, - metagraph=shared_settings.METAGRAPH, + subtensor=subtensor, + metagraph=metagraph, ) # Convert to uint16 weights and uids. @@ -124,7 +117,7 @@ def set_weights(weights: np.ndarray, step: int = 0): return # Set the weights on chain via our subtensor connection. - result = shared_settings.SUBTENSOR.set_weights( + result = subtensor.set_weights( wallet=shared_settings.WALLET, netuid=shared_settings.NETUID, uids=uint_uids, @@ -145,21 +138,42 @@ class WeightSetter(AsyncLoopRunner): sync: bool = True interval: int = 60 * 22 # set rewards every 20 minutes + reward_events: list[list[WeightedRewardEvent]] | None = None + subtensor: bt.Subtensor | None = None + metagraph: bt.Metagraph | None = None # interval: int = 60 + class Config: + arbitrary_types_allowed = True + + async def start(self, reward_events): + self.reward_events = reward_events + self.subtensor = bt.Subtensor(network=shared_settings.SUBTENSOR_NETWORK) + self.metagraph = self.subtensor.metagraph(netuid=shared_settings.NETUID) + global PAST_WEIGHTS + + try: + with np.load(FILENAME) as data: + PAST_WEIGHTS = [data[key] for key in data.files] + logger.debug(f"Loaded Past Weights: {PAST_WEIGHTS}") + except FileNotFoundError: + logger.info("No weights file found - this is expected on a new validator, starting with empty weights") + PAST_WEIGHTS = [] + except Exception as ex: + logger.error(f"Couldn't load weights from file: {ex}") + return await super().start() + async def run_step(self): await asyncio.sleep(0.01) try: logger.info("Reward setting loop running") - if len(mutable_globals.reward_events) == 0: + if len(self.reward_events) == 0: logger.warning("No reward events in queue, skipping weight setting...") return - logger.debug(f"Found {len(mutable_globals.reward_events)} reward events in queue") + logger.debug(f"Found {len(self.reward_events)} reward events in queue") # reward_events is a list of lists of WeightedRewardEvents - the 'sublists' each contain the multiple reward events for a single task - mutable_globals.reward_events: list[ - list[WeightedRewardEvent] - ] = mutable_globals.reward_events # to get correct typehinting + self.reward_events: list[list[WeightedRewardEvent]] = self.reward_events # to get correct typehinting # reward_dict = {uid: 0 for uid in get_uids(sampling_mode="all")} reward_dict = {uid: 0 for uid in range(1024)} @@ -174,7 +188,7 @@ async def run_step(self): logger.debug(f"Miner rewards before processing: {miner_rewards}") inference_events: list[WeightedRewardEvent] = [] - for reward_events in mutable_globals.reward_events: + for reward_events in self.reward_events: await asyncio.sleep(0.01) for reward_event in reward_events: if np.sum(reward_event.rewards) > 0: @@ -225,8 +239,8 @@ async def run_step(self): except Exception as ex: logger.exception(f"{ex}") # set weights on chain - set_weights(final_rewards, step=self.step) - mutable_globals.reward_events = [] # empty reward events queue + set_weights(final_rewards, step=self.step, subtensor=self.subtensor, metagraph=self.metagraph) + self.reward_events = [] # empty reward events queue await asyncio.sleep(0.01) return final_rewards diff --git a/pyproject.toml b/pyproject.toml index 21f4d70e..b24f97a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "prompting" -version = "2.15.1" +version = "2.15.2" description = 
"Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI" authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"] readme = "README.md" diff --git a/run_api.sh b/run_api.sh index 6614e20f..40dcc610 100644 --- a/run_api.sh +++ b/run_api.sh @@ -29,7 +29,7 @@ echo "module.exports = { name: 'api_server', script: 'poetry', interpreter: 'none', - args: ['run', 'python', 'api/api.py'], + args: ['run', 'python', 'validator_api/api.py'], min_uptime: '5m', max_restarts: 5 } diff --git a/shared/logging.py b/shared/logging.py index daf4ee28..f2fdbd15 100644 --- a/shared/logging.py +++ b/shared/logging.py @@ -180,6 +180,7 @@ class RewardLoggingEvent(BaseEvent): reference: str challenge: str task: str + task_dict: dict model_config = ConfigDict(arbitrary_types_allowed=True) diff --git a/shared/misc.py b/shared/misc.py index 1b2bbe7a..c26285ec 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -5,10 +5,10 @@ from math import floor from typing import Any, Callable +import bittensor as bt from loguru import logger from shared.exceptions import BittensorError -from shared.settings import shared_settings class classproperty: @@ -86,7 +86,7 @@ def _ttl_hash_gen(seconds: int): # 12 seconds updating block. @ttl_cache(maxsize=1, ttl=12) -def ttl_get_block() -> int: +def ttl_get_block(subtensor: bt.Subtensor | None = None) -> int: """ Retrieves the current block number from the blockchain. This method is cached with a time-to-live (TTL) of 12 seconds, meaning that it will only refresh the block number from the blockchain at most every 12 seconds, @@ -105,7 +105,7 @@ def ttl_get_block() -> int: Note: self here is the miner or validator instance """ try: - return shared_settings.SUBTENSOR.get_current_block() + return subtensor.get_current_block() except Exception as e: raise BittensorError(f"Bittensor error: {str(e)}") from e diff --git a/shared/settings.py b/shared/settings.py index a74e5615..1fb9f84d 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -164,6 +164,8 @@ def validate_mode(cls, v): logger.warning( "No SCORING_KEY found in .env.api file. You must add a scoring key that will allow us to forward miner responses to the validator for scoring." ) + if not os.getenv("SCORE_ORGANICS"): + logger.warning("Not scoring organics. This means that miners may not respond as consistently.") elif v["mode"] == "miner": if not dotenv.load_dotenv(".env.miner"): logger.warning("No .env.miner file found. Please create one.") @@ -210,7 +212,6 @@ def load(cls, mode: Literal["miner", "validator", "mock", "api"]) -> "SharedSett def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: mode = values["mode"] netuid = values.get("NETUID", 61) - if netuid is None: raise ValueError("NETUID must be specified") values["TEST"] = netuid != 1 @@ -222,32 +223,6 @@ def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: logger.info("Running in mock mode. Bittensor objects will not be initialized.") return values - # load slow packages only if not in mock mode - import torch - - if not values.get("NEURON_DEVICE"): - values["NEURON_DEVICE"] = "cuda" if torch.cuda.is_available() else "cpu" - - # Ensure SAVE_PATH exists. - save_path = values.get("SAVE_PATH", "./storage") - if not os.path.exists(save_path): - os.makedirs(save_path) - if values.get("SN19_API_KEY") is None or values.get("SN19_API_URL") is None: - logger.warning( - "It is strongly recommended to provide an SN19 API KEY + URL to avoid incurring OpenAI API costs." 
- ) - if mode == "validator": - if values.get("OPENAI_API_KEY") is None: - raise Exception( - "You must provide an OpenAI API key as a backup. It is recommended to also provide an SN19 API key + url to avoid incurring API costs." - ) - if values.get("SCORING_ADMIN_KEY") is None and values.get("DEPLOY_SCORING_API"): - logger.warning("You must provide a SCORING_ADMIN_KEY to access the API. Disabling scoring endpoint") - values["DEPLOY_SCORING_API"] = False - if values.get("PROXY_URL") is None: - logger.warning( - "You must provide a proxy URL to use the DuckDuckGo API - your vtrust might decrease if no DDG URL is provided." - ) return values @cached_property diff --git a/shared/uids.py b/shared/uids.py index ac49e007..ef494ba2 100644 --- a/shared/uids.py +++ b/shared/uids.py @@ -114,7 +114,7 @@ def get_top_incentive_uids(k: int, vpermit_tao_limit: int) -> np.ndarray: # Extract the top uids. top_k_uids = [uid for uid, incentive in uid_incentive_pairs_sorted[:k]] - return np.array(top_k_uids) + return np.array(top_k_uids).astype(int) def get_uids( @@ -125,7 +125,8 @@ def get_uids( ) -> np.ndarray: if shared_settings.TEST and shared_settings.TEST_MINER_IDS: return random.sample( - list(np.array(shared_settings.TEST_MINER_IDS)), min(len(shared_settings.TEST_MINER_IDS), k or 10**6) + list(np.array(shared_settings.TEST_MINER_IDS).astype(int)), + min(len(shared_settings.TEST_MINER_IDS), k or 10**6), ) if sampling_mode == "random": return get_random_uids(k=k, exclude=exclude or []) diff --git a/tests/prompting/test_weight_settings.py b/tests/prompting/test_weight_settings.py index cce49182..00b923a5 100644 --- a/tests/prompting/test_weight_settings.py +++ b/tests/prompting/test_weight_settings.py @@ -43,7 +43,7 @@ def test_run_step_with_reward_events(): with ( patch("shared.uids.get_uids") as mock_get_uids, patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry, - patch("prompting.weight_setting.weight_setter.mutable_globals") as mock_mutable_globals, + # patch("prompting.weight_setting.weight_setter.mutable_globals") as mock_mutable_globals, patch("prompting.weight_setting.weight_setter.set_weights") as mock_set_weights, patch("prompting.weight_setting.weight_setter.logger") as mock_logger, ): @@ -75,7 +75,9 @@ def __init__(self, task, uids, rewards, weight): mock_task_registry.get_task_config = MagicMock(return_value=mock_task_registry.task_configs[0]) # Set up the mock mutable_globals - mock_mutable_globals.reward_events = [ + + weight_setter = WeightSetter() + reward_events = [ [ WeightedRewardEvent( task=mock_task_registry.task_configs[0], uids=mock_uids, rewards=[1.0, 2.0, 3.0, 4.0, 5.0], weight=1 @@ -87,8 +89,7 @@ def __init__(self, task, uids, rewards, weight): ), ], ] - - weight_setter = WeightSetter() + weight_setter.reward_events = reward_events output = asyncio.run(weight_setter.run_step()) print(output) diff --git a/validator_api/API_docs.md b/validator_api/API_docs.md index 71030a30..bf609989 100644 --- a/validator_api/API_docs.md +++ b/validator_api/API_docs.md @@ -20,10 +20,15 @@ This document describes the API endpoints available for [Subnet 1](https://githu ## Getting Started -Follow these steps to set up and run the API server: +SN1 can run either in validator mode or in API mode. Both modes will require the validator hotkey. + +As a validator, you MUST be running one instance in validator mode and can launch an arbitrary number of API instances. These API instances will proxy the responses from miners to the validator for scoring. 
+ +To set up and run the API server: 1. **Install dependencies**: Ensure all required dependencies are installed using Poetry. -2. **Run the API server**: Start the server to access the API endpoints. +2. **Set up the .env.api file**: Copy the .env.api.example file to .env.api and fill in the validator hotkey. +3. **Run the API server**: Start the server to access the API endpoints. Use the following command: diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py new file mode 100644 index 00000000..b0cef872 --- /dev/null +++ b/validator_api/chat_completion.py @@ -0,0 +1,104 @@ +import asyncio +import json +import random +from typing import AsyncGenerator + +import httpx +from fastapi import HTTPException +from fastapi.responses import StreamingResponse +from loguru import logger + +from shared.epistula import make_openai_query +from shared.settings import shared_settings +from shared.uids import get_uids + + +async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): + uid = int(uid) # sometimes uid is type np.uint64 + logger.info(f"Forwarding response to scoring with body: {body}") + if not shared_settings.SCORE_ORGANICS: # Allow disabling of scoring by default + return + + if body.get("task") != "InferenceTask": + logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}") + return + url = f"http://{shared_settings.VALIDATOR_API}/scoring" + payload = {"body": body, "chunks": chunks, "uid": uid} + try: + timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} + ) + if response.status_code == 200: + logger.info(f"Forwarding response completed with status {response.status_code}") + + else: + logger.exception( + f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}" + ) + + except Exception as e: + logger.error(f"Tried to forward response to {url} with payload {payload}") + logger.exception(f"Error while forwarding response: {e}") + + +async def stream_response( + response, collected_chunks: list[str], body: dict[str, any], uid: int +) -> AsyncGenerator[str, None]: + chunks_received = False + try: + async for chunk in response: + chunks_received = True + collected_chunks.append(chunk.choices[0].delta.content) + yield f"data: {json.dumps(chunk.model_dump())}\n\n" + + if not chunks_received: + logger.error("Stream is empty: No chunks were received") + yield 'data: {"error": "502 - Response is empty"}\n\n' + yield "data: [DONE]\n\n" + + # Forward the collected chunks after streaming is complete + asyncio.create_task(forward_response(uid=uid, body=body, chunks=collected_chunks)) + except asyncio.CancelledError: + logger.info("Client disconnected, streaming cancelled") + raise + except Exception as e: + logger.exception(f"Error during streaming: {e}") + yield 'data: {"error": "Internal server Error"}\n\n' + + +async def chat_completion(body: dict[str, any], uid: int | None = None) -> tuple | StreamingResponse: + """Handle regular chat completion without mixture of miners.""" + if uid is None: + uid = random.choice(get_uids(sampling_mode="top_incentive", k=100)) + + if uid is None: + logger.error("No available miner found") + raise HTTPException(status_code=503, detail="No available miner found") + + logger.debug(f"Querying uid {uid}") + STREAM = body.get("stream", 
False) + + collected_chunks: list[str] = [] + + logger.info(f"Making {'streaming' if STREAM else 'non-streaming'} openai query with body: {body}") + response = await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=STREAM) + + if STREAM: + return StreamingResponse( + stream_response(response, collected_chunks, body, uid), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + else: + asyncio.create_task(forward_response(uid=uid, body=body, chunks=response[1])) + return response[0] + + +async def get_response_from_miner(body: dict[str, any], uid: int) -> tuple: + """Get response from a single miner.""" + return await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=False) diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index bc3d2e2f..34681f0e 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -1,106 +1,28 @@ -import asyncio -import json import random -import httpx -from fastapi import APIRouter, HTTPException, Request +from fastapi import APIRouter, Request from loguru import logger from starlette.responses import StreamingResponse -from shared.epistula import make_openai_query -from shared.settings import shared_settings -from shared.uids import get_uids +from validator_api.chat_completion import chat_completion +from validator_api.mixture_of_miners import mixture_of_miners router = APIRouter() -async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): - if not shared_settings.SCORE_ORGANICS: # Allow disabling of scoring by default - return - - # if body.get("task") != "InferenceTask": - # logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}") - # return - url = f"http://{shared_settings.VALIDATOR_API}/scoring" - payload = {"body": body, "chunks": chunks, "uid": uid} - # headers = { - # "Authorization": f"Bearer {shared_settings.SCORING_KEY}", #Add API key in Authorization header - # "Content-Type": "application/json", - # } - try: - timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - async with httpx.AsyncClient(timeout=timeout) as client: - response = await client.post(url, json=payload) # , headers=headers) - if response.status_code == 200: - logger.info(f"Forwarding response completed with status {response.status_code}") - - else: - logger.exception( - f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}" - ) - - except Exception as e: - logger.error(f"Tried to forward response to {url} with payload {payload}") - logger.exception(f"Error while forwarding response: {e}") - - @router.post("/v1/chat/completions") -async def chat_completion(request: Request): # , cbackground_tasks: BackgroundTasks): +async def completions(request: Request): + """Main endpoint that handles both regular and mixture of miners chat completion.""" try: body = await request.json() body["seed"] = int(body.get("seed") or random.randint(0, 1000000)) - STREAM = body.get("stream") or False - logger.debug(f"Streaming: {STREAM}") - # Get random miner from top 100 incentive. 
- uid = random.choice(get_uids(sampling_mode="top_incentive", k=100)) - # uid = get_available_miner(task=body.get("task"), model=body.get("model")) - if uid is None: - logger.error("No available miner found") - raise HTTPException(status_code=503, detail="No available miner found") - logger.debug(f"Querying uid {uid}") - - collected_chunks: list[str] = [] - - # Create a wrapper for the streaming response - async def stream_with_error_handling(): - chunks_received = False - try: - async for chunk in response: - chunks_received = True - collected_chunks.append(chunk.choices[0].delta.content) - yield f"data: {json.dumps(chunk.model_dump())}\n\n" - - if not chunks_received: - logger.error("Stream is empty: No chunks were received") - yield 'data: {"error": "502 - Response is empty"}\n\n' - yield "data: [DONE]\n\n" - - # Once the stream is done, forward the collected chunks - asyncio.create_task(forward_response(uid=uid, body=body, chunks=collected_chunks)) - # background_tasks.add_task(forward_response, uid=uid, body=body, chunks=collected_chunks) - except asyncio.CancelledError: - logger.info("Client disconnected, streaming cancelled") - raise - except Exception as e: - logger.exception(f"Error during streaming: {e}") - yield 'data: {"error": "Internal server Error"}\n\n' - - logger.info(f"Making {'streaming' if STREAM else 'non-streaming'} openai query with body: {body}") - response = await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=STREAM) - if STREAM: - return StreamingResponse( - stream_with_error_handling(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - }, - ) + # Choose between regular completion and mixture of miners. + if body.get("mixture", False): + return await mixture_of_miners(body) else: - asyncio.create_task(forward_response(uid=uid, body=body, chunks=response[1])) - return response[0] + return await chat_completion(body) except Exception as e: - logger.exception(f"Error setting up streaming: {e}") + logger.exception(f"Error in chat completion: {e}") return StreamingResponse(content="Internal Server Error", status_code=500) diff --git a/validator_api/mixture_of_miners.py b/validator_api/mixture_of_miners.py new file mode 100644 index 00000000..e2aaa05a --- /dev/null +++ b/validator_api/mixture_of_miners.py @@ -0,0 +1,88 @@ +import asyncio +import copy +import random + +from fastapi import HTTPException +from fastapi.responses import StreamingResponse +from loguru import logger + +from shared.uids import get_uids +from validator_api.chat_completion import chat_completion, get_response_from_miner + +DEFAULT_SYSTEM_PROMPT = """You have been provided with a set of responses from various open-source models to the latest user query. +Your task is to synthesize these responses into a single, high-quality and concise response. +It is crucial to follow the provided instructions or examples in the given prompt if any, and ensure the answer is in correct and expected format. +Critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. +Your response should not simply replicate the given answers but should offer a refined and accurate reply to the instruction. +Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability. +Responses from models:""" + +TASK_SYSTEM_PROMPT = { + None: DEFAULT_SYSTEM_PROMPT, + # Add more task-specific system prompts here.
+} + +NUM_MIXTURE_MINERS = 5 +TOP_INCENTIVE_POOL = 100 + + +async def get_miner_response(body: dict, uid: str) -> tuple | None: + """Get response from a single miner with error handling.""" + try: + return await get_response_from_miner(body, uid) + except Exception as e: + logger.error(f"Error getting response from miner {uid}: {e}") + return None + + +async def mixture_of_miners(body: dict[str, any]) -> tuple | StreamingResponse: + """Handle chat completion with mixture of miners approach. + + Based on Mixture-of-Agents Enhances Large Language Model Capabilities, 2024, Wang et al.: + https://arxiv.org/abs/2406.04692 + + Args: + body: Query parameters: + messages: User prompt. + stream: If True, stream the response. + model: Optional model used for inference, SharedSettings.LLM_MODEL is used by default. + task: Optional task, see prompting/tasks/task_registry.py, InferenceTask is used by default. + """ + body_first_step = copy.deepcopy(body) + body_first_step["stream"] = False + + # Get multiple miners + miner_uids = get_uids(sampling_mode="top_incentive", k=NUM_MIXTURE_MINERS) + if len(miner_uids) == 0: + raise HTTPException(status_code=503, detail="No available miners found") + + # Concurrently collect responses from all miners. + miner_tasks = [get_miner_response(body_first_step, uid) for uid in miner_uids] + responses = await asyncio.gather(*miner_tasks) + + # Filter out None responses (failed requests). + valid_responses = [r for r in responses if r is not None] + + if not valid_responses: + raise HTTPException(status_code=503, detail="Failed to get responses from miners") + + # Extract completions from the responses. + completions = [response[1][0] for response in valid_responses if response and len(response) > 1] + + task_name = body.get("task") + system_prompt = TASK_SYSTEM_PROMPT.get(task_name, DEFAULT_SYSTEM_PROMPT) + + # Aggregate responses into one system prompt. + agg_system_prompt = system_prompt + "\n" + "\n".join([f"{i+1}. {comp}" for i, comp in enumerate(completions)]) + + # Prepare new messages with the aggregated system prompt. + new_messages = [{"role": "system", "content": agg_system_prompt}] + new_messages.extend([msg for msg in body["messages"] if msg["role"] != "system"]) + + # Update the body with the new messages. + final_body = copy.deepcopy(body) + final_body["messages"] = new_messages + + # Get final response using a random top miner. + final_uid = random.choice(get_uids(sampling_mode="top_incentive", k=TOP_INCENTIVE_POOL)) + return await chat_completion(final_body, final_uid)
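
For reference, a minimal client call against the new /v1/chat/completions endpoint, toggling the mixture-of-miners path via the "mixture" flag. The port comes from the updated .env.api.example; the api-key header and the exact response shape are assumptions about the deployment rather than something this diff defines.

# Hypothetical usage sketch against the validator_api server added/refactored in this PR.
import httpx

payload = {
    "messages": [{"role": "user", "content": "Summarise proof-of-work in two sentences."}],
    "stream": False,
    "mixture": True,  # route the request through mixture_of_miners() instead of a single miner
    "task": "InferenceTask",  # only InferenceTask bodies are forwarded for organic scoring
}

# Port 42170 matches API_PORT in .env.api.example; adjust to your deployment.
response = httpx.post(
    "http://localhost:42170/v1/chat/completions",
    json=payload,
    headers={"api-key": "YOUR_API_KEY"},  # assumption: key handling mirrors api_keys.json
    timeout=120,
)
print(response.status_code, response.text)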