UPDATE: mlflow and hyperparameter tuning sample to Ray 2.5 and Python 3.10

Updates:
- Ray is updated to 2.5.0
- Python is updated to 3.10.11
- mlflow is updated to 2.4.1
- mlflow_run.py uses a median-stopping or ASHA scheduler instead of PBT
- checkpoints are now saved in order of increasing average reward (see the sketch below)
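
For a quick view of the last two items, a condensed sketch of the new scheduler selection and reward-ranked checkpointing, mirroring the src/mlflow_run.py changes in the diff below:

    from ray import air
    from ray.tune.schedulers import MedianStoppingRule

    # The median-stopping rule is the new default scheduler in place of PBT;
    # passing --scheduler asha selects ray.tune.schedulers.ASHAScheduler instead.
    tune_sched = MedianStoppingRule(
        time_attr="training_iteration",
        metric="episode_reward_mean",
        mode="max",
        grace_period=10,
        min_samples_required=10,
    )

    # The best 20 checkpoints are now kept, ranked by mean episode reward.
    checkpoint_config = air.CheckpointConfig(
        num_to_keep=20,
        checkpoint_score_attribute="episode_reward_mean",
        checkpoint_score_order="max",
    )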
akzaidi committed Aug 16, 2023
1 parent 9e32391 commit ad2776b
Showing 5 changed files with 83 additions and 75 deletions.
2 changes: 1 addition & 1 deletion examples/hyperparameter-tuning-and-monitoring/README.md
@@ -33,7 +33,7 @@ az ml environment create --name ray-mlflow-env --conda-file conda.yml --image mc
Note that the environment in this sample is slightly different than the _Getting Started on AML_ sample, so you will need to create a new environment.

## What is being "simulated"?
This example uses the classic ["CartPole-v1"](https://gymnasium.farama.org/environments/classic_control/cart_pole/) simulation environment. The example runs Ray Tune's [population based training (PBT)](https://docs.ray.io/en/latest/_modules/ray/tune/schedulers/pbt.html) to simultaneously train and optimize a group of agents - regularly testing the agents, replicating the top performers, and perturbing their hyperparameters. The MLflow integration allows logging of all artifacts produced by Ray Tune, such as the model checkpoints, to MLflow.
This example uses the classic ["CartPole-v1"](https://gymnasium.farama.org/environments/classic_control/cart_pole/) simulation environment. The example uses Ray Tune to conduct the hyperparameter search and MLflow to log all the artifacts produced by Ray Tune, such as the model checkpoints.

A user guide providing more details on hyperparameter tuning with PBT is available in [user_guides/hyperparameter-tuning.md](../../docs/user_guides/hyperparameter-tuning.md).
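
For orientation, a minimal sketch of that wiring, assuming a working Ray and MLflow setup (this is a trimmed-down, illustrative version of `src/mlflow_run.py`; the small sample count and stopping criterion are only for illustration):

    from ray import air, tune
    from ray.air.integrations.mlflow import MLflowLoggerCallback

    # Tune PPO on CartPole-v1 and log each trial's metrics and artifacts
    # (including checkpoints) to MLflow through the logger callback.
    tuner = tune.Tuner(
        "PPO",
        tune_config=tune.TuneConfig(num_samples=4),
        param_space={
            "env": "CartPole-v1",
            "lr": tune.choice([1e-3, 5e-4, 1e-4]),
        },
        run_config=air.RunConfig(
            stop={"training_iteration": 5},
            callbacks=[
                MLflowLoggerCallback(experiment_name="hpt_ppo", save_artifact=True)
            ],
        ),
    )
    results = tuner.fit()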

17 changes: 11 additions & 6 deletions examples/hyperparameter-tuning-and-monitoring/conda.yml
@@ -2,19 +2,24 @@ channels:
- anaconda
- conda-forge
dependencies:
- python=3.8.5
- pip=22.3.1
- python=3.10.11
- pip=23.0.1
- pip:
# Dependencies for Ray on AML
- azureml-mlflow
- azureml-defaults
- mlflow==1.27.0
# - mlflow==1.27.0
- mlflow==2.4.1
- mlflow-skinny==2.4.1
- ray-on-aml
- ray[data]==2.3.0
- ray[rllib]==2.3.0
- ray[data]==2.5.0
- ray[rllib]==2.5.0
# Deps for RLlib
- torch==2.0.1
- tensorflow==2.11.1
- tensorflow-probability==0.19.0
- tensorboardX==2.2
- torch==2.0.1
# Dependencies for the sample
- gymnasium
- numpy==1.24.2
- numpy==1.24.3
@@ -28,7 +28,7 @@ def list_experiments(path: Optional[str] = None) -> List:
).mlflow_tracking_uri
mlflow.set_tracking_uri(mlflow_tracking_uri)

experiments = mlflow.list_experiments()
experiments = mlflow.search_experiments()

return experiments
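
Note: the list_experiments call used previously is no longer available in MLflow 2.x (hence this change); search_experiments is its replacement. A minimal consumption sketch, assuming the tracking URI is already configured as above (attribute names follow MLflow's Experiment entity):

    import mlflow

    # mlflow.search_experiments() returns a list of mlflow.entities.Experiment
    # objects; print the id and name of each experiment found.
    for exp in mlflow.search_experiments():
        print(exp.experiment_id, exp.name)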

4 changes: 2 additions & 2 deletions examples/hyperparameter-tuning-and-monitoring/job.yml
@@ -1,7 +1,7 @@
$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
code: src
command: >-
python mlflow_run.py --num-tune-samples 100
python mlflow_run.py --num-tune-samples 20 --scheduler asha
# your environment name should include the `azureml:` prefix and the `@<version-name>` suffix
environment: azureml:ray-mlflow-env@latest
# your compute name should have `azureml:` prefix
@@ -14,4 +14,4 @@ distribution:
type: mpi
# Modify the following and num_rollout_workers in main to use more workers
resources:
instance_count: 10
instance_count: 5
133 changes: 68 additions & 65 deletions examples/hyperparameter-tuning-and-monitoring/src/mlflow_run.py
@@ -1,15 +1,17 @@
import argparse
import random

import mlflow
from mlflow.utils.mlflow_tags import MLFLOW_PARENT_RUN_ID
from ray import air, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback
from ray.tune.schedulers import PopulationBasedTraining
from ray.tune.schedulers import ASHAScheduler, MedianStoppingRule
from ray.tune.stopper import TrialPlateauStopper
from ray_on_aml.core import Ray_On_AML


def run(num_tune_samples: int = 10, env_name: str = "CartPole-v1") -> tune.ResultGrid:
def run(
num_tune_samples: int = 10, env_name: str = "CartPole-v1", scheduler: str = "median"
) -> tune.ResultGrid:
"""Run Ray Tune with MLflow on AzureML or locally.
This is an example of using Ray Tune and MLflow with Ray on AzureML.
It is based on the PBT example from the Ray Tune documentation:
@@ -21,20 +23,18 @@ def run(num_tune_samples: int = 10, env_name: str = "CartPole-v1") -> tune.Resul
Number of samples to run with Ray Tune, by default 10
env_name : str, optional
Name of the environment to use, by default "CartPole-v1"
scheduler: str, optional
Name of the scheduler to use, option of median or asha, by default "median"
This example uses the `MLflowLoggerCallback` from Ray Tune to log the results
to MLflow. The MLflow integration allows you to log all the
artifacts produced by Ray Tune, such as the model checkpoints, to MLflow. For
more information, see the documentation:
https://docs.ray.io/en/releases-2.3.1/tune/api/doc/ray.air.integrations.mlflow.MLflowLoggerCallback.html#ray.air.integrations.mlflow.MLflowLoggerCallback
https://docs.ray.io/en/releases-2.3.1/tune/api/doc/
ray.air.integrations.mlflow.MLflowLoggerCallback.html
This example uses a fixed starting value for the hyperparameters. The
hyperparameters are then tuned using the Population Based Training (PBT)
algorithm. The hyperparameters used for the tuning algorithm are specified
in the `tune_config` dictionary, and the algorithm hyperparameters
to be tuned are specified in the `param_space` dictionary with a
function of the search space for each hyperparameter.
hyperparameters are then tuned using the scheduler specified by `scheduler`.
You can control the population size using `num_tune_samples`.
This script can be run on AML or locally. If running locally, pass the parameter
@@ -50,50 +50,38 @@ def run(num_tune_samples: int = 10, env_name: str = "CartPole-v1") -> tune.Resul
A set of Result objects for interacting with Ray Tune results.
"""

# Postprocess the perturbed config to ensure it's still valid
def explore(config):
# ensure we collect enough environment iteration to do a sgd step
if config["train_batch_size"] < config["sgd_minibatch_size"] * 2:
config["train_batch_size"] = config["sgd_minibatch_size"] * 2
# ensure we run at least one sgd iter
if config["num_sgd_iter"] < 1:
config["num_sgd_iter"] = 1
return config

# Define the resample distributions for the hyperparameters to mutate during PBT
# these parameters should be from the
# there are model-specific hyperparameters:
# https://docs.ray.io/en/releases-2.3.0/rllib/rllib-algorithms.html#ppo
# and general training hyperaparameters:
# https://docs.ray.io/en/releases-2.3.0/rllib/rllib-training.html#specifying-training-options
# which you can define here
hyperparam_mutations = {
"lambda": lambda: random.uniform(0.9, 1.0),
"clip_param": lambda: random.uniform(0.01, 0.5),
"lr": [1e-3, 5e-4, 1e-4, 5e-5, 1e-5],
"num_sgd_iter": lambda: random.randint(1, 30),
"sgd_minibatch_size": lambda: random.randint(128, 16384),
"train_batch_size": lambda: random.randint(2000, 160000),
}

# the scheduler we use for tuning is population based training
# other schedulers:
# https://docs.ray.io/en/releases-2.3.0/tune/api/schedulers.html
# see
# https://docs.ray.io/en/releases-2.3.0/tune/api/doc/ray.tune.schedulers.PopulationBasedTraining.html
pbt = PopulationBasedTraining(
time_attr="time_total_s",
perturbation_interval=120,
resample_probability=0.25,
hyperparam_mutations=hyperparam_mutations,
custom_explore_fn=explore,
)
if scheduler == "median":
tune_sched = MedianStoppingRule(
time_attr="training_iteration",
metric="episode_reward_mean",
mode="max",
grace_period=10,
min_samples_required=10,
)
elif scheduler == "asha":
tune_sched = ASHAScheduler(
time_attr="training_iteration",
metric="episode_reward_mean",
mode="max",
max_t=1000,
grace_period=10,
reduction_factor=3,
brackets=1,
)
else:
raise ValueError(
f"Unknown scheduler provided: {scheduler}. Valid options are median or asha"
)

# define the stopping criteria for training
# here we stop after 100 training gradient steps or when the average
# see available stopping criteria in the ray docs:
# https://docs.ray.io/en/latest/tune/api/stoppers.html
# we stop when a trial plateaus
# https://docs.ray.io/en/latest/tune/api/doc/ray.tune.stopper.TrialPlateauStopper.html
stopping_criteria = TrialPlateauStopper(metric="episode_reward_mean", mode="max")
# another example: 100 training gradient steps or when the average
# episode reward during a training batch reaches 300
# Stop when we've either reached 100 training iterations or reward=300
stopping_criteria = {"training_iteration": 100, "episode_reward_mean": 300}
# stopping_criteria = {"training_iteration": 100, "episode_reward_mean": 300}

# Get current run ID from MLFlow and pass it the callback
current_run = mlflow.active_run()
@@ -104,9 +92,9 @@ def explore(config):
# the algorithm/trainable to be tuned
"PPO",
tune_config=tune.TuneConfig(
metric="episode_reward_mean",
mode="max",
scheduler=pbt,
# metric="episode_reward_mean",
# mode="max",
scheduler=tune_sched,
# the number of hyperparameters to sample
num_samples=num_tune_samples,
),
@@ -124,31 +112,40 @@ def explore(config):
param_space={
"env": env_name,
"kl_coeff": 1.0,
"num_workers": 4,
"num_cpus": 1, # number of CPUs to use per trial
"num_gpus": 0, # number of GPUs to use per trial
"scaling_config": air.ScalingConfig(
num_workers=10, resources_per_worker={"cpu": 2}
),
# "num_cpus": 1, # number of CPUs to use per trial
# "num_gpus": 0, # number of GPUs to use per trial
# For DiagGaussian action distributions, make the second half of the model
# outputs floating bias variables instead of state-dependent. This only
# has an effect if using the default fully connected net.
# does not work for non-continuous action spaces
# "model": {"free_log_std": True},
# These params are tuned from a fixed starting value.
"lambda": 0.95,
"clip_param": 0.2,
"lr": 1e-4,
"lambda": tune.choice([0.9, 0.95, 1.0]),
"clip_param": tune.choice([0.1, 0.2, 0.3]),
"lr": tune.choice([1e-3, 5e-4, 1e-4]),
# These params start off randomly drawn from a set.
"num_sgd_iter": tune.choice([10, 20, 30]),
"sgd_minibatch_size": tune.choice([128, 512, 2048]),
"train_batch_size": tune.choice([10000, 20000, 40000]),
# "sgd_minibatch_size": tune.choice([128, 512, 2048]),
"sgd_minibatch_size": 2048,
# "train_batch_size": tune.choice([10000, 20000, 40000]),
"train_batch_size": 40000,
},
# MLFlow callback uses parent_run_id and tracks all hyperparameter
# runs as child jobs
run_config=air.RunConfig(
stop=stopping_criteria,
checkpoint_config=air.CheckpointConfig(
num_to_keep=20,
checkpoint_score_attribute="episode_reward_mean",
checkpoint_score_order="max",
),
callbacks=[
MLflowLoggerCallback(
tags={MLFLOW_PARENT_RUN_ID: current_run.info.run_id},
experiment_name="pbt_ppo",
experiment_name="hpt_ppo",
save_artifact=True,
)
],
@@ -176,6 +173,13 @@ def explore(config):
default="CartPole-v1",
help="Registered gym environment to use for training",
)

parser.add_argument(
"--scheduler",
type=str,
default="median",
help="Scheduler to use for training. Options are median or asha",
)
args, _ = parser.parse_known_args()

if not args.test_local:
@@ -186,8 +190,7 @@ def explore(config):
print("head node detected")
ray.init(address="auto")
print(ray.cluster_resources())

run(args.num_tune_samples)
run(args.num_tune_samples, args.env_name, args.scheduler)
else:
print("in worker node")
