[tune/train] clean up tune/train result output (#32234)
* [tune/train] remove duplicated keys in tune/train results.

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

* timestamp

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

* result_timestamp defaults to None

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

* fix test

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

* fix progress_reporter test.

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

* .get(, None)

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

* fix test

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

* fix test_gpu

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

* WORKER_

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>

---------

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>
xwjiang2010 authored Feb 13, 2023
1 parent 80f2161 commit e71c63f
Showing 10 changed files with 55 additions and 67 deletions.
17 changes: 6 additions & 11 deletions python/ray/train/_internal/session.py
@@ -17,15 +17,13 @@
from ray.data import Dataset, DatasetPipeline
from ray.train._internal.accelerator import Accelerator
from ray.train.constants import (
DATE,
DETAILED_AUTOFILLED_KEYS,
HOSTNAME,
NODE_IP,
PID,
WORKER_HOSTNAME,
WORKER_NODE_IP,
WORKER_PID,
TIME_THIS_ITER_S,
TIME_TOTAL_S,
TIMESTAMP,
TRAINING_ITERATION,
CHECKPOINT_METADATA_KEY,
)
from ray.train.error import SessionMisuseError
@@ -229,14 +227,11 @@ def _auto_fill_metrics(self, result: dict) -> dict:
self.last_report_time = current_time

auto_filled_metrics = {
DATE: current_datetime.strftime("%Y-%m-%d_%H-%M-%S"),
TIMESTAMP: int(time.mktime(current_datetime.timetuple())),
TIME_THIS_ITER_S: time_this_iter,
TIME_TOTAL_S: self.time_total,
PID: os.getpid(),
HOSTNAME: platform.node(),
NODE_IP: self.local_ip,
TRAINING_ITERATION: self.iteration,
WORKER_PID: os.getpid(),
WORKER_HOSTNAME: platform.node(),
WORKER_NODE_IP: self.local_ip,
}

if not self.detailed_autofilled_metrics:
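As a quick orientation, here is a minimal, self-contained sketch of the worker-side autofill after this change: the worker keeps its own identity under the new WORKER_*-prefixed keys and reports `timestamp`/`time_this_iter_s` un-prefixed so they line up with Tune's keys instead of duplicating them. `auto_fill_worker_metrics` and its arguments are illustrative stand-ins, not the actual session implementation, and the iteration/total-time bookkeeping is omitted.

```python
import os
import platform
import time
from datetime import datetime

# Key names copied from python/ray/train/constants.py as changed in this commit.
TIMESTAMP = "timestamp"
TIME_THIS_ITER_S = "time_this_iter_s"
WORKER_HOSTNAME = "_hostname"
WORKER_NODE_IP = "_node_ip"
WORKER_PID = "_pid"


def auto_fill_worker_metrics(result: dict, time_this_iter: float, local_ip: str) -> dict:
    """Merge worker-side autofilled metrics into a reported result (sketch)."""
    now = datetime.today()
    autofilled = {
        TIMESTAMP: int(time.mktime(now.timetuple())),
        TIME_THIS_ITER_S: time_this_iter,
        WORKER_PID: os.getpid(),
        WORKER_HOSTNAME: platform.node(),
        WORKER_NODE_IP: local_ip,
    }
    return {**result, **autofilled}


print(auto_fill_worker_metrics({"loss": 0.1}, time_this_iter=1.5, local_ip="127.0.0.1"))
```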
21 changes: 11 additions & 10 deletions python/ray/train/constants.py
@@ -16,21 +16,22 @@
)

# Autofilled session.report() metrics. Keys should be consistent with Tune.
TIMESTAMP = "_timestamp"
TIME_THIS_ITER_S = "_time_this_iter_s"
TRAINING_ITERATION = "_training_iteration"
# The train provided `TIME_THIS_ITER_S` and `TIMESTAMP` will triumph what's
# auto-filled by Tune session.
# TODO: Combine the following two with tune's, once there is a centralized
# file for both tune/train constants.
TIMESTAMP = "timestamp"
TIME_THIS_ITER_S = "time_this_iter_s"

BASIC_AUTOFILLED_KEYS = {TIMESTAMP, TIME_THIS_ITER_S, TRAINING_ITERATION}

DATE = "_date"
HOSTNAME = "_hostname"
NODE_IP = "_node_ip"
PID = "_pid"
TIME_TOTAL_S = "_time_total_s"

WORKER_HOSTNAME = "_hostname"
WORKER_NODE_IP = "_node_ip"
WORKER_PID = "_pid"

# Will not be reported unless ENABLE_DETAILED_AUTOFILLED_METRICS_ENV
# env var is not 0
DETAILED_AUTOFILLED_KEYS = {DATE, HOSTNAME, NODE_IP, PID, TIME_TOTAL_S}
DETAILED_AUTOFILLED_KEYS = {WORKER_HOSTNAME, WORKER_NODE_IP, WORKER_PID, TIME_TOTAL_S}

# Default filename for JSON logger
RESULT_FILE_JSON = "results.json"
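For context on how DETAILED_AUTOFILLED_KEYS is used, below is a minimal sketch of the filtering step that ends the session.py hunk above (the `if not self.detailed_autofilled_metrics:` branch). The helper name and the boolean flag are illustrative; in Ray the flag is derived from the ENABLE_DETAILED_AUTOFILLED_METRICS_ENV environment variable mentioned in the comment above.

```python
# Key names copied from the constants diff above; `strip_detailed_keys` is an
# illustrative helper, not a Ray API.
TIME_TOTAL_S = "_time_total_s"
WORKER_HOSTNAME = "_hostname"
WORKER_NODE_IP = "_node_ip"
WORKER_PID = "_pid"
DETAILED_AUTOFILLED_KEYS = {WORKER_HOSTNAME, WORKER_NODE_IP, WORKER_PID, TIME_TOTAL_S}


def strip_detailed_keys(metrics: dict, detailed_autofilled_metrics: bool) -> dict:
    """Drop the detailed worker-side keys unless detailed metrics are enabled."""
    if detailed_autofilled_metrics:
        return dict(metrics)
    return {k: v for k, v in metrics.items() if k not in DETAILED_AUTOFILLED_KEYS}


metrics = {"timestamp": 1676246400, "_pid": 1234, "_hostname": "worker-0"}
print(strip_detailed_keys(metrics, detailed_autofilled_metrics=False))
# -> {'timestamp': 1676246400}
```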
2 changes: 1 addition & 1 deletion python/ray/train/tests/test_examples.py
@@ -1,7 +1,7 @@
import pytest

from ray.air.config import ScalingConfig
from ray.train.constants import TRAINING_ITERATION
from ray.tune.result import TRAINING_ITERATION

from ray.train.examples.horovod.horovod_example import (
train_func as horovod_torch_train_func,
2 changes: 1 addition & 1 deletion python/ray/train/tests/test_gpu_examples.py
@@ -4,7 +4,6 @@
from ray.air import Checkpoint, session

from ray.air.config import ScalingConfig
from ray.train.constants import TRAINING_ITERATION
from ray.train.examples.horovod.horovod_example import (
train_func as horovod_torch_train_func,
)
@@ -21,6 +20,7 @@
)
from ray.train.tensorflow.tensorflow_trainer import TensorflowTrainer
from ray.train.torch.torch_trainer import TorchTrainer
from ray.tune.result import TRAINING_ITERATION


def test_tensorflow_mnist_gpu(ray_start_4_cpus_2_gpus):
5 changes: 3 additions & 2 deletions python/ray/train/tests/test_mosaic_trainer.py
@@ -6,9 +6,10 @@
import torchvision
from torchvision import transforms, datasets

from ray.air import session
from ray.air.config import ScalingConfig
import ray.train as train
from ray.air import session
from ray.tune.result import TRAINING_ITERATION


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)
@@ -89,7 +90,7 @@ def test_mosaic_cifar10(ray_start_4_cpus):
assert result["epoch"][result.index[-1]] == 4

# check train_iterations
assert result["_training_iteration"][result.index[-1]] == 5
assert result[TRAINING_ITERATION][result.index[-1]] == 5

# check metrics/train/Accuracy has increased
acc = list(result["metrics/train/Accuracy"])
16 changes: 7 additions & 9 deletions python/ray/tune/result.py
@@ -32,9 +32,6 @@
# (Optional) Mean loss for training iteration
MEAN_LOSS = "mean_loss"

# (Optional) Mean loss for training iteration
NEG_MEAN_LOSS = "neg_mean_loss"

# (Optional) Mean accuracy for training iteration
MEAN_ACCURACY = "mean_accuracy"

@@ -44,6 +41,10 @@
# (Optional/Auto-filled) Accumulated number of episodes for this trial.
EPISODES_TOTAL = "episodes_total"

# The timestamp of when the result is generated.
# Default to when the result is processed by tune.
TIMESTAMP = "timestamp"

# Number of timesteps in this iteration.
TIMESTEPS_THIS_ITER = "timesteps_this_iter"

@@ -67,7 +68,6 @@
DEFAULT_RESULT_KEYS = (
TRAINING_ITERATION,
TIME_TOTAL_S,
TIMESTEPS_TOTAL,
MEAN_ACCURACY,
MEAN_LOSS,
)
@@ -77,7 +77,7 @@
TRIAL_ID,
"experiment_id",
"date",
"timestamp",
TIMESTAMP,
PID,
HOSTNAME,
NODE_IP,
@@ -95,14 +95,12 @@
PID,
TIME_TOTAL_S,
TIME_THIS_ITER_S,
"timestamp",
"experiment_id",
TIMESTAMP,
"date",
"time_since_restore",
"iterations_since_restore",
"timesteps_since_restore",
"iterations_since_restore",
"config",
"warmup_time",
)

# __duplicate__ is a magic keyword used internally to
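The new TIMESTAMP constant lets a timestamp reported by the user (or by a Train worker) take precedence over the driver-side wall clock. Here is a minimal sketch of that precedence, mirroring `timestamp if timestamp else int(time.mktime(now.timetuple()))` from the trainable.py hunk below; `resolve_timestamp` is an illustrative helper, not a Tune API.

```python
import time
from datetime import datetime
from typing import Optional

TIMESTAMP = "timestamp"


def resolve_timestamp(result: dict, now: Optional[datetime] = None) -> int:
    """Prefer a reported timestamp; fall back to the result-processing time."""
    now = now or datetime.today()
    reported = result.get(TIMESTAMP, None)
    return reported if reported else int(time.mktime(now.timetuple()))


assert resolve_timestamp({TIMESTAMP: 1676246400}) == 1676246400
assert isinstance(resolve_timestamp({"loss": 0.2}), int)
```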
6 changes: 3 additions & 3 deletions python/ray/tune/tests/test_api.py
@@ -156,7 +156,6 @@ def _function_trainable(config, reporter):
"time_since_restore",
"experiment_id",
"date",
"warmup_time",
}

self.assertEqual(len(class_output), len(results))
@@ -871,7 +870,7 @@ def testReportTimeStep(self):
results1 = [dict(mean_accuracy=5, done=i == 99) for i in range(100)]
logs1, _ = self.checkAndReturnConsistentLogs(results1)

self.assertTrue(all(log[TIMESTEPS_TOTAL] is None for log in logs1))
self.assertTrue(all(TIMESTEPS_TOTAL not in log for log in logs1))

# Test that no timesteps_this_iter are logged if only timesteps_total
# are returned.
@@ -891,7 +890,8 @@
self.assertFalse(any(hasattr(log, TIMESTEPS_THIS_ITER) for log in logs2))

# Test that timesteps_total and episodes_total are reported when
# timesteps_this_iter and episodes_this_iter despite only return zeros.
# timesteps_this_iter and episodes_this_iter are provided by user,
# despite only return zeros.
results3 = [
dict(timesteps_this_iter=0, episodes_this_iter=0) for i in range(10)
]
48 changes: 21 additions & 27 deletions python/ray/tune/trainable/trainable.py
@@ -7,7 +7,6 @@
import sys
import tempfile
import time
import uuid
from contextlib import redirect_stderr, redirect_stdout
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Union, Type, TYPE_CHECKING
@@ -35,6 +34,7 @@
STDOUT_FILE,
TIME_THIS_ITER_S,
TIME_TOTAL_S,
TIMESTAMP,
TIMESTEPS_THIS_ITER,
TIMESTEPS_TOTAL,
TRAINING_ITERATION,
@@ -133,7 +133,6 @@ def __init__(
sync_timeout: Timeout after which sync processes are aborted.
"""

self._experiment_id = uuid.uuid4().hex
self.config = config or {}
trial_info = self.config.pop(TRIAL_INFO, None)

@@ -164,10 +163,10 @@ def __init__(
self._stdout_file = stdout_file
self._stderr_file = stderr_file

start_time = time.time()
self._start_time = time.time()
self._local_ip = ray.util.get_node_ip_address()
self.setup(copy.deepcopy(self.config))
setup_time = time.time() - start_time
setup_time = time.time() - self._start_time
if setup_time > SETUP_TIME_THRESHOLD:
logger.info(
"Trainable.setup took {:.3f} seconds. If your "
@@ -176,8 +175,6 @@
"overheads.".format(setup_time)
)
log_sys_usage = self.config.get("log_sys_usage", False)
self._start_time = start_time
self._warmup_time = None
self._monitor = UtilMonitor(start=log_sys_usage)

self.remote_checkpoint_dir = remote_checkpoint_dir
@@ -240,6 +237,7 @@ def get_auto_filled_metrics(
self,
now: Optional[datetime] = None,
time_this_iter: Optional[float] = None,
timestamp: Optional[int] = None,
debug_metrics_only: bool = False,
) -> dict:
"""Return a dict with metrics auto-filled by the trainable.
@@ -252,20 +250,20 @@
now = datetime.today()
autofilled = {
TRIAL_ID: self.trial_id,
"experiment_id": self._experiment_id,
"date": now.strftime("%Y-%m-%d_%H-%M-%S"),
"timestamp": int(time.mktime(now.timetuple())),
"timestamp": timestamp if timestamp else int(time.mktime(now.timetuple())),
TIME_THIS_ITER_S: time_this_iter,
TIME_TOTAL_S: self._time_total,
PID: os.getpid(),
HOSTNAME: platform.node(),
NODE_IP: self._local_ip,
"config": self.config,
"time_since_restore": self._time_since_restore,
"timesteps_since_restore": self._timesteps_since_restore,
"iterations_since_restore": self._iterations_since_restore,
"warmup_time": self._warmup_time,
}
if self._timesteps_since_restore:
autofilled["timesteps_since_restore"] = self._timesteps_since_restore

if debug_metrics_only:
autofilled = {k: v for k, v in autofilled.items() if k in DEBUG_METRICS}
return autofilled
@@ -334,10 +332,6 @@ def train(self):
`time_total_s` (float): Accumulated time in seconds for this
entire experiment.
`experiment_id` (str): Unique string identifier
for this experiment. This id is preserved
across checkpoint / restore calls.
`training_iteration` (int): The index of this
training iteration, e.g. call to train(). This is incremented
after `step()` is called.
@@ -347,7 +341,7 @@
`date` (str): A formatted date of when the result was processed.
`timestamp` (str): A UNIX timestamp of when the result
was processed.
was processed. This may be overridden.
`hostname` (str): Hostname of the machine hosting the training
process.
@@ -358,8 +352,6 @@
Returns:
A dict that describes training progress.
"""
if self._warmup_time is None:
self._warmup_time = time.time() - self._start_time
start = time.time()
try:
result = self.step()
@@ -385,9 +377,11 @@
self._time_total += time_this_iter
self._time_since_restore += time_this_iter

result_timestamp = result.get(TIMESTAMP, None)

result.setdefault(DONE, False)

# self._timesteps_total should only be tracked if increments provided
# self._timesteps_total should only be tracked if increments are provided
if result.get(TIMESTEPS_THIS_ITER) is not None:
if self._timesteps_total is None:
self._timesteps_total = 0
@@ -401,16 +395,18 @@
self._episodes_total += result[EPISODES_THIS_ITER]

# self._timesteps_total should not override user-provided total
result.setdefault(TIMESTEPS_TOTAL, self._timesteps_total)
result.setdefault(EPISODES_TOTAL, self._episodes_total)
if self._timesteps_total is not None:
result.setdefault(TIMESTEPS_TOTAL, self._timesteps_total)
if self._episodes_total is not None:
result.setdefault(EPISODES_TOTAL, self._episodes_total)
result.setdefault(TRAINING_ITERATION, self._iteration)

# Provides auto-filled neg_mean_loss for avoiding regressions
if result.get("mean_loss"):
result.setdefault("neg_mean_loss", -result["mean_loss"])

now = datetime.today()
result.update(self.get_auto_filled_metrics(now, time_this_iter))
result.update(
self.get_auto_filled_metrics(
now=now, time_this_iter=time_this_iter, timestamp=result_timestamp
)
)

monitor_data = self._monitor.get_data()
if monitor_data:
@@ -429,7 +425,6 @@

def get_state(self):
return {
"experiment_id": self._experiment_id,
"iteration": self._iteration,
"timesteps_total": self._timesteps_total,
"time_total": self._time_total,
@@ -774,7 +769,6 @@ def restore(
to_load = os.path.join(checkpoint_dir, relative_checkpoint_path)

# Set metadata
self._experiment_id = metadata["experiment_id"]
self._iteration = metadata["iteration"]
self._timesteps_total = metadata["timesteps_total"]
self._time_total = metadata["time_total"]
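To summarize the totals bookkeeping change in `train()`: `timesteps_total` and `episodes_total` are now only added to the result when per-iteration increments were actually provided, so they no longer appear as None in every result. A minimal, self-contained sketch of that logic follows; `TotalsTracker` is an illustrative stand-in, not part of Ray.

```python
from typing import Optional

TIMESTEPS_THIS_ITER = "timesteps_this_iter"
TIMESTEPS_TOTAL = "timesteps_total"
EPISODES_THIS_ITER = "episodes_this_iter"
EPISODES_TOTAL = "episodes_total"


class TotalsTracker:
    """Track running totals analogous to the logic in the train() hunk above."""

    def __init__(self) -> None:
        self._timesteps_total: Optional[int] = None
        self._episodes_total: Optional[int] = None

    def update(self, result: dict) -> dict:
        if result.get(TIMESTEPS_THIS_ITER) is not None:
            self._timesteps_total = (self._timesteps_total or 0) + result[
                TIMESTEPS_THIS_ITER
            ]
        if result.get(EPISODES_THIS_ITER) is not None:
            self._episodes_total = (self._episodes_total or 0) + result[
                EPISODES_THIS_ITER
            ]
        # Only report totals that are actually tracked, and never override a
        # user-provided total.
        if self._timesteps_total is not None:
            result.setdefault(TIMESTEPS_TOTAL, self._timesteps_total)
        if self._episodes_total is not None:
            result.setdefault(EPISODES_TOTAL, self._episodes_total)
        return result


tracker = TotalsTracker()
print(tracker.update({"loss": 0.3}))                             # no totals added
print(tracker.update({"loss": 0.2, "timesteps_this_iter": 10}))  # timesteps_total: 10
```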
3 changes: 2 additions & 1 deletion rllib/algorithms/algorithm.py
@@ -2360,12 +2360,13 @@ def get_auto_filled_metrics(
self,
now: Optional[datetime] = None,
time_this_iter: Optional[float] = None,
timestamp: Optional[int] = None,
debug_metrics_only: bool = False,
) -> dict:
# Override this method to make sure, the `config` key of the returned results
# contains the proper Tune config dict (instead of an AlgorithmConfig object).
auto_filled = super().get_auto_filled_metrics(
now, time_this_iter, debug_metrics_only
now, time_this_iter, timestamp, debug_metrics_only
)
if "config" not in auto_filled:
raise KeyError("`config` key not found in auto-filled results dict!")
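The Algorithm override has to forward the new `timestamp` argument because it calls super() positionally; otherwise `debug_metrics_only` would shift into the `timestamp` slot. A minimal sketch of that pattern with stand-in classes (neither `Base` nor `Sub` is an RLlib class):

```python
from datetime import datetime
from typing import Optional


class Base:
    def get_auto_filled_metrics(
        self,
        now: Optional[datetime] = None,
        time_this_iter: Optional[float] = None,
        timestamp: Optional[int] = None,
        debug_metrics_only: bool = False,
    ) -> dict:
        return {"timestamp": timestamp, "debug_metrics_only": debug_metrics_only}


class Sub(Base):
    def get_auto_filled_metrics(
        self,
        now: Optional[datetime] = None,
        time_this_iter: Optional[float] = None,
        timestamp: Optional[int] = None,
        debug_metrics_only: bool = False,
    ) -> dict:
        # Passing all four arguments positionally keeps `debug_metrics_only`
        # out of the `timestamp` slot now that the signature gained a parameter.
        return super().get_auto_filled_metrics(
            now, time_this_iter, timestamp, debug_metrics_only
        )


print(Sub().get_auto_filled_metrics(timestamp=1676246400, debug_metrics_only=True))
```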
2 changes: 0 additions & 2 deletions rllib/utils/test_utils.py
@@ -634,7 +634,6 @@ def check_train_results(train_results: PartialAlgorithmConfigDict) -> ResultDict
"episode_reward_max",
"episode_reward_mean",
"episode_reward_min",
"episodes_total",
"hist_stats",
"info",
"iterations_since_restore",
@@ -646,7 +645,6 @@
"sampler_perf",
"time_since_restore",
"time_this_iter_s",
"timesteps_since_restore",
"timesteps_total",
"timers",
"time_total_s",
