From ab1532c5790fd3efa6afbe156a71841cb0fa7a63 Mon Sep 17 00:00:00 2001 From: matthewdeng Date: Tue, 28 Nov 2023 18:57:30 -0800 Subject: [PATCH] [tune] remove legacy client API (#41469) --- doc/source/tune/api/api.rst | 1 - doc/source/tune/api/client.rst | 41 --- python/ray/tune/BUILD | 8 - python/ray/tune/TuneClient.ipynb | 82 ------ python/ray/tune/execution/tune_controller.py | 21 -- .../ray/tune/tests/test_tune_save_restore.py | 3 - python/ray/tune/tests/test_tune_server.py | 167 ------------ python/ray/tune/tune.py | 7 - python/ray/tune/web_server.py | 258 ------------------ 9 files changed, 588 deletions(-) delete mode 100644 doc/source/tune/api/client.rst delete mode 100644 python/ray/tune/TuneClient.ipynb delete mode 100644 python/ray/tune/tests/test_tune_server.py delete mode 100644 python/ray/tune/web_server.py diff --git a/doc/source/tune/api/api.rst b/doc/source/tune/api/api.rst index c0b66114580b..2a352e01d37d 100644 --- a/doc/source/tune/api/api.rst +++ b/doc/source/tune/api/api.rst @@ -27,5 +27,4 @@ on `Github`_. env.rst integration.rst internals.rst - client.rst cli.rst diff --git a/doc/source/tune/api/client.rst b/doc/source/tune/api/client.rst deleted file mode 100644 index a1eda338ca01..000000000000 --- a/doc/source/tune/api/client.rst +++ /dev/null @@ -1,41 +0,0 @@ -Tune Client API -=============== - -You can interact with an ongoing experiment with the Tune Client API. The Tune Client API is organized around REST, -which includes resource-oriented URLs, accepts form-encoded requests, returns JSON-encoded responses, -and uses standard HTTP protocol. - -To allow Tune to receive and respond to your API calls, you have to start your experiment with ``tune.run(server_port)``: - -.. code-block:: python - - tune.run(..., server_port=4321) - -The easiest way to use the Tune Client API is with the built-in TuneClient. To use TuneClient, -verify that you have the ``requests`` library installed: - -.. code-block:: bash - - $ pip install requests - -Then, on the client side, you can use the following class. If on a cluster, you may want to forward this port -(e.g. ``ssh -L :localhost:
``) so that you can use the Client on your local machine. - -.. autoclass:: ray.tune.web_server.TuneClient - :members: - -For an example notebook for using the Client API, see the -`Client API Example `__. - -The API also supports curl. Here are the examples for getting trials (``GET /trials/[:id]``): - -.. code-block:: bash - - $ curl http://
:/trials - $ curl http://
:/trials/ - -And stopping a trial (``PUT /trials/:id``): - -.. code-block:: bash - - $ curl -X PUT http://
:/trials/ diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD index 197d886c70d7..b274f0d4c63a 100644 --- a/python/ray/tune/BUILD +++ b/python/ray/tune/BUILD @@ -375,14 +375,6 @@ py_test( tags = ["team:ml", "exclusive"], ) -py_test( - name = "test_tune_server", - size = "medium", - srcs = ["tests/test_tune_server.py"], - deps = [":tune_lib"], - tags = ["team:ml", "exclusive"], -) - py_test( name = "test_tuner", size = "large", diff --git a/python/ray/tune/TuneClient.ipynb b/python/ray/tune/TuneClient.ipynb deleted file mode 100644 index 32ceb2ed34cf..000000000000 --- a/python/ray/tune/TuneClient.ipynb +++ /dev/null @@ -1,82 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from ray.tune.web_server import TuneClient\n", - "\n", - "manager = TuneClient(tune_address=\"localhost\", port_forward=4321)\n", - "\n", - "x = manager.get_all_trials()\n", - "\n", - "[((y[\"id\"]), y[\"status\"]) for y in x[\"trials\"]]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "scrolled": false - }, - "outputs": [], - "source": [ - "for y in x[\"trials\"][-10:]:\n", - " manager.stop_trial(y[\"id\"])" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import yaml\n", - "\n", - "with open(\"../rllib/tuned_examples/hyperband-cartpole.yaml\") as f:\n", - " d = yaml.safe_load(f)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "name, spec = [x for x in d.items()][0]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "manager.add_trial(name, spec)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.8" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/python/ray/tune/execution/tune_controller.py b/python/ray/tune/execution/tune_controller.py index 4d66ccaaf7d4..b51b14e73224 100644 --- a/python/ray/tune/execution/tune_controller.py +++ b/python/ray/tune/execution/tune_controller.py @@ -61,7 +61,6 @@ from ray.tune.utils.log import Verbosity, has_verbosity from ray.tune.execution.placement_groups import PlacementGroupFactory from ray.tune.utils.serialization import TuneFunctionDecoder, TuneFunctionEncoder -from ray.tune.web_server import TuneServer from ray.util.annotations import DeveloperAPI, Deprecated from ray.util.debug import log_once @@ -82,7 +81,6 @@ def __init__( scheduler: Optional[TrialScheduler] = None, stopper: Optional[Stopper] = None, resume: Union[str, bool] = False, - server_port: Optional[int] = None, fail_fast: bool = False, checkpoint_period: Union[str, int] = None, callbacks: Optional[List[Callback]] = None, @@ -205,11 +203,6 @@ def __init__( int(os.environ.get("TUNE_PRINT_ALL_TRIAL_ERRORS", "1")) ) - self._server = None - self._server_port = server_port - if server_port is not None: - self._server = TuneServer(self, self._server_port) - self._trials: List[Trial] = [] self._live_trials: Set[Trial] = set() # Set of non-terminated trials self._cached_trial_decisions = {} @@ -752,13 +745,6 @@ def step(self): self._iteration += 1 - if self._server: - with warn_if_slow("server"): - self._process_stop_requests() - - if self.is_finished(): - self._server.shutdown() - with warn_if_slow("on_step_end"): self.on_step_end() with warn_if_slow("callbacks.on_step_end"): @@ -2040,7 +2026,6 @@ def __getstate__(self): "_trials", "_live_trials", "_stop_queue", - "_server", "_search_alg", "_placeholder_resolvers", "_scheduler_alg", @@ -2070,12 +2055,9 @@ def __getstate__(self): "_actor_cache", ]: del state[k] - state["launch_web_server"] = bool(self._server) return state def __setstate__(self, state): - launch_web_server = state.pop("launch_web_server") - # Use session_str from previous checkpoint if does not exist session_str = state.pop("_session_str") self.__dict__.setdefault("_session_str", session_str) @@ -2086,9 +2068,6 @@ def __setstate__(self, state): self.__dict__.update(state) self._checkpoint_manager = self._create_checkpoint_manager() - if launch_web_server: - self._server = TuneServer(self, self._server_port) - class _TrialExecutorWrapper: """Wraps around TrialExecutor class, intercepts API calls and warns users diff --git a/python/ray/tune/tests/test_tune_save_restore.py b/python/ray/tune/tests/test_tune_save_restore.py index fcdc8f21325d..eccc60b81cef 100644 --- a/python/ray/tune/tests/test_tune_save_restore.py +++ b/python/ray/tune/tests/test_tune_save_restore.py @@ -10,7 +10,6 @@ import ray from ray import tune from ray.train import CheckpointConfig -from ray.rllib import _register_all from ray.tune import Trainable from ray.tune.utils import validate_save_restore @@ -52,8 +51,6 @@ def tearDown(self): shutil.rmtree(self.absolute_local_dir, ignore_errors=True) self.absolute_local_dir = None ray.shutdown() - # Without this line, test_tune_server.testAddTrial would fail. - _register_all() def _get_trial_dir(self, absoulte_exp_dir): print("looking for", self.MockTrainable._name) diff --git a/python/ray/tune/tests/test_tune_server.py b/python/ray/tune/tests/test_tune_server.py deleted file mode 100644 index 8ad7a59963f7..000000000000 --- a/python/ray/tune/tests/test_tune_server.py +++ /dev/null @@ -1,167 +0,0 @@ -import requests -import socket -import subprocess -import unittest -import json - -import ray -from ray.rllib import _register_all -from ray.tune import PlacementGroupFactory -from ray.tune.experiment.trial import Trial -from ray.tune.web_server import TuneClient -from ray.tune.execution.tune_controller import TuneController - -from ray.train.tests.util import mock_storage_context - - -def get_valid_port(): - port = 4321 - while True: - try: - print("Trying port", port) - port_test_socket = socket.socket() - port_test_socket.bind(("127.0.0.1", port)) - port_test_socket.close() - break - except socket.error: - port += 1 - return port - - -class TuneServerSuite(unittest.TestCase): - def basicSetup(self): - - ray.init(num_cpus=4, num_gpus=1) - port = get_valid_port() - self.runner = TuneController(server_port=port, storage=mock_storage_context()) - runner = self.runner - kwargs = { - "stopping_criterion": {"training_iteration": 3}, - "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]), - "storage": mock_storage_context(), - } - trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] - for t in trials: - runner.add_trial(t) - client = TuneClient("localhost", port) - return runner, client - - def tearDown(self): - print("Tearing down....") - try: - self.runner._server.shutdown() - self.runner = None - except Exception as e: - print(e) - ray.shutdown() - _register_all() - - def testAddTrial(self): - runner, client = self.basicSetup() - for i in range(3): - runner.step() - spec = { - "run": "__fake", - "stop": {"training_iteration": 3}, - "resources_per_trial": {"cpu": 1, "gpu": 1}, - } - client.add_trial("test", spec) - runner.step() - all_trials = client.get_all_trials()["trials"] - runner.step() - self.assertEqual(len(all_trials), 3) - - def testGetTrials(self): - runner, client = self.basicSetup() - for i in range(3): - runner.step() - all_trials = client.get_all_trials()["trials"] - self.assertEqual(len(all_trials), 2) - tid = all_trials[0]["id"] - client.get_trial(tid) - runner.step() - self.assertEqual(len(all_trials), 2) - - def testGetTrialsWithFunction(self): - runner, client = self.basicSetup() - test_trial = Trial( - "__fake", - trial_id="function_trial", - stopping_criterion={"training_iteration": 3}, - config={"callbacks": {"on_episode_start": lambda x: None}}, - storage=mock_storage_context(), - ) - runner.add_trial(test_trial) - - for i in range(3): - runner.step() - all_trials = client.get_all_trials()["trials"] - self.assertEqual(len(all_trials), 3) - client.get_trial("function_trial") - runner.step() - self.assertEqual(len(all_trials), 3) - - def testStopTrial(self): - """Check if Stop Trial works.""" - runner, client = self.basicSetup() - while not any(t.status == Trial.RUNNING for t in runner.get_trials()): - runner.step() - all_trials = client.get_all_trials()["trials"] - self.assertEqual( - len([t for t in all_trials if t["status"] == Trial.RUNNING]), 1 - ) - - tid = [t for t in all_trials if t["status"] == Trial.RUNNING][0]["id"] - client.stop_trial(tid) - runner.step() - - all_trials = client.get_all_trials()["trials"] - self.assertEqual( - len([t for t in all_trials if t["status"] == Trial.RUNNING]), 0 - ) - - def testStopExperiment(self): - """Check if stop_experiment works.""" - runner, client = self.basicSetup() - while not any(t.status == Trial.RUNNING for t in runner.get_trials()): - runner.step() - all_trials = client.get_all_trials()["trials"] - self.assertEqual( - len([t for t in all_trials if t["status"] == Trial.RUNNING]), 1 - ) - - client.stop_experiment() - runner.step() - self.assertTrue(runner.is_finished()) - self.assertRaises( - requests.exceptions.ReadTimeout, lambda: client.get_all_trials(timeout=1) - ) - - def testCurlCommand(self): - """Check if Stop Trial works.""" - runner, client = self.basicSetup() - for i in range(2): - runner.step() - stdout = subprocess.check_output( - 'curl "http://{}:{}/trials"'.format( - client.server_address, client.server_port - ), - shell=True, - ) - self.assertNotEqual(stdout, None) - curl_trials = json.loads(stdout.decode())["trials"] - client_trials = client.get_all_trials()["trials"] - for curl_trial, client_trial in zip(curl_trials, client_trials): - self.assertEqual(curl_trial.keys(), client_trial.keys()) - self.assertEqual(curl_trial["id"], client_trial["id"]) - self.assertEqual( - curl_trial["trainable_name"], client_trial["trainable_name"] - ) - self.assertEqual(curl_trial["status"], client_trial["status"]) - - -if __name__ == "__main__": - import pytest - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 49200338d79d..a05445646e88 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -259,7 +259,6 @@ def run( max_failures: int = 0, fail_fast: bool = False, restore: Optional[str] = None, - server_port: Optional[int] = None, resume: Union[bool, str] = False, reuse_actors: Optional[bool] = None, raise_on_failed_trial: bool = True, @@ -419,7 +418,6 @@ def run( is best used with `ray.init(local_mode=True)`). restore: Path to checkpoint. Only makes sense to set if running 1 trial. Defaults to None. - server_port: Port number for launching TuneServer. resume: One of [True, False, "LOCAL", "REMOTE", "PROMPT", "AUTO"]. Can be suffixed with one or more of ["+ERRORED", "+ERRORED_ONLY", "+RESTART_ERRORED", "+RESTART_ERRORED_ONLY"] (e.g. ``AUTO+ERRORED``). @@ -929,7 +927,6 @@ class and registered trainables. scheduler=scheduler, stopper=experiments[0].stopper, resume=resume, - server_port=server_port, fail_fast=fail_fast, callbacks=callbacks, metric=metric, @@ -1075,7 +1072,6 @@ class and registered trainables. def run_experiments( experiments: Union[Experiment, Mapping, Sequence[Union[Experiment, Mapping]]], scheduler: Optional[TrialScheduler] = None, - server_port: Optional[int] = None, verbose: Optional[Union[int, AirVerbosity, Verbosity]] = None, progress_reporter: Optional[ProgressReporter] = None, resume: Union[bool, str] = False, @@ -1127,7 +1123,6 @@ def run_experiments( remote_run.remote( experiments, scheduler, - server_port, verbose, progress_reporter, resume, @@ -1147,7 +1142,6 @@ def run_experiments( if concurrent: return run( experiments, - server_port=server_port, verbose=verbose, progress_reporter=progress_reporter, resume=resume, @@ -1162,7 +1156,6 @@ def run_experiments( for exp in experiments: trials += run( exp, - server_port=server_port, verbose=verbose, progress_reporter=progress_reporter, resume=resume, diff --git a/python/ray/tune/web_server.py b/python/ray/tune/web_server.py deleted file mode 100644 index 37bcb54f4566..000000000000 --- a/python/ray/tune/web_server.py +++ /dev/null @@ -1,258 +0,0 @@ -import json -import logging -import threading -from typing import Tuple, List, TYPE_CHECKING - -from urllib.parse import urljoin, urlparse -from http.server import SimpleHTTPRequestHandler, HTTPServer - -import ray.cloudpickle as cloudpickle -from ray.tune import TuneError -from ray.tune.search import BasicVariantGenerator -from ray._private.utils import binary_to_hex, hex_to_binary -from ray.util.annotations import DeveloperAPI - -if TYPE_CHECKING: - from ray.tune.execution.tune_controller import TuneController - -logger = logging.getLogger(__name__) - -try: - import requests # `requests` is not part of stdlib. -except ImportError: - requests = None - logger.exception( - "Couldn't import `requests` library. " - "Be sure to install it on the client side." - ) - - -@DeveloperAPI -class TuneClient: - """Client to interact with an ongoing Tune experiment. - - Requires a TuneServer to have started running. - - Attributes: - tune_address: Address of running TuneServer - port_forward: Port number of running TuneServer - """ - - def __init__(self, tune_address: str, port_forward: int): - self._tune_address = tune_address - self._port_forward = port_forward - self._path = "http://{}:{}".format(tune_address, port_forward) - - def get_all_trials(self, timeout=None): - """Returns a list of all trials' information.""" - response = requests.get(urljoin(self._path, "trials"), timeout=timeout) - return self._deserialize(response) - - def get_trial(self, trial_id, timeout=None): - """Returns trial information by trial_id.""" - response = requests.get( - urljoin(self._path, "trials/{}".format(trial_id)), timeout=timeout - ) - return self._deserialize(response) - - def add_trial(self, name, specification): - """Adds a trial by name and specification (dict).""" - payload = {"name": name, "spec": specification} - response = requests.post(urljoin(self._path, "trials"), json=payload) - return self._deserialize(response) - - def stop_trial(self, trial_id): - """Requests to stop trial by trial_id.""" - response = requests.put(urljoin(self._path, "trials/{}".format(trial_id))) - return self._deserialize(response) - - def stop_experiment(self): - """Requests to stop the entire experiment.""" - response = requests.put(urljoin(self._path, "stop_experiment")) - return self._deserialize(response) - - @property - def server_address(self): - return self._tune_address - - @property - def server_port(self): - return self._port_forward - - def _load_trial_info(self, trial_info): - trial_info["config"] = cloudpickle.loads(hex_to_binary(trial_info["config"])) - trial_info["result"] = cloudpickle.loads(hex_to_binary(trial_info["result"])) - - def _deserialize(self, response): - parsed = response.json() - - if "trial" in parsed: - self._load_trial_info(parsed["trial"]) - elif "trials" in parsed: - for trial_info in parsed["trials"]: - self._load_trial_info(trial_info) - - return parsed - - -@DeveloperAPI -def RunnerHandler(runner): - class Handler(SimpleHTTPRequestHandler): - """A Handler is a custom handler for TuneServer. - - Handles all requests and responses coming into and from - the TuneServer. - """ - - def _do_header(self, response_code: int = 200, headers: List[Tuple] = None): - """Sends the header portion of the HTTP response. - - Parameters: - response_code: Standard HTTP response code - headers: Standard HTTP response headers - """ - if headers is None: - headers = [("Content-type", "application/json")] - - self.send_response(response_code) - for key, value in headers: - self.send_header(key, value) - self.end_headers() - - def do_HEAD(self): - """HTTP HEAD handler method.""" - self._do_header() - - def do_GET(self): - """HTTP GET handler method.""" - response_code = 200 - message = "" - try: - result = self._get_trial_by_url(self.path) - resource = {} - if result: - if isinstance(result, list): - infos = [self._trial_info(t) for t in result] - resource["trials"] = infos - else: - resource["trial"] = self._trial_info(result) - message = json.dumps(resource) - except TuneError as e: - response_code = 404 - message = str(e) - - self._do_header(response_code=response_code) - self.wfile.write(message.encode()) - - def do_PUT(self): - """HTTP PUT handler method.""" - response_code = 200 - message = "" - try: - resource = {} - - if self.path.endswith("stop_experiment"): - runner.request_stop_experiment() - trials = list(runner.get_trials()) - else: - trials = self._get_trial_by_url(self.path) - if trials: - if not isinstance(trials, list): - trials = [trials] - for t in trials: - runner.request_stop_trial(t) - - resource["trials"] = [self._trial_info(t) for t in trials] - message = json.dumps(resource) - except TuneError as e: - response_code = 404 - message = str(e) - - self._do_header(response_code=response_code) - self.wfile.write(message.encode()) - - def do_POST(self): - """HTTP POST handler method.""" - response_code = 201 - - content_len = int(self.headers.get("Content-Length"), 0) - raw_body = self.rfile.read(content_len) - parsed_input = json.loads(raw_body.decode()) - resource = self._add_trials(parsed_input["name"], parsed_input["spec"]) - - headers = [("Content-type", "application/json"), ("Location", "/trials/")] - self._do_header(response_code=response_code, headers=headers) - self.wfile.write(json.dumps(resource).encode()) - - def _trial_info(self, trial): - """Returns trial information as JSON.""" - if trial.last_result: - result = trial.last_result.copy() - else: - result = None - info_dict = { - "id": trial.trial_id, - "trainable_name": trial.trainable_name, - "config": binary_to_hex(cloudpickle.dumps(trial.config)), - "status": trial.status, - "result": binary_to_hex(cloudpickle.dumps(result)), - } - return info_dict - - def _get_trial_by_url(self, url): - """Parses url to get either all trials or trial by trial_id.""" - parts = urlparse(url) - path = parts.path - - if path == "/trials": - return list(runner.get_trials()) - else: - trial_id = path.split("/")[-1] - return runner.get_trial(trial_id) - - def _add_trials(self, name, spec): - """Add trial by invoking TuneController.""" - resource = {} - resource["trials"] = [] - trial_generator = BasicVariantGenerator() - trial_generator.add_configurations({name: spec}) - while not trial_generator.is_finished(): - trial = trial_generator.next_trial() - if not trial: - break - runner.add_trial(trial) - resource["trials"].append(self._trial_info(trial)) - return resource - - return Handler - - -@DeveloperAPI -class TuneServer(threading.Thread): - """A TuneServer is a thread that initializes and runs a HTTPServer. - - The server handles requests from a TuneClient. - - Attributes: - runner: Runner that modifies and accesses trials. - port_forward: Port number of TuneServer. - """ - - DEFAULT_PORT = 4321 - - def __init__(self, runner: "TuneController", port: int = None): - """Initialize HTTPServer and serve forever by invoking self.run()""" - threading.Thread.__init__(self) - self._port = port if port else self.DEFAULT_PORT - address = ("localhost", self._port) - logger.info("Starting Tune Server...") - self._server = HTTPServer(address, RunnerHandler(runner)) - self.daemon = True - self.start() - - def run(self): - self._server.serve_forever() - - def shutdown(self): - """Shutdown the underlying server.""" - self._server.shutdown()