From cef62898d5740209d52af838a2e53ab4513806ae Mon Sep 17 00:00:00 2001 From: ark Date: Fri, 23 Apr 2021 21:33:16 +0800 Subject: [PATCH 01/14] Feat: Make offpolicy trainer resumable --- test/discrete/test_dqn.py | 37 +++++++++++++++- tianshou/trainer/offpolicy.py | 80 ++++++++++++++++++++++++++--------- 2 files changed, 97 insertions(+), 20 deletions(-) diff --git a/test/discrete/test_dqn.py b/test/discrete/test_dqn.py index bd88b2b5b..8d29436c1 100644 --- a/test/discrete/test_dqn.py +++ b/test/discrete/test_dqn.py @@ -44,6 +44,7 @@ def get_args(): parser.add_argument( '--save-buffer-name', type=str, default="./expert_DQN_CartPole-v0.pkl") + parser.add_argument("--epoch-per-save", type=int, default=5) parser.add_argument( '--device', type=str, default='cuda' if torch.cuda.is_available() else 'cpu') @@ -97,8 +98,39 @@ def test_dqn(args=get_args()): def save_fn(policy): torch.save(policy.state_dict(), os.path.join(log_path, 'policy.pth')) + def save_train_fn(policy, train_collector, + test_collector, train_log, epoch): + # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html + torch.save({ + 'epoch': epoch, + 'model_state_dict': policy.state_dict(), + 'optimizer_state_dict': policy.optim.state_dict(), + 'train_log': train_log.state_dict() + }, os.path.join(log_path, 'checkpoint.pth')) + pickle.dump(train_collector.buffer, + open(os.path.join(log_path, 'train_buffer.pkl'), "wb")) + # test collector does not need to be saved + + def load_train_fn(policy, train_collector, + test_collector, train_log): + # or use resume_path + print("Loaded agent from: ", log_path) + if os.path.exists(os.path.join(log_path, 'checkpoint.pth')) \ + and os.path.exists(os.path.join(log_path, 'train_buffer.pkl')): + checkpoint = torch.load( + os.path.join(log_path, 'checkpoint.pth'), map_location=args.device + ) + policy.load_state_dict(checkpoint['model_state_dict']) + policy.optim.load_state_dict(checkpoint['optimizer_state_dict']) + train_log.load_state_dict(checkpoint['train_log']) + buffer = pickle.load(open(os.path.join(log_path, 'train_buffer.pkl'), "rb")) + train_collector._assign_buffer(buffer) + return checkpoint['epoch'] + 1 + return 1 + def stop_fn(mean_rewards): return mean_rewards >= env.spec.reward_threshold + # return False def train_fn(epoch, env_step): # eps annnealing, just a demo @@ -119,7 +151,10 @@ def test_fn(epoch, env_step): policy, train_collector, test_collector, args.epoch, args.step_per_epoch, args.step_per_collect, args.test_num, args.batch_size, update_per_step=args.update_per_step, train_fn=train_fn, - test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger) + test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger, + save_train_fn=save_train_fn, load_train_fn=load_train_fn, + epoch_per_save=args.epoch_per_save + ) assert stop_fn(result['best_reward']) diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index 444dbc08a..c1a289b3f 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -10,6 +10,22 @@ from tianshou.trainer import test_episode, gather_info +class TrainLog: + def __init__(self) -> None: + self.best_epoch = 0 + self.best_reward, self.best_reward_std = 0, 0 + self.gradient_step = 0 + self.last_rew, self.last_len = 0.0, 0 + + def state_dict(self): + return self.__dict__ + + def load_state_dict(self, state_dict: dict): + for key in self.__dict__.keys(): + if key in state_dict: + setattr(self, key, state_dict[key]) + + def offpolicy_trainer( policy: BasePolicy, train_collector: Collector, 
@@ -24,6 +40,13 @@ def offpolicy_trainer( test_fn: Optional[Callable[[int, Optional[int]], None]] = None, stop_fn: Optional[Callable[[float], bool]] = None, save_fn: Optional[Callable[[BasePolicy], None]] = None, + save_train_fn: Optional[ + Callable[[BasePolicy, Collector, Collector, TrainLog, int], None] + ] = None, + load_train_fn: Optional[ + Callable[[BasePolicy, Collector, Collector, TrainLog], int] + ] = None, + epoch_per_save: int = 0, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -59,6 +82,11 @@ def offpolicy_trainer( :param function save_fn: a hook called when the undiscounted average mean reward in evaluation phase gets better, with the signature ``f(policy:BasePolicy) -> None``. + :param function save_train_fn: a function to save training process, you can save + whatever you want + :param function load_train_fn: a function called before train start, load whatever + you save, return epoch + :param int epoch_per_save: save train process each ``epoch_per_save`` epoch :param function stop_fn: a function with signature ``f(mean_rewards: float) -> bool``, receives the average undiscounted returns of the testing result, returns a boolean which indicates whether reaching the goal. @@ -75,18 +103,26 @@ def offpolicy_trainer( :return: See :func:`~tianshou.trainer.gather_info`. """ - env_step, gradient_step = 0, 0 - last_rew, last_len = 0.0, 0 + train_log = TrainLog() stat: Dict[str, MovAvg] = defaultdict(MovAvg) start_time = time.time() train_collector.reset_stat() test_collector.reset_stat() test_in_train = test_in_train and train_collector.policy == policy - test_result = test_episode(policy, test_collector, test_fn, 0, episode_per_test, - logger, env_step, reward_metric) - best_epoch = 0 - best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] - for epoch in range(1, 1 + max_epoch): + + if load_train_fn: + epoch = load_train_fn( + policy, train_collector, test_collector, train_log + ) + env_step = (epoch - 1) * step_per_epoch + else: + epoch, env_step = 1, 0 + test_result = test_episode(policy, test_collector, test_fn, 0, + episode_per_test, logger, env_step, reward_metric) + train_log.best_reward, train_log.best_reward_std = \ + test_result["rew"], test_result["rew_std"] + + for epoch in range(epoch, 1 + max_epoch): # train policy.train() with tqdm.tqdm( @@ -101,12 +137,14 @@ def offpolicy_trainer( env_step += int(result["n/st"]) t.update(result["n/st"]) logger.log_train_data(result, env_step) - last_rew = result['rew'] if 'rew' in result else last_rew - last_len = result['len'] if 'len' in result else last_len + train_log.last_rew = \ + result['rew'] if 'rew' in result else train_log.last_rew + train_log.last_len = \ + result['len'] if 'len' in result else train_log.last_len data = { "env_step": str(env_step), - "rew": f"{last_rew:.2f}", - "len": str(int(last_len)), + "rew": f"{train_log.last_rew:.2f}", + "len": str(int(train_log.last_len)), "n/ep": str(int(result["n/ep"])), "n/st": str(int(result["n/st"])), } @@ -125,13 +163,13 @@ def offpolicy_trainer( else: policy.train() for i in range(round(update_per_step * result["n/st"])): - gradient_step += 1 + train_log.gradient_step += 1 losses = policy.update(batch_size, train_collector.buffer) for k in losses.keys(): stat[k].add(losses[k]) losses[k] = stat[k].get() data[k] = f"{losses[k]:.3f}" - logger.log_update_data(losses, gradient_step) + logger.log_update_data(losses, train_log.gradient_step) t.set_postfix(**data) if t.n <= t.total: 
t.update() @@ -139,16 +177,20 @@ def offpolicy_trainer( test_result = test_episode(policy, test_collector, test_fn, epoch, episode_per_test, logger, env_step, reward_metric) rew, rew_std = test_result["rew"], test_result["rew_std"] - if best_epoch == -1 or best_reward < rew: - best_reward, best_reward_std = rew, rew_std - best_epoch = epoch + if train_log.best_epoch == -1 or train_log.best_reward < rew: + train_log.best_reward, train_log.best_reward_std = rew, rew_std + train_log.best_epoch = epoch if save_fn: save_fn(policy) + if epoch_per_save > 0 and epoch % epoch_per_save == 0 and save_train_fn: + save_train_fn(policy, train_collector, test_collector, + train_log, epoch) if verbose: print( f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_reward:" - f" {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") - if stop_fn and stop_fn(best_reward): + f" {train_log.best_reward:.6f} ± {train_log.best_reward_std:.6f}" + f" in #{train_log.best_epoch}") + if stop_fn and stop_fn(train_log.best_reward): break return gather_info(start_time, train_collector, test_collector, - best_reward, best_reward_std) + train_log.best_reward, train_log.best_reward_std) From 93a5d1aec55f0cfa442846b9d2dfd4d0d8ad41cb Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Sun, 25 Apr 2021 14:17:21 +0800 Subject: [PATCH 02/14] refact --- docs/tutorials/cheatsheet.rst | 25 ++++++++++ test/discrete/test_c51.py | 39 ++++++++++++++- test/discrete/test_dqn.py | 37 +------------- tianshou/trainer/offpolicy.py | 93 +++++++++++++---------------------- tianshou/trainer/utils.py | 2 +- tianshou/utils/log_tools.py | 56 +++++++++++++++++++-- 6 files changed, 151 insertions(+), 101 deletions(-) diff --git a/docs/tutorials/cheatsheet.rst b/docs/tutorials/cheatsheet.rst index f71d34cd5..8cfe24ec1 100644 --- a/docs/tutorials/cheatsheet.rst +++ b/docs/tutorials/cheatsheet.rst @@ -30,6 +30,31 @@ Customize Training Process See :ref:`customized_trainer`. +.. _resume_training: + +Resume Training Process +----------------------- + +This is related to `Issue 349 `_. + +To resume training process from an existing checkpoint, you need to do the following things in the training process: + +1. Make sure you write ``save_train_fn`` which saves everything needed in the training process, i.e., policy, optim, buffer; pass it to trainer; +2. Use ``BasicLogger`` which contains a tensorboard; + +And to successfully resume from a checkpoint: + +1. Load everything needed in the training process **before trainer initialization**, i.e., policy, optim, buffer; +2. set ``resume_from_log=True`` with trainer; + +We provide an example to show how these steps work: checkout `test_c51.py `_ by running + +.. code-block:: bash + + $ python3 test/discrete/test_c51.py # train some epoch + $ python3 test/discrete/test_c51.py --resume # restore from existing log and continuing training + + .. 
_parallel_sampling: Parallel Sampling diff --git a/test/discrete/test_c51.py b/test/discrete/test_c51.py index 1d0c4cc0a..8701f38ea 100644 --- a/test/discrete/test_c51.py +++ b/test/discrete/test_c51.py @@ -1,6 +1,7 @@ import os import gym import torch +import pickle import pprint import argparse import numpy as np @@ -43,6 +44,7 @@ def get_args(): action="store_true", default=False) parser.add_argument('--alpha', type=float, default=0.6) parser.add_argument('--beta', type=float, default=0.4) + parser.add_argument('--resume', action="store_true") parser.add_argument( '--device', type=str, default='cuda' if torch.cuda.is_available() else 'cpu') @@ -112,14 +114,44 @@ def train_fn(epoch, env_step): def test_fn(epoch, env_step): policy.set_eps(args.eps_test) + def save_train_fn(epoch, env_step, gradient_step): + # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html + torch.save({ + 'model': policy.state_dict(), + 'optim': policy.optim.state_dict(), + }, os.path.join(log_path, 'checkpoint.pth')) + pickle.dump(train_collector.buffer, + open(os.path.join(log_path, 'train_buffer.pkl'), "wb")) + + if args.resume: + # load from existing checkpoint + print(f"Loading agent under {log_path}") + ckpt_path = os.path.join(log_path, 'checkpoint.pth') + if os.path.exists(ckpt_path): + checkpoint = torch.load(ckpt_path, map_location=args.device) + policy.load_state_dict(checkpoint['model']) + policy.optim.load_state_dict(checkpoint['optim']) + print("Successfully restore policy and optim.") + else: + print("Fail to restore policy and optim.") + buffer_path = os.path.join(log_path, 'train_buffer.pkl') + if os.path.exists(buffer_path): + buffer = pickle.load(open(buffer_path, "rb")) + train_collector._assign_buffer(buffer) + print("Successfully restore buffer.") + else: + print("Fail to restore buffer.") + # trainer result = offpolicy_trainer( policy, train_collector, test_collector, args.epoch, args.step_per_epoch, args.step_per_collect, args.test_num, args.batch_size, update_per_step=args.update_per_step, train_fn=train_fn, - test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger) + test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger, + resume_from_log=args.resume, save_train_fn=save_train_fn) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! 
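For readers of this patch series, the cheatsheet section and the ``test_c51.py`` hooks above boil down to the condensed sketch below of an interruptible off-policy run. It is illustrative rather than a drop-in script: ``make_policy`` and ``make_collectors`` are placeholders for the usual setup code, and the positional trainer arguments carry example values.

.. code-block:: python

    import os
    import pickle
    import torch
    from torch.utils.tensorboard import SummaryWriter
    from tianshou.trainer import offpolicy_trainer
    from tianshou.utils import BasicLogger

    log_path = "log/CartPole-v0/c51"
    policy, optim = make_policy()                         # placeholder: policy + its optimizer
    train_collector, test_collector = make_collectors()   # placeholder: collectors + buffer

    def save_train_fn(epoch, env_step, gradient_step):
        # called by the trainer every epoch_per_save epochs
        torch.save({"model": policy.state_dict(), "optim": optim.state_dict()},
                   os.path.join(log_path, "checkpoint.pth"))
        pickle.dump(train_collector.buffer,
                    open(os.path.join(log_path, "train_buffer.pkl"), "wb"))

    resume = os.path.exists(os.path.join(log_path, "checkpoint.pth"))
    if resume:
        # restore everything *before* the trainer starts; the replay buffer can be
        # restored the same way as in test_c51.py above
        ckpt = torch.load(os.path.join(log_path, "checkpoint.pth"), map_location="cpu")
        policy.load_state_dict(ckpt["model"])
        optim.load_state_dict(ckpt["optim"])

    # the logger must point at the existing log dir so restore_data() can find it
    logger = BasicLogger(SummaryWriter(log_path))
    result = offpolicy_trainer(
        policy, train_collector, test_collector,
        10, 10000, 10, 100, 64,  # max_epoch, step_per_epoch, step_per_collect,
                                 # episode_per_test, batch_size
        save_train_fn=save_train_fn, resume_from_log=resume, logger=logger)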
@@ -132,6 +164,11 @@ def test_fn(epoch, env_step): print(f"Final reward: {rews.mean()}, length: {lens.mean()}") +def test_c51_resume(args=get_args()): + args.resume = True + test_c51(args) + + def test_pc51(args=get_args()): args.prioritized_replay = True args.gamma = .95 diff --git a/test/discrete/test_dqn.py b/test/discrete/test_dqn.py index 8d29436c1..bd88b2b5b 100644 --- a/test/discrete/test_dqn.py +++ b/test/discrete/test_dqn.py @@ -44,7 +44,6 @@ def get_args(): parser.add_argument( '--save-buffer-name', type=str, default="./expert_DQN_CartPole-v0.pkl") - parser.add_argument("--epoch-per-save", type=int, default=5) parser.add_argument( '--device', type=str, default='cuda' if torch.cuda.is_available() else 'cpu') @@ -98,39 +97,8 @@ def test_dqn(args=get_args()): def save_fn(policy): torch.save(policy.state_dict(), os.path.join(log_path, 'policy.pth')) - def save_train_fn(policy, train_collector, - test_collector, train_log, epoch): - # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html - torch.save({ - 'epoch': epoch, - 'model_state_dict': policy.state_dict(), - 'optimizer_state_dict': policy.optim.state_dict(), - 'train_log': train_log.state_dict() - }, os.path.join(log_path, 'checkpoint.pth')) - pickle.dump(train_collector.buffer, - open(os.path.join(log_path, 'train_buffer.pkl'), "wb")) - # test collector does not need to be saved - - def load_train_fn(policy, train_collector, - test_collector, train_log): - # or use resume_path - print("Loaded agent from: ", log_path) - if os.path.exists(os.path.join(log_path, 'checkpoint.pth')) \ - and os.path.exists(os.path.join(log_path, 'train_buffer.pkl')): - checkpoint = torch.load( - os.path.join(log_path, 'checkpoint.pth'), map_location=args.device - ) - policy.load_state_dict(checkpoint['model_state_dict']) - policy.optim.load_state_dict(checkpoint['optimizer_state_dict']) - train_log.load_state_dict(checkpoint['train_log']) - buffer = pickle.load(open(os.path.join(log_path, 'train_buffer.pkl'), "rb")) - train_collector._assign_buffer(buffer) - return checkpoint['epoch'] + 1 - return 1 - def stop_fn(mean_rewards): return mean_rewards >= env.spec.reward_threshold - # return False def train_fn(epoch, env_step): # eps annnealing, just a demo @@ -151,10 +119,7 @@ def test_fn(epoch, env_step): policy, train_collector, test_collector, args.epoch, args.step_per_epoch, args.step_per_collect, args.test_num, args.batch_size, update_per_step=args.update_per_step, train_fn=train_fn, - test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger, - save_train_fn=save_train_fn, load_train_fn=load_train_fn, - epoch_per_save=args.epoch_per_save - ) + test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger) assert stop_fn(result['best_reward']) diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index c1a289b3f..b96b8039a 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -6,24 +6,8 @@ from tianshou.data import Collector from tianshou.policy import BasePolicy -from tianshou.utils import tqdm_config, MovAvg, BaseLogger, LazyLogger from tianshou.trainer import test_episode, gather_info - - -class TrainLog: - def __init__(self) -> None: - self.best_epoch = 0 - self.best_reward, self.best_reward_std = 0, 0 - self.gradient_step = 0 - self.last_rew, self.last_len = 0.0, 0 - - def state_dict(self): - return self.__dict__ - - def load_state_dict(self, state_dict: dict): - for key in self.__dict__.keys(): - if key in state_dict: - setattr(self, key, state_dict[key]) +from 
tianshou.utils import tqdm_config, MovAvg, BaseLogger, LazyLogger def offpolicy_trainer( @@ -40,13 +24,9 @@ def offpolicy_trainer( test_fn: Optional[Callable[[int, Optional[int]], None]] = None, stop_fn: Optional[Callable[[float], bool]] = None, save_fn: Optional[Callable[[BasePolicy], None]] = None, - save_train_fn: Optional[ - Callable[[BasePolicy, Collector, Collector, TrainLog, int], None] - ] = None, - load_train_fn: Optional[ - Callable[[BasePolicy, Collector, Collector, TrainLog], int] - ] = None, - epoch_per_save: int = 0, + save_train_fn: Optional[Callable[[int, int, int], None]] = None, + resume_from_log: bool = False, + epoch_per_save: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -80,13 +60,15 @@ def offpolicy_trainer( It can be used to perform custom additional operations, with the signature ``f( num_epoch: int, step_idx: int) -> None``. :param function save_fn: a hook called when the undiscounted average mean reward in - evaluation phase gets better, with the signature ``f(policy:BasePolicy) -> + evaluation phase gets better, with the signature ``f(policy: BasePolicy) -> None``. - :param function save_train_fn: a function to save training process, you can save - whatever you want - :param function load_train_fn: a function called before train start, load whatever - you save, return epoch - :param int epoch_per_save: save train process each ``epoch_per_save`` epoch + :param function save_train_fn: a function to save training process, with the + signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can + save whatever you want. + :param int epoch_per_save: save train process each ``epoch_per_save`` epoch by + calling ``save_train_fn``. Default to 1. + :param bool resume_from_log: resume env_step/gradient_step and other metadata from + existing tensorboard log. Default to False. :param function stop_fn: a function with signature ``f(mean_rewards: float) -> bool``, receives the average undiscounted returns of the testing result, returns a boolean which indicates whether reaching the goal. @@ -103,26 +85,23 @@ def offpolicy_trainer( :return: See :func:`~tianshou.trainer.gather_info`. 
""" - train_log = TrainLog() + start_epoch, env_step, gradient_step = 0, 0, 0 + last_rew, last_len = 0.0, 0 stat: Dict[str, MovAvg] = defaultdict(MovAvg) start_time = time.time() train_collector.reset_stat() test_collector.reset_stat() test_in_train = test_in_train and train_collector.policy == policy - - if load_train_fn: - epoch = load_train_fn( - policy, train_collector, test_collector, train_log - ) - env_step = (epoch - 1) * step_per_epoch + if resume_from_log: + best_epoch, best_reward, best_reward_std, start_epoch, env_step, \ + gradient_step, last_rew, last_len = logger.restore_data() else: - epoch, env_step = 1, 0 test_result = test_episode(policy, test_collector, test_fn, 0, episode_per_test, logger, env_step, reward_metric) - train_log.best_reward, train_log.best_reward_std = \ - test_result["rew"], test_result["rew_std"] + best_epoch = 0 + best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] - for epoch in range(epoch, 1 + max_epoch): + for epoch in range(1 + start_epoch, 1 + max_epoch): # train policy.train() with tqdm.tqdm( @@ -137,14 +116,12 @@ def offpolicy_trainer( env_step += int(result["n/st"]) t.update(result["n/st"]) logger.log_train_data(result, env_step) - train_log.last_rew = \ - result['rew'] if 'rew' in result else train_log.last_rew - train_log.last_len = \ - result['len'] if 'len' in result else train_log.last_len + last_rew = result['rew'] if 'rew' in result else last_rew + last_len = result['len'] if 'len' in result else last_len data = { "env_step": str(env_step), - "rew": f"{train_log.last_rew:.2f}", - "len": str(int(train_log.last_len)), + "rew": f"{last_rew:.2f}", + "len": str(int(last_len)), "n/ep": str(int(result["n/ep"])), "n/st": str(int(result["n/st"])), } @@ -163,13 +140,13 @@ def offpolicy_trainer( else: policy.train() for i in range(round(update_per_step * result["n/st"])): - train_log.gradient_step += 1 + gradient_step += 1 losses = policy.update(batch_size, train_collector.buffer) for k in losses.keys(): stat[k].add(losses[k]) losses[k] = stat[k].get() data[k] = f"{losses[k]:.3f}" - logger.log_update_data(losses, train_log.gradient_step) + logger.log_update_data(losses, gradient_step) t.set_postfix(**data) if t.n <= t.total: t.update() @@ -177,20 +154,16 @@ def offpolicy_trainer( test_result = test_episode(policy, test_collector, test_fn, epoch, episode_per_test, logger, env_step, reward_metric) rew, rew_std = test_result["rew"], test_result["rew_std"] - if train_log.best_epoch == -1 or train_log.best_reward < rew: - train_log.best_reward, train_log.best_reward_std = rew, rew_std - train_log.best_epoch = epoch + if best_epoch < 0 or best_reward < rew: + best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) if epoch_per_save > 0 and epoch % epoch_per_save == 0 and save_train_fn: - save_train_fn(policy, train_collector, test_collector, - train_log, epoch) + save_train_fn(epoch, env_step, gradient_step) if verbose: - print( - f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_reward:" - f" {train_log.best_reward:.6f} ± {train_log.best_reward_std:.6f}" - f" in #{train_log.best_epoch}") - if stop_fn and stop_fn(train_log.best_reward): + print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" + f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") + if stop_fn and stop_fn(best_reward): break return gather_info(start_time, train_collector, test_collector, - train_log.best_reward, train_log.best_reward_std) + best_reward, best_reward_std) diff --git 
a/tianshou/trainer/utils.py b/tianshou/trainer/utils.py index 2e729feeb..46df58d5e 100644 --- a/tianshou/trainer/utils.py +++ b/tianshou/trainer/utils.py @@ -27,7 +27,7 @@ def test_episode( if reward_metric: result["rews"] = reward_metric(result["rews"]) if logger and global_step is not None: - logger.log_test_data(result, global_step) + logger.log_test_data(result, global_step, epoch) return result diff --git a/tianshou/utils/log_tools.py b/tianshou/utils/log_tools.py index fcd1d5575..5062f0081 100644 --- a/tianshou/utils/log_tools.py +++ b/tianshou/utils/log_tools.py @@ -1,8 +1,10 @@ +import warnings import numpy as np from numbers import Number -from typing import Any, Union from abc import ABC, abstractmethod +from typing import Any, Tuple, Union from torch.utils.tensorboard import SummaryWriter +from tensorboard.backend.event_processing import event_accumulator class BaseLogger(ABC): @@ -42,15 +44,29 @@ def log_update_data(self, update_result: dict, step: int) -> None: """ pass - def log_test_data(self, collect_result: dict, step: int) -> None: + def log_test_data(self, collect_result: dict, step: int, epoch: int) -> None: """Use writer to log statistics generated during evaluating. :param collect_result: a dict containing information of data collected in evaluating stage, i.e., returns of collector.collect(). :param int step: stands for the timestep the collect_result being logged. + :param int epoch: stands for the epoch the collect_result being logged. """ pass + def restore_data(self) -> Tuple[int, float, float, int, int, int, float, int]: + """Return the metadata from existing log. + + If it finds nothing or an error occurs during the recover process, it will + return the default parameters. + + :return: best_epoch, best_reward, best_reward_std, epoch, env_step, + gradient_step, last_rew, last_len + """ + warnings.warn("Please specify an existing tensorboard logdir to resume.") + # epoch == -1 is invalid, so that it should be forcely updated by trainer + return -1, 0.0, 0.0, 0, 0, 0, 0.0, 0 + class BasicLogger(BaseLogger): """A loggger that relies on tensorboard SummaryWriter by default to visualize \ @@ -70,6 +86,7 @@ def __init__( train_interval: int = 1, test_interval: int = 1, update_interval: int = 1000, + resume: bool = True, ) -> None: super().__init__(writer) self.train_interval = train_interval @@ -104,12 +121,13 @@ def log_train_data(self, collect_result: dict, step: int) -> None: self.write("train/len", step, collect_result["len"]) self.last_log_train_step = step - def log_test_data(self, collect_result: dict, step: int) -> None: + def log_test_data(self, collect_result: dict, step: int, epoch: int) -> None: """Use writer to log statistics generated during evaluating. :param collect_result: a dict containing information of data collected in evaluating stage, i.e., returns of collector.collect(). :param int step: stands for the timestep the collect_result being logged. + :param int epoch: stands for the epoch the collect_result being logged. .. 
note:: @@ -121,6 +139,7 @@ def log_test_data(self, collect_result: dict, step: int) -> None: rew, rew_std, len_, len_std = rews.mean(), rews.std(), lens.mean(), lens.std() collect_result.update(rew=rew, rew_std=rew_std, len=len_, len_std=len_std) if step - self.last_log_test_step >= self.test_interval: + self.write("test/epoch", step, epoch) # type: ignore self.write("test/rew", step, rew) self.write("test/len", step, len_) self.write("test/rew_std", step, rew_std) @@ -129,10 +148,41 @@ def log_test_data(self, collect_result: dict, step: int) -> None: def log_update_data(self, update_result: dict, step: int) -> None: if step - self.last_log_update_step >= self.update_interval: + self.write("train/gradient_step", step, step) # type: ignore for k, v in update_result.items(): self.write(k, step, v) self.last_log_update_step = step + def restore_data(self) -> Tuple[int, float, float, int, int, int, float, int]: + ea = event_accumulator.EventAccumulator(self.writer.log_dir) + ea.Reload() + epoch, best_epoch, best_reward, best_reward_std = 0, -1, 0.0, 0.0 + try: # best_* + for test_rew, test_rew_std in zip( + ea.scalars.Items("test/rew"), ea.scalars.Items("test/rew_std") + ): + rew, rew_std = test_rew.value, test_rew_std.value + if best_epoch == -1 or best_reward < rew: + best_epoch, best_reward, best_reward_std = 0, rew, rew_std + self.last_log_test_step = test_rew.step + epoch = int(ea.scalars.Items("test/epoch")[-1].value) + except KeyError: + pass + try: # env_step / last_* + item = ea.scalars.Items("train/rew")[-1] + self.last_log_train_step = env_step = item.step + last_rew = item.value + last_len = ea.scalars.Items("train/len")[-1].value + except KeyError: + last_rew, last_len, env_step = 0.0, 0, 0 + try: + self.last_log_update_step = gradient_step = int(ea.scalars.Items( + "train/gradient_step")[-1].value) + except KeyError: + gradient_step = 0 + return best_epoch, best_reward, best_reward_std, \ + epoch, env_step, gradient_step, last_rew, last_len + class LazyLogger(BasicLogger): """A loggger that does nothing. Used as the placeholder in trainer.""" From ec169928dcf987e56a729417b5e2c4416229b395 Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Sun, 25 Apr 2021 14:37:29 +0800 Subject: [PATCH 03/14] onpolicy trainer resumable --- docs/tutorials/cheatsheet.rst | 2 +- test/continuous/test_ppo.py | 28 ++++++++++++++++++++++- tianshou/trainer/offpolicy.py | 2 ++ tianshou/trainer/onpolicy.py | 43 ++++++++++++++++++++++++----------- 4 files changed, 60 insertions(+), 15 deletions(-) diff --git a/docs/tutorials/cheatsheet.rst b/docs/tutorials/cheatsheet.rst index 8cfe24ec1..cc29b1c25 100644 --- a/docs/tutorials/cheatsheet.rst +++ b/docs/tutorials/cheatsheet.rst @@ -47,7 +47,7 @@ And to successfully resume from a checkpoint: 1. Load everything needed in the training process **before trainer initialization**, i.e., policy, optim, buffer; 2. set ``resume_from_log=True`` with trainer; -We provide an example to show how these steps work: checkout `test_c51.py `_ by running +We provide an example to show how these steps work: checkout `test_c51.py `_ or `test_ppo.py `_ by running .. 
code-block:: bash diff --git a/test/continuous/test_ppo.py b/test/continuous/test_ppo.py index b661bb756..af8e2a867 100644 --- a/test/continuous/test_ppo.py +++ b/test/continuous/test_ppo.py @@ -47,6 +47,7 @@ def get_args(): parser.add_argument('--value-clip', type=int, default=1) parser.add_argument('--norm-adv', type=int, default=1) parser.add_argument('--recompute-adv', type=int, default=0) + parser.add_argument('--resume', action="store_true") args = parser.parse_known_args()[0] return args @@ -122,13 +123,33 @@ def save_fn(policy): def stop_fn(mean_rewards): return mean_rewards >= env.spec.reward_threshold + def save_train_fn(epoch, env_step, gradient_step): + # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html + torch.save({ + 'model': policy.state_dict(), + 'optim': policy.optim.state_dict(), + }, os.path.join(log_path, 'checkpoint.pth')) + + if args.resume: + # load from existing checkpoint + print(f"Loading agent under {log_path}") + ckpt_path = os.path.join(log_path, 'checkpoint.pth') + if os.path.exists(ckpt_path): + checkpoint = torch.load(ckpt_path, map_location=args.device) + policy.load_state_dict(checkpoint['model']) + policy.optim.load_state_dict(checkpoint['optim']) + print("Successfully restore policy and optim.") + else: + print("Fail to restore policy and optim.") + # trainer result = onpolicy_trainer( policy, train_collector, test_collector, args.epoch, args.step_per_epoch, args.repeat_per_collect, args.test_num, args.batch_size, episode_per_collect=args.episode_per_collect, stop_fn=stop_fn, save_fn=save_fn, - logger=logger) + logger=logger, resume_from_log=args.resume, save_train_fn=save_train_fn) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! 
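Compared with the off-policy tests, the checkpoint written by ``save_train_fn`` above holds only the model and optimizer states: on-policy data is re-collected on every update, so there is no replay buffer worth persisting. With the policy and optimizer restored as in ``test_ppo()``, resuming is then just a matter of passing the new flags to the trainer; a minimal sketch with illustrative argument values:

.. code-block:: python

    from tianshou.trainer import onpolicy_trainer

    # policy, save_train_fn, save_fn and logger built exactly as in test_ppo()
    result = onpolicy_trainer(
        policy, train_collector, test_collector,
        100, 150000, 10, 100, 64,  # max_epoch, step_per_epoch, repeat_per_collect,
                                   # episode_per_test, batch_size
        episode_per_collect=16,
        resume_from_log=True,      # recover epoch/env_step/gradient_step from the log
        save_train_fn=save_train_fn, save_fn=save_fn, logger=logger)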
@@ -140,5 +161,10 @@ def stop_fn(mean_rewards): print(f"Final reward: {rews.mean()}, length: {lens.mean()}") +def test_ppo_resume(args=get_args()): + args.resume = True + test_ppo(args) + + if __name__ == '__main__': test_ppo() diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index b96b8039a..05934375b 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -133,6 +133,8 @@ def offpolicy_trainer( if stop_fn(test_result["rew"]): if save_fn: save_fn(policy) + if save_train_fn: + save_train_fn(epoch, env_step, gradient_step) t.set_postfix(**data) return gather_info( start_time, train_collector, test_collector, diff --git a/tianshou/trainer/onpolicy.py b/tianshou/trainer/onpolicy.py index e396295d0..1e3a55648 100644 --- a/tianshou/trainer/onpolicy.py +++ b/tianshou/trainer/onpolicy.py @@ -6,8 +6,8 @@ from tianshou.data import Collector from tianshou.policy import BasePolicy -from tianshou.utils import tqdm_config, MovAvg, BaseLogger, LazyLogger from tianshou.trainer import test_episode, gather_info +from tianshou.utils import tqdm_config, MovAvg, BaseLogger, LazyLogger def onpolicy_trainer( @@ -25,6 +25,9 @@ def onpolicy_trainer( test_fn: Optional[Callable[[int, Optional[int]], None]] = None, stop_fn: Optional[Callable[[float], bool]] = None, save_fn: Optional[Callable[[BasePolicy], None]] = None, + save_train_fn: Optional[Callable[[int, int, int], None]] = None, + resume_from_log: bool = False, + epoch_per_save: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -61,6 +64,13 @@ def onpolicy_trainer( :param function save_fn: a hook called when the undiscounted average mean reward in evaluation phase gets better, with the signature ``f(policy: BasePolicy) -> None``. + :param function save_train_fn: a function to save training process, with the + signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can + save whatever you want. + :param int epoch_per_save: save train process each ``epoch_per_save`` epoch by + calling ``save_train_fn``. Default to 1. + :param bool resume_from_log: resume env_step/gradient_step and other metadata from + existing tensorboard log. Default to False. :param function stop_fn: a function with signature ``f(mean_rewards: float) -> bool``, receives the average undiscounted returns of the testing result, returns a boolean which indicates whether reaching the goal. @@ -81,18 +91,23 @@ def onpolicy_trainer( Only either one of step_per_collect and episode_per_collect can be specified. 
""" - env_step, gradient_step = 0, 0 + start_epoch, env_step, gradient_step = 0, 0, 0 last_rew, last_len = 0.0, 0 stat: Dict[str, MovAvg] = defaultdict(MovAvg) start_time = time.time() train_collector.reset_stat() test_collector.reset_stat() test_in_train = test_in_train and train_collector.policy == policy - test_result = test_episode(policy, test_collector, test_fn, 0, episode_per_test, - logger, env_step, reward_metric) - best_epoch = 0 - best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] - for epoch in range(1, 1 + max_epoch): + if resume_from_log: + best_epoch, best_reward, best_reward_std, start_epoch, env_step, \ + gradient_step, last_rew, last_len = logger.restore_data() + else: + test_result = test_episode(policy, test_collector, test_fn, 0, + episode_per_test, logger, env_step, reward_metric) + best_epoch = 0 + best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] + + for epoch in range(1 + start_epoch, 1 + max_epoch): # train policy.train() with tqdm.tqdm( @@ -125,6 +140,8 @@ def onpolicy_trainer( if stop_fn(test_result["rew"]): if save_fn: save_fn(policy) + if save_train_fn: + save_train_fn(epoch, env_step, gradient_step) t.set_postfix(**data) return gather_info( start_time, train_collector, test_collector, @@ -150,15 +167,15 @@ def onpolicy_trainer( test_result = test_episode(policy, test_collector, test_fn, epoch, episode_per_test, logger, env_step, reward_metric) rew, rew_std = test_result["rew"], test_result["rew_std"] - if best_epoch == -1 or best_reward < rew: - best_reward, best_reward_std = rew, rew_std - best_epoch = epoch + if best_epoch < 0 or best_reward < rew: + best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) + if epoch_per_save > 0 and epoch % epoch_per_save == 0 and save_train_fn: + save_train_fn(epoch, env_step, gradient_step) if verbose: - print( - f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_reward:" - f" {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") + print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" + f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") if stop_fn and stop_fn(best_reward): break return gather_info(start_time, train_collector, test_collector, From a09cf515c90a9e4217c4a9b193926c4db5a82550 Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Sun, 25 Apr 2021 15:15:45 +0800 Subject: [PATCH 04/14] offline --- docs/tutorials/cheatsheet.rst | 6 +++--- test/continuous/test_ppo.py | 2 +- test/discrete/test_c51.py | 5 ++--- test/discrete/test_il_bcq.py | 28 +++++++++++++++++++++++- tianshou/trainer/offline.py | 40 ++++++++++++++++++++++++----------- tianshou/trainer/offpolicy.py | 2 +- tianshou/trainer/onpolicy.py | 2 +- 7 files changed, 63 insertions(+), 22 deletions(-) diff --git a/docs/tutorials/cheatsheet.rst b/docs/tutorials/cheatsheet.rst index cc29b1c25..7a0937099 100644 --- a/docs/tutorials/cheatsheet.rst +++ b/docs/tutorials/cheatsheet.rst @@ -45,11 +45,11 @@ To resume training process from an existing checkpoint, you need to do the follo And to successfully resume from a checkpoint: 1. Load everything needed in the training process **before trainer initialization**, i.e., policy, optim, buffer; -2. set ``resume_from_log=True`` with trainer; +2. 
Set ``resume_from_log=True`` with trainer; -We provide an example to show how these steps work: checkout `test_c51.py `_ or `test_ppo.py `_ by running +We provide an example to show how these steps work: checkout `test_c51.py `_, `test_ppo.py `_ or `test_il_bcq.py `_ by running -.. code-block:: bash +.. code-block:: console $ python3 test/discrete/test_c51.py # train some epoch $ python3 test/discrete/test_c51.py --resume # restore from existing log and continuing training diff --git a/test/continuous/test_ppo.py b/test/continuous/test_ppo.py index af8e2a867..1ea2a56b5 100644 --- a/test/continuous/test_ppo.py +++ b/test/continuous/test_ppo.py @@ -127,7 +127,7 @@ def save_train_fn(epoch, env_step, gradient_step): # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html torch.save({ 'model': policy.state_dict(), - 'optim': policy.optim.state_dict(), + 'optim': optim.state_dict(), }, os.path.join(log_path, 'checkpoint.pth')) if args.resume: diff --git a/test/discrete/test_c51.py b/test/discrete/test_c51.py index 8701f38ea..e475d430c 100644 --- a/test/discrete/test_c51.py +++ b/test/discrete/test_c51.py @@ -118,7 +118,7 @@ def save_train_fn(epoch, env_step, gradient_step): # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html torch.save({ 'model': policy.state_dict(), - 'optim': policy.optim.state_dict(), + 'optim': optim.state_dict(), }, os.path.join(log_path, 'checkpoint.pth')) pickle.dump(train_collector.buffer, open(os.path.join(log_path, 'train_buffer.pkl'), "wb")) @@ -136,8 +136,7 @@ def save_train_fn(epoch, env_step, gradient_step): print("Fail to restore policy and optim.") buffer_path = os.path.join(log_path, 'train_buffer.pkl') if os.path.exists(buffer_path): - buffer = pickle.load(open(buffer_path, "rb")) - train_collector._assign_buffer(buffer) + train_collector.buffer = pickle.load(open(buffer_path, "rb")) print("Successfully restore buffer.") else: print("Fail to restore buffer.") diff --git a/test/discrete/test_il_bcq.py b/test/discrete/test_il_bcq.py index c3bebb53a..4b5362c45 100644 --- a/test/discrete/test_il_bcq.py +++ b/test/discrete/test_il_bcq.py @@ -42,6 +42,7 @@ def get_args(): "--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu", ) + parser.add_argument("--resume", action="store_true") args = parser.parse_known_args()[0] return args @@ -93,10 +94,30 @@ def save_fn(policy): def stop_fn(mean_rewards): return mean_rewards >= env.spec.reward_threshold + def save_train_fn(epoch, env_step, gradient_step): + # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html + torch.save({ + 'model': policy.state_dict(), + 'optim': optim.state_dict(), + }, os.path.join(log_path, 'checkpoint.pth')) + + if args.resume: + # load from existing checkpoint + print(f"Loading agent under {log_path}") + ckpt_path = os.path.join(log_path, 'checkpoint.pth') + if os.path.exists(ckpt_path): + checkpoint = torch.load(ckpt_path, map_location=args.device) + policy.load_state_dict(checkpoint['model']) + # optim.load_state_dict(checkpoint['optim']) # don't know why + print("Successfully restore policy and optim.") + else: + print("Fail to restore policy and optim.") + result = offline_trainer( policy, buffer, test_collector, args.epoch, args.update_per_epoch, args.test_num, args.batch_size, - stop_fn=stop_fn, save_fn=save_fn, logger=logger) + stop_fn=stop_fn, save_fn=save_fn, logger=logger, + resume_from_log=args.resume, save_train_fn=save_train_fn) assert stop_fn(result['best_reward']) @@ -112,5 +133,10 @@ def 
stop_fn(mean_rewards): print(f"Final reward: {rews.mean()}, length: {lens.mean()}") +def test_discrete_bcq_resume(args=get_args()): + args.resume = True + test_discrete_bcq(args) + + if __name__ == "__main__": test_discrete_bcq(get_args()) diff --git a/tianshou/trainer/offline.py b/tianshou/trainer/offline.py index 802e34963..06fc44a87 100644 --- a/tianshou/trainer/offline.py +++ b/tianshou/trainer/offline.py @@ -21,6 +21,9 @@ def offline_trainer( test_fn: Optional[Callable[[int, Optional[int]], None]] = None, stop_fn: Optional[Callable[[float], bool]] = None, save_fn: Optional[Callable[[BasePolicy], None]] = None, + save_train_fn: Optional[Callable[[int, int, int], None]] = None, + resume_from_log: bool = False, + epoch_per_save: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -44,6 +47,14 @@ def offline_trainer( :param function save_fn: a hook called when the undiscounted average mean reward in evaluation phase gets better, with the signature ``f(policy: BasePolicy) -> None``. + :param function save_train_fn: a function to save training process, with the + signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can + save whatever you want. Because offline-RL doesn't have env_step, the env_step + is always 0 here. + :param int epoch_per_save: save train process each ``epoch_per_save`` epoch by + calling ``save_train_fn``. Default to 1. + :param bool resume_from_log: resume gradient_step and other metadata from existing + tensorboard log. Default to False. :param function stop_fn: a function with signature ``f(mean_rewards: float) -> bool``, receives the average undiscounted returns of the testing result, returns a boolean which indicates whether reaching the goal. @@ -59,15 +70,20 @@ def offline_trainer( :return: See :func:`~tianshou.trainer.gather_info`. 
""" - gradient_step = 0 + start_epoch, gradient_step = 0, 0 stat: Dict[str, MovAvg] = defaultdict(MovAvg) start_time = time.time() test_collector.reset_stat() - test_result = test_episode(policy, test_collector, test_fn, 0, episode_per_test, - logger, gradient_step, reward_metric) - best_epoch = 0 - best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] - for epoch in range(1, 1 + max_epoch): + if resume_from_log: + best_epoch, best_reward, best_reward_std, start_epoch, _, \ + gradient_step, _, _ = logger.restore_data() + else: + test_result = test_episode(policy, test_collector, test_fn, 0, + episode_per_test, logger, 0, reward_metric) + best_epoch = 0 + best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] + + for epoch in range(1 + start_epoch, 1 + max_epoch): policy.train() with tqdm.trange( update_per_epoch, desc=f"Epoch #{epoch}", **tqdm_config @@ -87,15 +103,15 @@ def offline_trainer( policy, test_collector, test_fn, epoch, episode_per_test, logger, gradient_step, reward_metric) rew, rew_std = test_result["rew"], test_result["rew_std"] - if best_epoch == -1 or best_reward < rew: - best_reward, best_reward_std = rew, rew_std - best_epoch = epoch + if best_epoch < 0 or best_reward < rew: + best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) + if epoch_per_save > 0 and epoch % epoch_per_save == 0 and save_train_fn: + save_train_fn(epoch, 0, gradient_step) if verbose: - print( - f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_reward:" - f" {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") + print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" + f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") if stop_fn and stop_fn(best_reward): break return gather_info(start_time, None, test_collector, best_reward, best_reward_std) diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index 05934375b..da3560ae7 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -97,7 +97,7 @@ def offpolicy_trainer( gradient_step, last_rew, last_len = logger.restore_data() else: test_result = test_episode(policy, test_collector, test_fn, 0, - episode_per_test, logger, env_step, reward_metric) + episode_per_test, logger, 0, reward_metric) best_epoch = 0 best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] diff --git a/tianshou/trainer/onpolicy.py b/tianshou/trainer/onpolicy.py index 1e3a55648..34cc0b5bb 100644 --- a/tianshou/trainer/onpolicy.py +++ b/tianshou/trainer/onpolicy.py @@ -103,7 +103,7 @@ def onpolicy_trainer( gradient_step, last_rew, last_len = logger.restore_data() else: test_result = test_episode(policy, test_collector, test_fn, 0, - episode_per_test, logger, env_step, reward_metric) + episode_per_test, logger, 0, reward_metric) best_epoch = 0 best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] From 0664823cbe1bc916b778b8303c590685f8015593 Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Mon, 26 Apr 2021 08:21:25 +0800 Subject: [PATCH 05/14] fix optim bug --- examples/atari/atari_bcq.py | 3 +-- examples/mujoco/mujoco_a2c.py | 2 +- examples/mujoco/mujoco_ppo.py | 4 ++-- test/continuous/test_ddpg.py | 1 + test/continuous/test_npg.py | 4 ++-- test/continuous/test_ppo.py | 6 +++--- test/continuous/test_sac_with_il.py | 1 + test/continuous/test_td3.py | 1 + test/continuous/test_trpo.py | 4 ++-- test/discrete/test_a2c_with_il.py | 6 ++++-- test/discrete/test_c51.py | 1 - 
test/discrete/test_dqn.py | 1 - test/discrete/test_drqn.py | 2 +- test/discrete/test_il_bcq.py | 5 ++--- test/discrete/test_pg.py | 1 + test/discrete/test_ppo.py | 5 +++-- test/discrete/test_qrdqn.py | 2 +- test/discrete/test_sac.py | 1 + 18 files changed, 27 insertions(+), 23 deletions(-) diff --git a/examples/atari/atari_bcq.py b/examples/atari/atari_bcq.py index 2c42c46c7..f16db4618 100644 --- a/examples/atari/atari_bcq.py +++ b/examples/atari/atari_bcq.py @@ -85,8 +85,7 @@ def test_discrete_bcq(args=get_args()): feature_net, args.action_shape, device=args.device, hidden_sizes=args.hidden_sizes, softmax_output=False).to(args.device) optim = torch.optim.Adam( - set(policy_net.parameters()).union(imitation_net.parameters()), - lr=args.lr) + list(policy_net.parameters()) + list(imitation_net.parameters()), lr=args.lr) # define policy policy = DiscreteBCQPolicy( policy_net, imitation_net, optim, args.gamma, args.n_step, diff --git a/examples/mujoco/mujoco_a2c.py b/examples/mujoco/mujoco_a2c.py index 076dfca5c..bf039a187 100755 --- a/examples/mujoco/mujoco_a2c.py +++ b/examples/mujoco/mujoco_a2c.py @@ -101,7 +101,7 @@ def test_a2c(args=get_args()): torch.nn.init.zeros_(m.bias) m.weight.data.copy_(0.01 * m.weight.data) - optim = torch.optim.RMSprop(set(actor.parameters()).union(critic.parameters()), + optim = torch.optim.RMSprop(list(actor.parameters()) + list(critic.parameters()), lr=args.lr, eps=1e-5, alpha=0.99) lr_scheduler = None diff --git a/examples/mujoco/mujoco_ppo.py b/examples/mujoco/mujoco_ppo.py index 3974c2e63..681f626a1 100755 --- a/examples/mujoco/mujoco_ppo.py +++ b/examples/mujoco/mujoco_ppo.py @@ -106,8 +106,8 @@ def test_ppo(args=get_args()): torch.nn.init.zeros_(m.bias) m.weight.data.copy_(0.01 * m.weight.data) - optim = torch.optim.Adam(set( - actor.parameters()).union(critic.parameters()), lr=args.lr) + optim = torch.optim.Adam( + list(actor.parameters()) + list(critic.parameters()), lr=args.lr) lr_scheduler = None if args.lr_decay: diff --git a/test/continuous/test_ddpg.py b/test/continuous/test_ddpg.py index 5030abfe5..021cc0419 100644 --- a/test/continuous/test_ddpg.py +++ b/test/continuous/test_ddpg.py @@ -105,6 +105,7 @@ def stop_fn(mean_rewards): update_per_step=args.update_per_step, stop_fn=stop_fn, save_fn=save_fn, logger=logger) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! 
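A note on the recurring change in this patch from ``set(actor.parameters()).union(critic.parameters())`` to ``list(actor.parameters()) + list(critic.parameters())``: iterating a Python set of tensors has no guaranteed order (tensors hash by identity), so the parameter order handed to the optimizer, and with it the layout of ``optim.state_dict()``, can differ between the run that saved a checkpoint and the run that loads it. That ordering instability is presumably the "optim bug" named in the commit title. A minimal illustration, not taken from the patch:

.. code-block:: python

    import torch
    from torch import nn

    actor, critic = nn.Linear(4, 2), nn.Linear(4, 1)

    # Set iteration order depends on object identity and can change between
    # processes, so optimizer state saved by one run may not line up with the
    # param_groups of a fresh run that tries to load it.
    fragile = torch.optim.Adam(
        set(actor.parameters()).union(critic.parameters()), lr=1e-3)

    # A list keeps a deterministic order, so state_dict()/load_state_dict()
    # pair each parameter with its optimizer state consistently across runs.
    robust = torch.optim.Adam(
        list(actor.parameters()) + list(critic.parameters()), lr=1e-3)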
diff --git a/test/continuous/test_npg.py b/test/continuous/test_npg.py index d5172fa8b..44821463f 100644 --- a/test/continuous/test_npg.py +++ b/test/continuous/test_npg.py @@ -80,8 +80,8 @@ def test_npg(args=get_args()): if isinstance(m, torch.nn.Linear): torch.nn.init.orthogonal_(m.weight) torch.nn.init.zeros_(m.bias) - optim = torch.optim.Adam(set( - actor.parameters()).union(critic.parameters()), lr=args.lr) + optim = torch.optim.Adam( + list(actor.parameters()) + list(critic.parameters()), lr=args.lr) # replace DiagGuassian with Independent(Normal) which is equivalent # pass *logits to be consistent with policy.forward diff --git a/test/continuous/test_ppo.py b/test/continuous/test_ppo.py index 1ea2a56b5..8d49feeaa 100644 --- a/test/continuous/test_ppo.py +++ b/test/continuous/test_ppo.py @@ -84,8 +84,8 @@ def test_ppo(args=get_args()): if isinstance(m, torch.nn.Linear): torch.nn.init.orthogonal_(m.weight) torch.nn.init.zeros_(m.bias) - optim = torch.optim.Adam(set( - actor.parameters()).union(critic.parameters()), lr=args.lr) + optim = torch.optim.Adam( + list(actor.parameters()) + list(critic.parameters()), lr=args.lr) # replace DiagGuassian with Independent(Normal) which is equivalent # pass *logits to be consistent with policy.forward @@ -137,7 +137,7 @@ def save_train_fn(epoch, env_step, gradient_step): if os.path.exists(ckpt_path): checkpoint = torch.load(ckpt_path, map_location=args.device) policy.load_state_dict(checkpoint['model']) - policy.optim.load_state_dict(checkpoint['optim']) + optim.load_state_dict(checkpoint['optim']) print("Successfully restore policy and optim.") else: print("Fail to restore policy and optim.") diff --git a/test/continuous/test_sac_with_il.py b/test/continuous/test_sac_with_il.py index c064ace7f..b624b2aef 100644 --- a/test/continuous/test_sac_with_il.py +++ b/test/continuous/test_sac_with_il.py @@ -124,6 +124,7 @@ def stop_fn(mean_rewards): update_per_step=args.update_per_step, stop_fn=stop_fn, save_fn=save_fn, logger=logger) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! diff --git a/test/continuous/test_td3.py b/test/continuous/test_td3.py index 2e0674372..ee6fa11de 100644 --- a/test/continuous/test_td3.py +++ b/test/continuous/test_td3.py @@ -119,6 +119,7 @@ def stop_fn(mean_rewards): update_per_step=args.update_per_step, stop_fn=stop_fn, save_fn=save_fn, logger=logger) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! 
diff --git a/test/continuous/test_trpo.py b/test/continuous/test_trpo.py index 9db4f449c..689c41c14 100644 --- a/test/continuous/test_trpo.py +++ b/test/continuous/test_trpo.py @@ -82,8 +82,8 @@ def test_trpo(args=get_args()): if isinstance(m, torch.nn.Linear): torch.nn.init.orthogonal_(m.weight) torch.nn.init.zeros_(m.bias) - optim = torch.optim.Adam(set( - actor.parameters()).union(critic.parameters()), lr=args.lr) + optim = torch.optim.Adam( + list(actor.parameters()) + list(critic.parameters()), lr=args.lr) # replace DiagGuassian with Independent(Normal) which is equivalent # pass *logits to be consistent with policy.forward diff --git a/test/discrete/test_a2c_with_il.py b/test/discrete/test_a2c_with_il.py index 8c188caf6..261df9121 100644 --- a/test/discrete/test_a2c_with_il.py +++ b/test/discrete/test_a2c_with_il.py @@ -74,8 +74,8 @@ def test_a2c_with_il(args=get_args()): device=args.device) actor = Actor(net, args.action_shape, device=args.device).to(args.device) critic = Critic(net, device=args.device).to(args.device) - optim = torch.optim.Adam(set( - actor.parameters()).union(critic.parameters()), lr=args.lr) + optim = torch.optim.Adam( + list(actor.parameters()) + list(critic.parameters()), lr=args.lr) dist = torch.distributions.Categorical policy = A2CPolicy( actor, critic, optim, dist, @@ -106,6 +106,7 @@ def stop_fn(mean_rewards): episode_per_collect=args.episode_per_collect, stop_fn=stop_fn, save_fn=save_fn, logger=logger) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! @@ -135,6 +136,7 @@ def stop_fn(mean_rewards): args.il_step_per_epoch, args.step_per_collect, args.test_num, args.batch_size, stop_fn=stop_fn, save_fn=save_fn, logger=logger) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! diff --git a/test/discrete/test_c51.py b/test/discrete/test_c51.py index e475d430c..6f8e49e9d 100644 --- a/test/discrete/test_c51.py +++ b/test/discrete/test_c51.py @@ -148,7 +148,6 @@ def save_train_fn(epoch, env_step, gradient_step): args.batch_size, update_per_step=args.update_per_step, train_fn=train_fn, test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger, resume_from_log=args.resume, save_train_fn=save_train_fn) - assert stop_fn(result['best_reward']) if __name__ == '__main__': diff --git a/test/discrete/test_dqn.py b/test/discrete/test_dqn.py index bd88b2b5b..cb7fac403 100644 --- a/test/discrete/test_dqn.py +++ b/test/discrete/test_dqn.py @@ -120,7 +120,6 @@ def test_fn(epoch, env_step): args.step_per_epoch, args.step_per_collect, args.test_num, args.batch_size, update_per_step=args.update_per_step, train_fn=train_fn, test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger) - assert stop_fn(result['best_reward']) if __name__ == '__main__': diff --git a/test/discrete/test_drqn.py b/test/discrete/test_drqn.py index 39bef8dbc..c04cbc396 100644 --- a/test/discrete/test_drqn.py +++ b/test/discrete/test_drqn.py @@ -99,8 +99,8 @@ def test_fn(epoch, env_step): args.batch_size, update_per_step=args.update_per_step, train_fn=train_fn, test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger) - assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! 
diff --git a/test/discrete/test_il_bcq.py b/test/discrete/test_il_bcq.py index 4b5362c45..6e48335b5 100644 --- a/test/discrete/test_il_bcq.py +++ b/test/discrete/test_il_bcq.py @@ -68,7 +68,7 @@ def test_discrete_bcq(args=get_args()): args.state_shape, args.action_shape, hidden_sizes=args.hidden_sizes, device=args.device).to(args.device) optim = torch.optim.Adam( - set(policy_net.parameters()).union(imitation_net.parameters()), + list(policy_net.parameters()) + list(imitation_net.parameters()), lr=args.lr) policy = DiscreteBCQPolicy( @@ -108,7 +108,7 @@ def save_train_fn(epoch, env_step, gradient_step): if os.path.exists(ckpt_path): checkpoint = torch.load(ckpt_path, map_location=args.device) policy.load_state_dict(checkpoint['model']) - # optim.load_state_dict(checkpoint['optim']) # don't know why + optim.load_state_dict(checkpoint['optim']) print("Successfully restore policy and optim.") else: print("Fail to restore policy and optim.") @@ -118,7 +118,6 @@ def save_train_fn(epoch, env_step, gradient_step): args.epoch, args.update_per_epoch, args.test_num, args.batch_size, stop_fn=stop_fn, save_fn=save_fn, logger=logger, resume_from_log=args.resume, save_train_fn=save_train_fn) - assert stop_fn(result['best_reward']) if __name__ == '__main__': diff --git a/test/discrete/test_pg.py b/test/discrete/test_pg.py index 8ce73b5cf..193117150 100644 --- a/test/discrete/test_pg.py +++ b/test/discrete/test_pg.py @@ -93,6 +93,7 @@ def stop_fn(mean_rewards): episode_per_collect=args.episode_per_collect, stop_fn=stop_fn, save_fn=save_fn, logger=logger) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! diff --git a/test/discrete/test_ppo.py b/test/discrete/test_ppo.py index ae051a4f8..761a0719c 100644 --- a/test/discrete/test_ppo.py +++ b/test/discrete/test_ppo.py @@ -75,8 +75,8 @@ def test_ppo(args=get_args()): if isinstance(m, torch.nn.Linear): torch.nn.init.orthogonal_(m.weight) torch.nn.init.zeros_(m.bias) - optim = torch.optim.Adam(set( - actor.parameters()).union(critic.parameters()), lr=args.lr) + optim = torch.optim.Adam( + list(actor.parameters()) + list(critic.parameters()), lr=args.lr) dist = torch.distributions.Categorical policy = PPOPolicy( actor, critic, optim, dist, @@ -113,6 +113,7 @@ def stop_fn(mean_rewards): episode_per_collect=args.episode_per_collect, stop_fn=stop_fn, save_fn=save_fn, logger=logger) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! diff --git a/test/discrete/test_qrdqn.py b/test/discrete/test_qrdqn.py index 59e26b197..2847ac2b8 100644 --- a/test/discrete/test_qrdqn.py +++ b/test/discrete/test_qrdqn.py @@ -117,8 +117,8 @@ def test_fn(epoch, env_step): args.batch_size, train_fn=train_fn, test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger, update_per_step=args.update_per_step) - assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! diff --git a/test/discrete/test_sac.py b/test/discrete/test_sac.py index aaa731547..0cb2ae018 100644 --- a/test/discrete/test_sac.py +++ b/test/discrete/test_sac.py @@ -112,6 +112,7 @@ def stop_fn(mean_rewards): args.batch_size, stop_fn=stop_fn, save_fn=save_fn, logger=logger, update_per_step=args.update_per_step, test_in_train=False) assert stop_fn(result['best_reward']) + if __name__ == '__main__': pprint.pprint(result) # Let's watch its performance! 
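Everything above that sets ``resume_from_log=True`` ultimately relies on ``BasicLogger.restore_data``, which reads the trainer's counters and best-reward statistics back out of the TensorBoard event file through ``EventAccumulator``, guarding each tag with ``except KeyError`` just as ``log_tools.py`` does. A standalone sketch of that read path (the log directory and tag below are placeholders):

.. code-block:: python

    from tensorboard.backend.event_processing import event_accumulator

    def read_last_scalar(log_dir, tag):
        """Return the most recent (step, value) pair for ``tag``, or None."""
        ea = event_accumulator.EventAccumulator(log_dir)
        ea.Reload()  # parse the event files found under log_dir
        try:
            last = ea.scalars.Items(tag)[-1]
        except KeyError:  # the tag was never written
            return None
        return last.step, last.value

    # e.g. read_last_scalar("log/CartPole-v0/c51", "test/rew")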
From 921ca86906ab35641eb9cf02bdf6ae74cc210e11 Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Mon, 26 Apr 2021 16:09:10 +0800 Subject: [PATCH 06/14] save in logger --- tianshou/trainer/offline.py | 4 +- tianshou/trainer/offpolicy.py | 8 ++-- tianshou/trainer/onpolicy.py | 8 ++-- tianshou/utils/log_tools.py | 78 ++++++++++++++++++++++++----------- 4 files changed, 63 insertions(+), 35 deletions(-) diff --git a/tianshou/trainer/offline.py b/tianshou/trainer/offline.py index 06fc44a87..07ded7bc8 100644 --- a/tianshou/trainer/offline.py +++ b/tianshou/trainer/offline.py @@ -107,8 +107,8 @@ def offline_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_save > 0 and epoch % epoch_per_save == 0 and save_train_fn: - save_train_fn(epoch, 0, gradient_step) + if epoch_per_save > 0 and epoch % epoch_per_save == 0: + logger.save_data(epoch, 0, gradient_step, save_train_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index da3560ae7..a4415001c 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -133,8 +133,8 @@ def offpolicy_trainer( if stop_fn(test_result["rew"]): if save_fn: save_fn(policy) - if save_train_fn: - save_train_fn(epoch, env_step, gradient_step) + logger.save_data( + epoch, env_step, gradient_step, save_train_fn) t.set_postfix(**data) return gather_info( start_time, train_collector, test_collector, @@ -160,8 +160,8 @@ def offpolicy_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_save > 0 and epoch % epoch_per_save == 0 and save_train_fn: - save_train_fn(epoch, env_step, gradient_step) + if epoch_per_save > 0 and epoch % epoch_per_save == 0: + logger.save_data(epoch, env_step, gradient_step, save_train_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/trainer/onpolicy.py b/tianshou/trainer/onpolicy.py index 34cc0b5bb..189f1e9c8 100644 --- a/tianshou/trainer/onpolicy.py +++ b/tianshou/trainer/onpolicy.py @@ -140,8 +140,8 @@ def onpolicy_trainer( if stop_fn(test_result["rew"]): if save_fn: save_fn(policy) - if save_train_fn: - save_train_fn(epoch, env_step, gradient_step) + logger.save_data( + epoch, env_step, gradient_step, save_train_fn) t.set_postfix(**data) return gather_info( start_time, train_collector, test_collector, @@ -171,8 +171,8 @@ def onpolicy_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_save > 0 and epoch % epoch_per_save == 0 and save_train_fn: - save_train_fn(epoch, env_step, gradient_step) + if epoch_per_save > 0 and epoch % epoch_per_save == 0: + logger.save_data(epoch, env_step, gradient_step, save_train_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/utils/log_tools.py b/tianshou/utils/log_tools.py index 5062f0081..61aead16b 100644 --- a/tianshou/utils/log_tools.py +++ b/tianshou/utils/log_tools.py @@ -2,11 +2,14 @@ import numpy as np from numbers import Number from abc import ABC, abstractmethod -from typing import Any, Tuple, Union from torch.utils.tensorboard import SummaryWriter +from typing import 
Any, Tuple, Union, Callable, Optional from tensorboard.backend.event_processing import event_accumulator +WRITE_TYPE = Union[int, Number, np.number, np.ndarray] + + class BaseLogger(ABC): """The base class for any logger which is compatible with trainer.""" @@ -15,9 +18,7 @@ def __init__(self, writer: Any) -> None: self.writer = writer @abstractmethod - def write( - self, key: str, x: int, y: Union[Number, np.number, np.ndarray], **kwargs: Any - ) -> None: + def write(self, key: str, x: int, y: WRITE_TYPE, **kwargs: Any) -> None: """Specify how the writer is used to log data. :param str key: namespace which the input data tuple belongs to. @@ -54,6 +55,24 @@ def log_test_data(self, collect_result: dict, step: int, epoch: int) -> None: """ pass + def save_data( + self, + epoch: int, + env_step: int, + gradient_step: int, + save_train_fn: Optional[Callable[[int, int, int], None]] = None, + ) -> None: + """Use writer to log metadata when calling ``save_train_fn`` in trainer. + + :param int epoch: the epoch in trainer. + :param int env_step: the env_step in trainer. + :param int gradient_step: the gradient_step in trainer. + :param function save_train_fn: a hook defined by user, see trainer + documentation for detail. + """ + if save_train_fn: + save_train_fn(epoch, env_step, gradient_step) + def restore_data(self) -> Tuple[int, float, float, int, int, int, float, int]: """Return the metadata from existing log. @@ -96,9 +115,7 @@ def __init__( self.last_log_test_step = -1 self.last_log_update_step = -1 - def write( - self, key: str, x: int, y: Union[Number, np.number, np.ndarray], **kwargs: Any - ) -> None: + def write(self, key: str, x: int, y: WRITE_TYPE, **kwargs: Any) -> None: self.writer.add_scalar(key, y, global_step=x) def log_train_data(self, collect_result: dict, step: int) -> None: @@ -139,7 +156,6 @@ def log_test_data(self, collect_result: dict, step: int, epoch: int) -> None: rew, rew_std, len_, len_std = rews.mean(), rews.std(), lens.mean(), lens.std() collect_result.update(rew=rew, rew_std=rew_std, len=len_, len_std=len_std) if step - self.last_log_test_step >= self.test_interval: - self.write("test/epoch", step, epoch) # type: ignore self.write("test/rew", step, rew) self.write("test/len", step, len_) self.write("test/rew_std", step, rew_std) @@ -148,15 +164,37 @@ def log_test_data(self, collect_result: dict, step: int, epoch: int) -> None: def log_update_data(self, update_result: dict, step: int) -> None: if step - self.last_log_update_step >= self.update_interval: - self.write("train/gradient_step", step, step) # type: ignore for k, v in update_result.items(): self.write(k, step, v) self.last_log_update_step = step + def save_data( + self, + epoch: int, + env_step: int, + gradient_step: int, + save_train_fn: Optional[Callable[[int, int, int], None]] = None, + ) -> None: + super().save_data(epoch, env_step, gradient_step, save_train_fn) + self.write("save/epoch", epoch, epoch) + self.write("save/env_step", env_step, env_step) + self.write("save/gradient_step", gradient_step, gradient_step) + def restore_data(self) -> Tuple[int, float, float, int, int, int, float, int]: ea = event_accumulator.EventAccumulator(self.writer.log_dir) ea.Reload() - epoch, best_epoch, best_reward, best_reward_std = 0, -1, 0.0, 0.0 + + try: # epoch / env_step / gradient_step + epoch = ea.scalars.Items("save/epoch")[-1].step + env_step = ea.scalars.Items("save/env_step")[-1].step + gradient_step = ea.scalars.Items("save/gradient_step")[-1].step + self.last_log_train_step = env_step + 
self.last_log_update_step = gradient_step + self.last_log_test_step = epoch + except KeyError: + epoch, env_step, gradient_step = 0, 0, 0 + + best_epoch, best_reward, best_reward_std = -1, 0.0, 0.0 try: # best_* for test_rew, test_rew_std in zip( ea.scalars.Items("test/rew"), ea.scalars.Items("test/rew_std") @@ -164,22 +202,14 @@ def restore_data(self) -> Tuple[int, float, float, int, int, int, float, int]: rew, rew_std = test_rew.value, test_rew_std.value if best_epoch == -1 or best_reward < rew: best_epoch, best_reward, best_reward_std = 0, rew, rew_std - self.last_log_test_step = test_rew.step - epoch = int(ea.scalars.Items("test/epoch")[-1].value) except KeyError: pass - try: # env_step / last_* - item = ea.scalars.Items("train/rew")[-1] - self.last_log_train_step = env_step = item.step - last_rew = item.value + + try: # last_* + last_rew = ea.scalars.Items("train/rew")[-1].value last_len = ea.scalars.Items("train/len")[-1].value except KeyError: - last_rew, last_len, env_step = 0.0, 0, 0 - try: - self.last_log_update_step = gradient_step = int(ea.scalars.Items( - "train/gradient_step")[-1].value) - except KeyError: - gradient_step = 0 + last_rew, last_len = 0.0, 0 return best_epoch, best_reward, best_reward_std, \ epoch, env_step, gradient_step, last_rew, last_len @@ -190,8 +220,6 @@ class LazyLogger(BasicLogger): def __init__(self) -> None: super().__init__(None) # type: ignore - def write( - self, key: str, x: int, y: Union[Number, np.number, np.ndarray], **kwargs: Any - ) -> None: + def write(self, key: str, x: int, y: WRITE_TYPE, **kwargs: Any) -> None: """The LazyLogger writes nothing.""" pass From ba461e0e57ab220a0a692b4285399811cfff56d3 Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Tue, 27 Apr 2021 20:37:48 +0800 Subject: [PATCH 07/14] fix test --- test/continuous/test_npg.py | 3 +-- test/continuous/test_trpo.py | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/test/continuous/test_npg.py b/test/continuous/test_npg.py index 44821463f..dd2ce2d82 100644 --- a/test/continuous/test_npg.py +++ b/test/continuous/test_npg.py @@ -80,8 +80,7 @@ def test_npg(args=get_args()): if isinstance(m, torch.nn.Linear): torch.nn.init.orthogonal_(m.weight) torch.nn.init.zeros_(m.bias) - optim = torch.optim.Adam( - list(actor.parameters()) + list(critic.parameters()), lr=args.lr) + optim = torch.optim.Adam(critic.parameters(), lr=args.lr) # replace DiagGuassian with Independent(Normal) which is equivalent # pass *logits to be consistent with policy.forward diff --git a/test/continuous/test_trpo.py b/test/continuous/test_trpo.py index 689c41c14..8c8387773 100644 --- a/test/continuous/test_trpo.py +++ b/test/continuous/test_trpo.py @@ -27,7 +27,8 @@ def get_args(): parser.add_argument('--epoch', type=int, default=5) parser.add_argument('--step-per-epoch', type=int, default=50000) parser.add_argument('--step-per-collect', type=int, default=2048) - parser.add_argument('--repeat-per-collect', type=int, default=1) + parser.add_argument('--repeat-per-collect', type=int, + default=2) # theoretically it should be 1 parser.add_argument('--batch-size', type=int, default=99999) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[64, 64]) parser.add_argument('--training-num', type=int, default=16) @@ -82,8 +83,7 @@ def test_trpo(args=get_args()): if isinstance(m, torch.nn.Linear): torch.nn.init.orthogonal_(m.weight) torch.nn.init.zeros_(m.bias) - optim = torch.optim.Adam( - list(actor.parameters()) + list(critic.parameters()), lr=args.lr) + optim = 
torch.optim.Adam(critic.parameters(), lr=args.lr) # replace DiagGuassian with Independent(Normal) which is equivalent # pass *logits to be consistent with policy.forward From 7a25e6e95f6bd05a75f1b2786b7ef04e5374cc13 Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Tue, 27 Apr 2021 20:45:21 +0800 Subject: [PATCH 08/14] epoch_per_checkpoint --- docs/tutorials/cheatsheet.rst | 2 +- test/continuous/test_ppo.py | 9 +++++---- test/discrete/test_c51.py | 4 ++-- test/discrete/test_il_bcq.py | 4 ++-- tianshou/trainer/offline.py | 14 +++++++------- tianshou/trainer/offpolicy.py | 16 ++++++++-------- tianshou/trainer/onpolicy.py | 16 ++++++++-------- tianshou/utils/log_tools.py | 14 +++++++------- 8 files changed, 40 insertions(+), 39 deletions(-) diff --git a/docs/tutorials/cheatsheet.rst b/docs/tutorials/cheatsheet.rst index 7a0937099..245f35031 100644 --- a/docs/tutorials/cheatsheet.rst +++ b/docs/tutorials/cheatsheet.rst @@ -39,7 +39,7 @@ This is related to `Issue 349 `_. To resume training process from an existing checkpoint, you need to do the following things in the training process: -1. Make sure you write ``save_train_fn`` which saves everything needed in the training process, i.e., policy, optim, buffer; pass it to trainer; +1. Make sure you write ``save_checkpoint_fn`` which saves everything needed in the training process, i.e., policy, optim, buffer; pass it to trainer; 2. Use ``BasicLogger`` which contains a tensorboard; And to successfully resume from a checkpoint: diff --git a/test/continuous/test_ppo.py b/test/continuous/test_ppo.py index 8d49feeaa..824c0744b 100644 --- a/test/continuous/test_ppo.py +++ b/test/continuous/test_ppo.py @@ -123,7 +123,7 @@ def save_fn(policy): def stop_fn(mean_rewards): return mean_rewards >= env.spec.reward_threshold - def save_train_fn(epoch, env_step, gradient_step): + def save_checkpoint_fn(epoch, env_step, gradient_step): # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html torch.save({ 'model': policy.state_dict(), @@ -144,10 +144,11 @@ def save_train_fn(epoch, env_step, gradient_step): # trainer result = onpolicy_trainer( - policy, train_collector, test_collector, args.epoch, - args.step_per_epoch, args.repeat_per_collect, args.test_num, args.batch_size, + policy, train_collector, test_collector, args.epoch, args.step_per_epoch, + args.repeat_per_collect, args.test_num, args.batch_size, episode_per_collect=args.episode_per_collect, stop_fn=stop_fn, save_fn=save_fn, - logger=logger, resume_from_log=args.resume, save_train_fn=save_train_fn) + logger=logger, resume_from_log=args.resume, + save_checkpoint_fn=save_checkpoint_fn) assert stop_fn(result['best_reward']) if __name__ == '__main__': diff --git a/test/discrete/test_c51.py b/test/discrete/test_c51.py index 6f8e49e9d..47d7806db 100644 --- a/test/discrete/test_c51.py +++ b/test/discrete/test_c51.py @@ -114,7 +114,7 @@ def train_fn(epoch, env_step): def test_fn(epoch, env_step): policy.set_eps(args.eps_test) - def save_train_fn(epoch, env_step, gradient_step): + def save_checkpoint_fn(epoch, env_step, gradient_step): # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html torch.save({ 'model': policy.state_dict(), @@ -147,7 +147,7 @@ def save_train_fn(epoch, env_step, gradient_step): args.step_per_epoch, args.step_per_collect, args.test_num, args.batch_size, update_per_step=args.update_per_step, train_fn=train_fn, test_fn=test_fn, stop_fn=stop_fn, save_fn=save_fn, logger=logger, - resume_from_log=args.resume, save_train_fn=save_train_fn) + 
resume_from_log=args.resume, save_checkpoint_fn=save_checkpoint_fn) assert stop_fn(result['best_reward']) if __name__ == '__main__': diff --git a/test/discrete/test_il_bcq.py b/test/discrete/test_il_bcq.py index 6e48335b5..e0df3beb6 100644 --- a/test/discrete/test_il_bcq.py +++ b/test/discrete/test_il_bcq.py @@ -94,7 +94,7 @@ def save_fn(policy): def stop_fn(mean_rewards): return mean_rewards >= env.spec.reward_threshold - def save_train_fn(epoch, env_step, gradient_step): + def save_checkpoint_fn(epoch, env_step, gradient_step): # see also: https://pytorch.org/tutorials/beginner/saving_loading_models.html torch.save({ 'model': policy.state_dict(), @@ -117,7 +117,7 @@ def save_train_fn(epoch, env_step, gradient_step): policy, buffer, test_collector, args.epoch, args.update_per_epoch, args.test_num, args.batch_size, stop_fn=stop_fn, save_fn=save_fn, logger=logger, - resume_from_log=args.resume, save_train_fn=save_train_fn) + resume_from_log=args.resume, save_checkpoint_fn=save_checkpoint_fn) assert stop_fn(result['best_reward']) if __name__ == '__main__': diff --git a/tianshou/trainer/offline.py b/tianshou/trainer/offline.py index 07ded7bc8..a92e39a8b 100644 --- a/tianshou/trainer/offline.py +++ b/tianshou/trainer/offline.py @@ -21,9 +21,9 @@ def offline_trainer( test_fn: Optional[Callable[[int, Optional[int]], None]] = None, stop_fn: Optional[Callable[[float], bool]] = None, save_fn: Optional[Callable[[BasePolicy], None]] = None, - save_train_fn: Optional[Callable[[int, int, int], None]] = None, + save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, resume_from_log: bool = False, - epoch_per_save: int = 1, + epoch_per_checkpoint: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -47,12 +47,12 @@ def offline_trainer( :param function save_fn: a hook called when the undiscounted average mean reward in evaluation phase gets better, with the signature ``f(policy: BasePolicy) -> None``. - :param function save_train_fn: a function to save training process, with the + :param function save_checkpoint_fn: a function to save training process, with the signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can save whatever you want. Because offline-RL doesn't have env_step, the env_step is always 0 here. - :param int epoch_per_save: save train process each ``epoch_per_save`` epoch by - calling ``save_train_fn``. Default to 1. + :param int epoch_per_checkpoint: save train process each ``epoch_per_checkpoint`` + epoch by calling ``save_checkpoint_fn``. Default to 1. :param bool resume_from_log: resume gradient_step and other metadata from existing tensorboard log. Default to False. 
:param function stop_fn: a function with signature ``f(mean_rewards: float) -> @@ -107,8 +107,8 @@ def offline_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_save > 0 and epoch % epoch_per_save == 0: - logger.save_data(epoch, 0, gradient_step, save_train_fn) + if epoch_per_checkpoint > 0 and epoch % epoch_per_checkpoint == 0: + logger.save_data(epoch, 0, gradient_step, save_checkpoint_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index a4415001c..3e9fe66b0 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -24,9 +24,9 @@ def offpolicy_trainer( test_fn: Optional[Callable[[int, Optional[int]], None]] = None, stop_fn: Optional[Callable[[float], bool]] = None, save_fn: Optional[Callable[[BasePolicy], None]] = None, - save_train_fn: Optional[Callable[[int, int, int], None]] = None, + save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, resume_from_log: bool = False, - epoch_per_save: int = 1, + epoch_per_checkpoint: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -62,11 +62,11 @@ def offpolicy_trainer( :param function save_fn: a hook called when the undiscounted average mean reward in evaluation phase gets better, with the signature ``f(policy: BasePolicy) -> None``. - :param function save_train_fn: a function to save training process, with the + :param function save_checkpoint_fn: a function to save training process, with the signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can save whatever you want. - :param int epoch_per_save: save train process each ``epoch_per_save`` epoch by - calling ``save_train_fn``. Default to 1. + :param int epoch_per_checkpoint: save train process each ``epoch_per_checkpoint`` + epoch by calling ``save_checkpoint_fn``. Default to 1. :param bool resume_from_log: resume env_step/gradient_step and other metadata from existing tensorboard log. Default to False. 
:param function stop_fn: a function with signature ``f(mean_rewards: float) -> @@ -134,7 +134,7 @@ def offpolicy_trainer( if save_fn: save_fn(policy) logger.save_data( - epoch, env_step, gradient_step, save_train_fn) + epoch, env_step, gradient_step, save_checkpoint_fn) t.set_postfix(**data) return gather_info( start_time, train_collector, test_collector, @@ -160,8 +160,8 @@ def offpolicy_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_save > 0 and epoch % epoch_per_save == 0: - logger.save_data(epoch, env_step, gradient_step, save_train_fn) + if epoch_per_checkpoint > 0 and epoch % epoch_per_checkpoint == 0: + logger.save_data(epoch, env_step, gradient_step, save_checkpoint_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/trainer/onpolicy.py b/tianshou/trainer/onpolicy.py index 189f1e9c8..d1ed8a5ad 100644 --- a/tianshou/trainer/onpolicy.py +++ b/tianshou/trainer/onpolicy.py @@ -25,9 +25,9 @@ def onpolicy_trainer( test_fn: Optional[Callable[[int, Optional[int]], None]] = None, stop_fn: Optional[Callable[[float], bool]] = None, save_fn: Optional[Callable[[BasePolicy], None]] = None, - save_train_fn: Optional[Callable[[int, int, int], None]] = None, + save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, resume_from_log: bool = False, - epoch_per_save: int = 1, + epoch_per_checkpoint: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -64,11 +64,11 @@ def onpolicy_trainer( :param function save_fn: a hook called when the undiscounted average mean reward in evaluation phase gets better, with the signature ``f(policy: BasePolicy) -> None``. - :param function save_train_fn: a function to save training process, with the + :param function save_checkpoint_fn: a function to save training process, with the signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can save whatever you want. - :param int epoch_per_save: save train process each ``epoch_per_save`` epoch by - calling ``save_train_fn``. Default to 1. + :param int epoch_per_checkpoint: save train process each ``epoch_per_checkpoint`` + epoch by calling ``save_checkpoint_fn``. Default to 1. :param bool resume_from_log: resume env_step/gradient_step and other metadata from existing tensorboard log. Default to False. 
:param function stop_fn: a function with signature ``f(mean_rewards: float) -> @@ -141,7 +141,7 @@ def onpolicy_trainer( if save_fn: save_fn(policy) logger.save_data( - epoch, env_step, gradient_step, save_train_fn) + epoch, env_step, gradient_step, save_checkpoint_fn) t.set_postfix(**data) return gather_info( start_time, train_collector, test_collector, @@ -171,8 +171,8 @@ def onpolicy_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_save > 0 and epoch % epoch_per_save == 0: - logger.save_data(epoch, env_step, gradient_step, save_train_fn) + if epoch_per_checkpoint > 0 and epoch % epoch_per_checkpoint == 0: + logger.save_data(epoch, env_step, gradient_step, save_checkpoint_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/utils/log_tools.py b/tianshou/utils/log_tools.py index 61aead16b..137667ab0 100644 --- a/tianshou/utils/log_tools.py +++ b/tianshou/utils/log_tools.py @@ -60,18 +60,18 @@ def save_data( epoch: int, env_step: int, gradient_step: int, - save_train_fn: Optional[Callable[[int, int, int], None]] = None, + save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, ) -> None: - """Use writer to log metadata when calling ``save_train_fn`` in trainer. + """Use writer to log metadata when calling ``save_checkpoint_fn`` in trainer. :param int epoch: the epoch in trainer. :param int env_step: the env_step in trainer. :param int gradient_step: the gradient_step in trainer. - :param function save_train_fn: a hook defined by user, see trainer + :param function save_checkpoint_fn: a hook defined by user, see trainer documentation for detail. """ - if save_train_fn: - save_train_fn(epoch, env_step, gradient_step) + if save_checkpoint_fn: + save_checkpoint_fn(epoch, env_step, gradient_step) def restore_data(self) -> Tuple[int, float, float, int, int, int, float, int]: """Return the metadata from existing log. @@ -173,9 +173,9 @@ def save_data( epoch: int, env_step: int, gradient_step: int, - save_train_fn: Optional[Callable[[int, int, int], None]] = None, + save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, ) -> None: - super().save_data(epoch, env_step, gradient_step, save_train_fn) + super().save_data(epoch, env_step, gradient_step, save_checkpoint_fn) self.write("save/epoch", epoch, epoch) self.write("save/env_step", env_step, env_step) self.write("save/gradient_step", gradient_step, gradient_step) From e93a3eed418f8dea7250351403adb0190e294f6b Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Tue, 27 Apr 2021 22:45:31 +0800 Subject: [PATCH 09/14] remove --- tianshou/trainer/offline.py | 15 ++++++------ tianshou/trainer/offpolicy.py | 14 +++++------ tianshou/trainer/onpolicy.py | 14 +++++------ tianshou/trainer/utils.py | 2 +- tianshou/utils/log_tools.py | 46 +++++++++++------------------------ 5 files changed, 34 insertions(+), 57 deletions(-) diff --git a/tianshou/trainer/offline.py b/tianshou/trainer/offline.py index a92e39a8b..fe241e1da 100644 --- a/tianshou/trainer/offline.py +++ b/tianshou/trainer/offline.py @@ -71,17 +71,16 @@ def offline_trainer( :return: See :func:`~tianshou.trainer.gather_info`. 
""" start_epoch, gradient_step = 0, 0 + if resume_from_log: + start_epoch, _, gradient_step = logger.restore_data() stat: Dict[str, MovAvg] = defaultdict(MovAvg) start_time = time.time() test_collector.reset_stat() - if resume_from_log: - best_epoch, best_reward, best_reward_std, start_epoch, _, \ - gradient_step, _, _ = logger.restore_data() - else: - test_result = test_episode(policy, test_collector, test_fn, 0, - episode_per_test, logger, 0, reward_metric) - best_epoch = 0 - best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] + + test_result = test_episode(policy, test_collector, test_fn, start_epoch, + episode_per_test, logger, gradient_step, reward_metric) + best_epoch = start_epoch + best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] for epoch in range(1 + start_epoch, 1 + max_epoch): policy.train() diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index 3e9fe66b0..abb50694f 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -86,20 +86,18 @@ def offpolicy_trainer( :return: See :func:`~tianshou.trainer.gather_info`. """ start_epoch, env_step, gradient_step = 0, 0, 0 + if resume_from_log: + start_epoch, env_step, gradient_step = logger.restore_data() last_rew, last_len = 0.0, 0 stat: Dict[str, MovAvg] = defaultdict(MovAvg) start_time = time.time() train_collector.reset_stat() test_collector.reset_stat() test_in_train = test_in_train and train_collector.policy == policy - if resume_from_log: - best_epoch, best_reward, best_reward_std, start_epoch, env_step, \ - gradient_step, last_rew, last_len = logger.restore_data() - else: - test_result = test_episode(policy, test_collector, test_fn, 0, - episode_per_test, logger, 0, reward_metric) - best_epoch = 0 - best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] + test_result = test_episode(policy, test_collector, test_fn, start_epoch, + episode_per_test, logger, gradient_step, reward_metric) + best_epoch = start_epoch + best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] for epoch in range(1 + start_epoch, 1 + max_epoch): # train diff --git a/tianshou/trainer/onpolicy.py b/tianshou/trainer/onpolicy.py index d1ed8a5ad..7b7f42cd5 100644 --- a/tianshou/trainer/onpolicy.py +++ b/tianshou/trainer/onpolicy.py @@ -92,20 +92,18 @@ def onpolicy_trainer( Only either one of step_per_collect and episode_per_collect can be specified. 
""" start_epoch, env_step, gradient_step = 0, 0, 0 + if resume_from_log: + start_epoch, env_step, gradient_step = logger.restore_data() last_rew, last_len = 0.0, 0 stat: Dict[str, MovAvg] = defaultdict(MovAvg) start_time = time.time() train_collector.reset_stat() test_collector.reset_stat() test_in_train = test_in_train and train_collector.policy == policy - if resume_from_log: - best_epoch, best_reward, best_reward_std, start_epoch, env_step, \ - gradient_step, last_rew, last_len = logger.restore_data() - else: - test_result = test_episode(policy, test_collector, test_fn, 0, - episode_per_test, logger, 0, reward_metric) - best_epoch = 0 - best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] + test_result = test_episode(policy, test_collector, test_fn, start_epoch, + episode_per_test, logger, gradient_step, reward_metric) + best_epoch = start_epoch + best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] for epoch in range(1 + start_epoch, 1 + max_epoch): # train diff --git a/tianshou/trainer/utils.py b/tianshou/trainer/utils.py index 46df58d5e..2e729feeb 100644 --- a/tianshou/trainer/utils.py +++ b/tianshou/trainer/utils.py @@ -27,7 +27,7 @@ def test_episode( if reward_metric: result["rews"] = reward_metric(result["rews"]) if logger and global_step is not None: - logger.log_test_data(result, global_step, epoch) + logger.log_test_data(result, global_step) return result diff --git a/tianshou/utils/log_tools.py b/tianshou/utils/log_tools.py index 137667ab0..a28c1607a 100644 --- a/tianshou/utils/log_tools.py +++ b/tianshou/utils/log_tools.py @@ -45,13 +45,12 @@ def log_update_data(self, update_result: dict, step: int) -> None: """ pass - def log_test_data(self, collect_result: dict, step: int, epoch: int) -> None: + def log_test_data(self, collect_result: dict, step: int) -> None: """Use writer to log statistics generated during evaluating. :param collect_result: a dict containing information of data collected in evaluating stage, i.e., returns of collector.collect(). :param int step: stands for the timestep the collect_result being logged. - :param int epoch: stands for the epoch the collect_result being logged. """ pass @@ -73,18 +72,16 @@ def save_data( if save_checkpoint_fn: save_checkpoint_fn(epoch, env_step, gradient_step) - def restore_data(self) -> Tuple[int, float, float, int, int, int, float, int]: + def restore_data(self) -> Tuple[int, int, int]: """Return the metadata from existing log. If it finds nothing or an error occurs during the recover process, it will return the default parameters. - :return: best_epoch, best_reward, best_reward_std, epoch, env_step, - gradient_step, last_rew, last_len + :return: epoch, env_step, gradient_step """ warnings.warn("Please specify an existing tensorboard logdir to resume.") - # epoch == -1 is invalid, so that it should be forcely updated by trainer - return -1, 0.0, 0.0, 0, 0, 0, 0.0, 0 + return 0, 0, 0 class BasicLogger(BaseLogger): @@ -138,13 +135,12 @@ def log_train_data(self, collect_result: dict, step: int) -> None: self.write("train/len", step, collect_result["len"]) self.last_log_train_step = step - def log_test_data(self, collect_result: dict, step: int, epoch: int) -> None: + def log_test_data(self, collect_result: dict, step: int) -> None: """Use writer to log statistics generated during evaluating. :param collect_result: a dict containing information of data collected in evaluating stage, i.e., returns of collector.collect(). 
:param int step: stands for the timestep the collect_result being logged. - :param int epoch: stands for the epoch the collect_result being logged. .. note:: @@ -180,38 +176,24 @@ def save_data( self.write("save/env_step", env_step, env_step) self.write("save/gradient_step", gradient_step, gradient_step) - def restore_data(self) -> Tuple[int, float, float, int, int, int, float, int]: + def restore_data(self) -> Tuple[int, int, int]: ea = event_accumulator.EventAccumulator(self.writer.log_dir) ea.Reload() - try: # epoch / env_step / gradient_step + try: # epoch / gradient_step epoch = ea.scalars.Items("save/epoch")[-1].step - env_step = ea.scalars.Items("save/env_step")[-1].step + self.last_log_test_step = epoch gradient_step = ea.scalars.Items("save/gradient_step")[-1].step - self.last_log_train_step = env_step self.last_log_update_step = gradient_step - self.last_log_test_step = epoch except KeyError: - epoch, env_step, gradient_step = 0, 0, 0 - - best_epoch, best_reward, best_reward_std = -1, 0.0, 0.0 - try: # best_* - for test_rew, test_rew_std in zip( - ea.scalars.Items("test/rew"), ea.scalars.Items("test/rew_std") - ): - rew, rew_std = test_rew.value, test_rew_std.value - if best_epoch == -1 or best_reward < rew: - best_epoch, best_reward, best_reward_std = 0, rew, rew_std + epoch, gradient_step = 0, 0 + try: # offline trainer doesn't have env_step + env_step = ea.scalars.Items("save/env_step")[-1].step + self.last_log_train_step = env_step except KeyError: - pass + env_step = 0 - try: # last_* - last_rew = ea.scalars.Items("train/rew")[-1].value - last_len = ea.scalars.Items("train/len")[-1].value - except KeyError: - last_rew, last_len = 0.0, 0 - return best_epoch, best_reward, best_reward_std, \ - epoch, env_step, gradient_step, last_rew, last_len + return epoch, env_step, gradient_step class LazyLogger(BasicLogger): From 740a021bc0da08fcd9cf698c1b758b257110368c Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Wed, 28 Apr 2021 14:20:37 +0800 Subject: [PATCH 10/14] move epoch_per_checkpoint to logger(save_interval) --- docs/tutorials/cheatsheet.rst | 1 + test/continuous/test_ppo.py | 3 ++- test/discrete/test_c51.py | 3 ++- test/discrete/test_il_bcq.py | 3 ++- tianshou/trainer/offline.py | 10 +++++----- tianshou/trainer/offpolicy.py | 10 +++++----- tianshou/trainer/onpolicy.py | 10 +++++----- tianshou/utils/log_tools.py | 27 +++++++++++++++------------ 8 files changed, 37 insertions(+), 30 deletions(-) diff --git a/docs/tutorials/cheatsheet.rst b/docs/tutorials/cheatsheet.rst index 245f35031..24b38ccd2 100644 --- a/docs/tutorials/cheatsheet.rst +++ b/docs/tutorials/cheatsheet.rst @@ -41,6 +41,7 @@ To resume training process from an existing checkpoint, you need to do the follo 1. Make sure you write ``save_checkpoint_fn`` which saves everything needed in the training process, i.e., policy, optim, buffer; pass it to trainer; 2. Use ``BasicLogger`` which contains a tensorboard; +3. To adjust the save frequency, specify ``save_interval`` when initializing BasicLogger. 
And to successfully resume from a checkpoint: diff --git a/test/continuous/test_ppo.py b/test/continuous/test_ppo.py index 824c0744b..6ab4717d6 100644 --- a/test/continuous/test_ppo.py +++ b/test/continuous/test_ppo.py @@ -48,6 +48,7 @@ def get_args(): parser.add_argument('--norm-adv', type=int, default=1) parser.add_argument('--recompute-adv', type=int, default=0) parser.add_argument('--resume', action="store_true") + parser.add_argument("--save-interval", type=int, default=4) args = parser.parse_known_args()[0] return args @@ -115,7 +116,7 @@ def dist(*logits): # log log_path = os.path.join(args.logdir, args.task, 'ppo') writer = SummaryWriter(log_path) - logger = BasicLogger(writer) + logger = BasicLogger(writer, save_interval=args.save_interval) def save_fn(policy): torch.save(policy.state_dict(), os.path.join(log_path, 'policy.pth')) diff --git a/test/discrete/test_c51.py b/test/discrete/test_c51.py index 47d7806db..a7fdd922a 100644 --- a/test/discrete/test_c51.py +++ b/test/discrete/test_c51.py @@ -48,6 +48,7 @@ def get_args(): parser.add_argument( '--device', type=str, default='cuda' if torch.cuda.is_available() else 'cpu') + parser.add_argument("--save-interval", type=int, default=4) args = parser.parse_known_args()[0] return args @@ -92,7 +93,7 @@ def test_c51(args=get_args()): # log log_path = os.path.join(args.logdir, args.task, 'c51') writer = SummaryWriter(log_path) - logger = BasicLogger(writer) + logger = BasicLogger(writer, save_interval=args.save_interval) def save_fn(policy): torch.save(policy.state_dict(), os.path.join(log_path, 'policy.pth')) diff --git a/test/discrete/test_il_bcq.py b/test/discrete/test_il_bcq.py index e0df3beb6..1ea2db433 100644 --- a/test/discrete/test_il_bcq.py +++ b/test/discrete/test_il_bcq.py @@ -43,6 +43,7 @@ def get_args(): default="cuda" if torch.cuda.is_available() else "cpu", ) parser.add_argument("--resume", action="store_true") + parser.add_argument("--save-interval", type=int, default=4) args = parser.parse_known_args()[0] return args @@ -86,7 +87,7 @@ def test_discrete_bcq(args=get_args()): log_path = os.path.join(args.logdir, args.task, 'discrete_bcq') writer = SummaryWriter(log_path) - logger = BasicLogger(writer) + logger = BasicLogger(writer, save_interval=args.save_interval) def save_fn(policy): torch.save(policy.state_dict(), os.path.join(log_path, 'policy.pth')) diff --git a/tianshou/trainer/offline.py b/tianshou/trainer/offline.py index fe241e1da..aad7299ca 100644 --- a/tianshou/trainer/offline.py +++ b/tianshou/trainer/offline.py @@ -1,5 +1,6 @@ import time import tqdm +import warnings import numpy as np from collections import defaultdict from typing import Dict, Union, Callable, Optional @@ -23,7 +24,6 @@ def offline_trainer( save_fn: Optional[Callable[[BasePolicy], None]] = None, save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, resume_from_log: bool = False, - epoch_per_checkpoint: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -51,8 +51,6 @@ def offline_trainer( signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can save whatever you want. Because offline-RL doesn't have env_step, the env_step is always 0 here. - :param int epoch_per_checkpoint: save train process each ``epoch_per_checkpoint`` - epoch by calling ``save_checkpoint_fn``. Default to 1. :param bool resume_from_log: resume gradient_step and other metadata from existing tensorboard log. Default to False. 
:param function stop_fn: a function with signature ``f(mean_rewards: float) -> @@ -70,6 +68,9 @@ def offline_trainer( :return: See :func:`~tianshou.trainer.gather_info`. """ + if save_fn: + warnings.warn("Please consider using save_checkpoint_fn instead of save_fn.") + start_epoch, gradient_step = 0, 0 if resume_from_log: start_epoch, _, gradient_step = logger.restore_data() @@ -106,8 +107,7 @@ def offline_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_checkpoint > 0 and epoch % epoch_per_checkpoint == 0: - logger.save_data(epoch, 0, gradient_step, save_checkpoint_fn) + logger.save_data(epoch, 0, gradient_step, save_checkpoint_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index abb50694f..8a72c3a0d 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -1,5 +1,6 @@ import time import tqdm +import warnings import numpy as np from collections import defaultdict from typing import Dict, Union, Callable, Optional @@ -26,7 +27,6 @@ def offpolicy_trainer( save_fn: Optional[Callable[[BasePolicy], None]] = None, save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, resume_from_log: bool = False, - epoch_per_checkpoint: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -65,8 +65,6 @@ def offpolicy_trainer( :param function save_checkpoint_fn: a function to save training process, with the signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can save whatever you want. - :param int epoch_per_checkpoint: save train process each ``epoch_per_checkpoint`` - epoch by calling ``save_checkpoint_fn``. Default to 1. :param bool resume_from_log: resume env_step/gradient_step and other metadata from existing tensorboard log. Default to False. :param function stop_fn: a function with signature ``f(mean_rewards: float) -> @@ -85,6 +83,9 @@ def offpolicy_trainer( :return: See :func:`~tianshou.trainer.gather_info`. 
""" + if save_fn: + warnings.warn("Please consider using save_checkpoint_fn instead of save_fn.") + start_epoch, env_step, gradient_step = 0, 0, 0 if resume_from_log: start_epoch, env_step, gradient_step = logger.restore_data() @@ -158,8 +159,7 @@ def offpolicy_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_checkpoint > 0 and epoch % epoch_per_checkpoint == 0: - logger.save_data(epoch, env_step, gradient_step, save_checkpoint_fn) + logger.save_data(epoch, env_step, gradient_step, save_checkpoint_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/trainer/onpolicy.py b/tianshou/trainer/onpolicy.py index 7b7f42cd5..53528eb38 100644 --- a/tianshou/trainer/onpolicy.py +++ b/tianshou/trainer/onpolicy.py @@ -1,5 +1,6 @@ import time import tqdm +import warnings import numpy as np from collections import defaultdict from typing import Dict, Union, Callable, Optional @@ -27,7 +28,6 @@ def onpolicy_trainer( save_fn: Optional[Callable[[BasePolicy], None]] = None, save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, resume_from_log: bool = False, - epoch_per_checkpoint: int = 1, reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, logger: BaseLogger = LazyLogger(), verbose: bool = True, @@ -67,8 +67,6 @@ def onpolicy_trainer( :param function save_checkpoint_fn: a function to save training process, with the signature ``f(epoch: int, env_step: int, gradient_step: int) -> None``; you can save whatever you want. - :param int epoch_per_checkpoint: save train process each ``epoch_per_checkpoint`` - epoch by calling ``save_checkpoint_fn``. Default to 1. :param bool resume_from_log: resume env_step/gradient_step and other metadata from existing tensorboard log. Default to False. :param function stop_fn: a function with signature ``f(mean_rewards: float) -> @@ -91,6 +89,9 @@ def onpolicy_trainer( Only either one of step_per_collect and episode_per_collect can be specified. """ + if save_fn: + warnings.warn("Please consider using save_checkpoint_fn instead of save_fn.") + start_epoch, env_step, gradient_step = 0, 0, 0 if resume_from_log: start_epoch, env_step, gradient_step = logger.restore_data() @@ -169,8 +170,7 @@ def onpolicy_trainer( best_epoch, best_reward, best_reward_std = epoch, rew, rew_std if save_fn: save_fn(policy) - if epoch_per_checkpoint > 0 and epoch % epoch_per_checkpoint == 0: - logger.save_data(epoch, env_step, gradient_step, save_checkpoint_fn) + logger.save_data(epoch, env_step, gradient_step, save_checkpoint_fn) if verbose: print(f"Epoch #{epoch}: test_reward: {rew:.6f} ± {rew_std:.6f}, best_rew" f"ard: {best_reward:.6f} ± {best_reward_std:.6f} in #{best_epoch}") diff --git a/tianshou/utils/log_tools.py b/tianshou/utils/log_tools.py index a28c1607a..2a8d3a251 100644 --- a/tianshou/utils/log_tools.py +++ b/tianshou/utils/log_tools.py @@ -1,4 +1,3 @@ -import warnings import numpy as np from numbers import Number from abc import ABC, abstractmethod @@ -69,8 +68,7 @@ def save_data( :param function save_checkpoint_fn: a hook defined by user, see trainer documentation for detail. """ - if save_checkpoint_fn: - save_checkpoint_fn(epoch, env_step, gradient_step) + pass def restore_data(self) -> Tuple[int, int, int]: """Return the metadata from existing log. 
@@ -78,10 +76,9 @@ def restore_data(self) -> Tuple[int, int, int]: If it finds nothing or an error occurs during the recover process, it will return the default parameters. - :return: epoch, env_step, gradient_step + :return: epoch, env_step, gradient_step. """ - warnings.warn("Please specify an existing tensorboard logdir to resume.") - return 0, 0, 0 + pass class BasicLogger(BaseLogger): @@ -94,6 +91,8 @@ class BasicLogger(BaseLogger): :param int train_interval: the log interval in log_train_data(). Default to 1. :param int test_interval: the log interval in log_test_data(). Default to 1. :param int update_interval: the log interval in log_update_data(). Default to 1000. + :param int save_interval: the save interval in save_data(). Default to 1 (save at + the end of each epoch). """ def __init__( @@ -102,15 +101,17 @@ def __init__( train_interval: int = 1, test_interval: int = 1, update_interval: int = 1000, - resume: bool = True, + save_interval: int = 1, ) -> None: super().__init__(writer) self.train_interval = train_interval self.test_interval = test_interval self.update_interval = update_interval + self.save_interval = save_interval self.last_log_train_step = -1 self.last_log_test_step = -1 self.last_log_update_step = -1 + self.last_save_step = -1 def write(self, key: str, x: int, y: WRITE_TYPE, **kwargs: Any) -> None: self.writer.add_scalar(key, y, global_step=x) @@ -171,10 +172,12 @@ def save_data( gradient_step: int, save_checkpoint_fn: Optional[Callable[[int, int, int], None]] = None, ) -> None: - super().save_data(epoch, env_step, gradient_step, save_checkpoint_fn) - self.write("save/epoch", epoch, epoch) - self.write("save/env_step", env_step, env_step) - self.write("save/gradient_step", gradient_step, gradient_step) + if save_checkpoint_fn and epoch - self.last_save_step >= self.save_interval: + self.last_save_step = epoch + save_checkpoint_fn(epoch, env_step, gradient_step) + self.write("save/epoch", epoch, epoch) + self.write("save/env_step", env_step, env_step) + self.write("save/gradient_step", gradient_step, gradient_step) def restore_data(self) -> Tuple[int, int, int]: ea = event_accumulator.EventAccumulator(self.writer.log_dir) @@ -182,7 +185,7 @@ def restore_data(self) -> Tuple[int, int, int]: try: # epoch / gradient_step epoch = ea.scalars.Items("save/epoch")[-1].step - self.last_log_test_step = epoch + self.last_save_step = self.last_log_test_step = epoch gradient_step = ea.scalars.Items("save/gradient_step")[-1].step self.last_log_update_step = gradient_step except KeyError: From ba2027965b157f92ba33cf03536174ecb1ff0d46 Mon Sep 17 00:00:00 2001 From: ark Date: Thu, 29 Apr 2021 12:43:18 +0800 Subject: [PATCH 11/14] Fix: offpolicy test with gradient_step at the beginning --- tianshou/trainer/offpolicy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py index 8a72c3a0d..5df2d6f29 100644 --- a/tianshou/trainer/offpolicy.py +++ b/tianshou/trainer/offpolicy.py @@ -96,7 +96,7 @@ def offpolicy_trainer( test_collector.reset_stat() test_in_train = test_in_train and train_collector.policy == policy test_result = test_episode(policy, test_collector, test_fn, start_epoch, - episode_per_test, logger, gradient_step, reward_metric) + episode_per_test, logger, env_step, reward_metric) best_epoch = start_epoch best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] From 527017b3a36e9ac6ccef00d767d7c9cf3995e453 Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Thu, 29 Apr 2021 17:45:31 
+0800 Subject: [PATCH 12/14] specify tensorboard version --- setup.py | 2 +- tianshou/trainer/onpolicy.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 04149ea1e..284ae0f56 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ def get_version() -> str: "gym>=0.15.4", "tqdm", "numpy>1.16.0", # https://github.com/numpy/numpy/issues/12793 - "tensorboard", + "tensorboard>=2.5.0", "torch>=1.4.0", "numba>=0.51.0", "h5py>=2.10.0", # to match tensorflow's minimal requirements diff --git a/tianshou/trainer/onpolicy.py b/tianshou/trainer/onpolicy.py index 53528eb38..379aba68e 100644 --- a/tianshou/trainer/onpolicy.py +++ b/tianshou/trainer/onpolicy.py @@ -102,7 +102,7 @@ def onpolicy_trainer( test_collector.reset_stat() test_in_train = test_in_train and train_collector.policy == policy test_result = test_episode(policy, test_collector, test_fn, start_epoch, - episode_per_test, logger, gradient_step, reward_metric) + episode_per_test, logger, env_step, reward_metric) best_epoch = start_epoch best_reward, best_reward_std = test_result["rew"], test_result["rew_std"] From 623bf31468da8e82641e1b4a02d375154f29890e Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Thu, 29 Apr 2021 17:46:55 +0800 Subject: [PATCH 13/14] docs --- docs/tutorials/cheatsheet.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/tutorials/cheatsheet.rst b/docs/tutorials/cheatsheet.rst index 24b38ccd2..65a6e0f99 100644 --- a/docs/tutorials/cheatsheet.rst +++ b/docs/tutorials/cheatsheet.rst @@ -56,6 +56,8 @@ We provide an example to show how these steps work: checkout `test_c51.py = 2.5.0``. + .. _parallel_sampling: Parallel Sampling From fc2d3fcaf5be69cf1423f0eb9a62311bd7c46fb6 Mon Sep 17 00:00:00 2001 From: Trinkle23897 Date: Thu, 6 May 2021 08:30:08 +0800 Subject: [PATCH 14/14] docs --- docs/tutorials/cheatsheet.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tutorials/cheatsheet.rst b/docs/tutorials/cheatsheet.rst index 65a6e0f99..7f8095e1d 100644 --- a/docs/tutorials/cheatsheet.rst +++ b/docs/tutorials/cheatsheet.rst @@ -56,7 +56,7 @@ We provide an example to show how these steps work: checkout `test_c51.py = 2.5.0``. +To correctly render the data (including several tfevent files), we highly recommend using ``tensorboard >= 2.5.0`` (see `here `_ for the reason). Otherwise, it may cause an overlapping issue that you need to handle manually. .. _parallel_sampling:
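Taken together, the final shape of the series leaves checkpointing to the logger: the trainers call ``logger.save_data(epoch, env_step, gradient_step, save_checkpoint_fn)``, ``BasicLogger`` throttles it with ``save_interval`` and records the three counters as ``save/*`` scalars, and a rerun with ``resume_from_log=True`` reads them back through ``logger.restore_data()`` from the tfevent files (hence the ``tensorboard>=2.5.0`` pin). A minimal round-trip sketch, assuming this branch is installed; the log directory and the dummy checkpoint hook are illustrative only::

    import os
    from torch.utils.tensorboard import SummaryWriter
    from tianshou.utils import BasicLogger

    log_path = os.path.join("log", "resume_demo")  # illustrative directory

    def save_checkpoint_fn(epoch, env_step, gradient_step):
        # A real hook would torch.save(policy/optim/buffer) here; this one
        # only reports what the trainer passed in.
        print(f"checkpoint: epoch={epoch}, env_step={env_step}, "
              f"gradient_step={gradient_step}")

    # First run: the trainer calls this at the end of each epoch.
    logger = BasicLogger(SummaryWriter(log_path), save_interval=1)
    logger.save_data(3, 30000, 3000, save_checkpoint_fn)
    logger.writer.flush()

    # Second run (resume_from_log=True): the trainer restores the counters
    # before training starts and continues from epoch 4.
    new_logger = BasicLogger(SummaryWriter(log_path), save_interval=1)
    epoch, env_step, gradient_step = new_logger.restore_data()
    print(epoch, env_step, gradient_step)  # expected: 3 30000 3000

The ``--resume`` and ``--save-interval`` flags added to test_c51.py, test_ppo.py, and test_il_bcq.py above wire these same pieces into full training runs.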