From 03b75c6ceaf746cf982a8e2224a55d590d4974cc Mon Sep 17 00:00:00 2001 From: Danielle Robinson Date: Fri, 10 Jul 2020 11:19:19 -0700 Subject: [PATCH 1/8] Passing num_workers, num_shuffle_batches and is_cached through the shell --- src/gluonts/model/estimator.py | 7 ++++++- src/gluonts/shell/sagemaker/__init__.py | 21 +++++++++++++++++++-- src/gluonts/shell/sagemaker/path.py | 1 + src/gluonts/shell/train.py | 22 +++++++++++++++++++--- src/gluonts/testutil/shell.py | 12 ++++++++++-- test/shell/test_shell.py | 8 +++++++- 6 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/gluonts/model/estimator.py b/src/gluonts/model/estimator.py index b63340d323..aa4c26dab5 100644 --- a/src/gluonts/model/estimator.py +++ b/src/gluonts/model/estimator.py @@ -259,9 +259,14 @@ def train( validation_data: Optional[Dataset] = None, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, - shuffle_buffer_length: Optional[int] = None, + num_batches_shuffle: Optional[int] = None, **kwargs, ) -> Predictor: + shuffle_buffer_length = ( + self.trainer.batch_size * num_batches_shuffle + if num_batches_shuffle is not None + else None + ) return self.train_model( training_data, validation_data, diff --git a/src/gluonts/shell/sagemaker/__init__.py b/src/gluonts/shell/sagemaker/__init__.py index 2532ac42a6..cc2690d84b 100644 --- a/src/gluonts/shell/sagemaker/__init__.py +++ b/src/gluonts/shell/sagemaker/__init__.py @@ -45,8 +45,15 @@ def __init__(self, path: TrainPaths) -> None: self.hyperparameters = _load_hyperparameters( self.path.hyperparameters, self.channels ) + self.train_auxillary_parameters = _load_train_auxillary_parameters( + self.path.train_auxillary_parameters + ) self.current_host = _get_current_host(self.path.resourceconfig) - self.datasets = _load_datasets(self.hyperparameters, self.channels) + self.datasets = _load_datasets( + self.hyperparameters, + self.train_auxillary_parameters, + self.channels, + ) class ServeEnv: @@ -121,6 +128,15 @@ def _load_hyperparameters(path: Path, channels) -> dict: return hyperparameters +def _load_train_auxillary_parameters(path: Path) -> dict: + with path.open() as json_file: + train_auxillary_parameters = decode_sagemaker_parameters( + json.load(json_file) + ) + + return train_auxillary_parameters + + def _get_current_host(resourceconfig: Path) -> str: if not resourceconfig.exists(): return "local" @@ -131,7 +147,8 @@ def _get_current_host(resourceconfig: Path) -> str: def _load_datasets( - hyperparameters: dict, channels: Dict[str, Path] + hyperparameters: dict, + channels: Dict[str, Path], ) -> Dict[str, FileDataset]: freq = hyperparameters["freq"] listify_dataset = strtobool(hyperparameters.get("listify_dataset", "no")) diff --git a/src/gluonts/shell/sagemaker/path.py b/src/gluonts/shell/sagemaker/path.py index b21bc12cab..2190dae3ed 100644 --- a/src/gluonts/shell/sagemaker/path.py +++ b/src/gluonts/shell/sagemaker/path.py @@ -25,6 +25,7 @@ def __init__(self, base: Path = Path("/opt/ml")) -> None: self.output: Path = self.base / "output" self.hyperparameters: Path = self.config / "hyperparameters.json" + self.train_auxillary_parameters: Path = self.config / "trainauxparameters.json" self.inputdataconfig: Path = self.config / "inputdataconfig.json" self.resourceconfig: Path = self.config / "resourceconfig.json" diff --git a/src/gluonts/shell/train.py b/src/gluonts/shell/train.py index f9082ecdbf..039f3b2796 100644 --- a/src/gluonts/shell/train.py +++ b/src/gluonts/shell/train.py @@ -13,8 +13,12 @@ # Standard library imports import logging 
+import multiprocessing from typing import Any, Optional, Type, Union +# Third-party imports +import numpy as np + # First-party imports import gluonts from gluonts.core import fqname_for @@ -82,9 +86,10 @@ def run_train_and_test( predictor = forecaster else: predictor = run_train( - forecaster, env.datasets["train"], env.datasets.get("validation") + forecaster, env.datasets["train"], env.datasets.get("validation"), env.train_auxillary_parameters ) + predictor.serialize(env.path.model) if "test" in env.datasets: @@ -95,10 +100,21 @@ def run_train( forecaster: Estimator, train_dataset: Dataset, validation_dataset: Optional[Dataset], + train_auxillary_parameters: dict, ) -> Predictor: - return forecaster.train( - training_data=train_dataset, validation_data=validation_dataset + num_workers = ( + int(train_auxillary_parameters["num_workers"]) if "num_workers" in train_auxillary_parameters + else min(4, int(np.ceil(np.sqrt(multiprocessing.cpu_count())))) ) + num_batches_shuffle = ( + int(train_auxillary_parameters["num_batches_shuffle"]) if "num_batches_shuffle" in train_auxillary_parameters + else None + ) + + return forecaster.train(training_data=train_dataset, + validation_data=validation_dataset, + num_workers=num_workers, + num_batches_shuffle=num_batches_shuffle) def run_test( diff --git a/src/gluonts/testutil/shell.py b/src/gluonts/testutil/shell.py index 98f77d2e9b..d85498073d 100644 --- a/src/gluonts/testutil/shell.py +++ b/src/gluonts/testutil/shell.py @@ -166,11 +166,11 @@ def temporary_server( @contextmanager # type: ignore def temporary_train_env( - hyperparameters: Dict[str, Any], dataset_name: str + hyperparameters: Dict[str, Any], train_auxillary_parameters: Dict[str, Any], dataset_name: str ) -> ContextManager[TrainEnv]: """ A context manager that instantiates a training environment from a given - combination of `hyperparameters` and `dataset_name` in a temporary + combination of `hyperparameters`, `train_auxillary_parameters` and `dataset_name` in a temporary directory and removes the directory on exit. Parameters @@ -178,6 +178,9 @@ def temporary_train_env( hyperparameters The hyperparameters to use when instantiating the training environment. + train_auxillary_parameters + The train_auxillary_parameters to use when instantiating the + training environment. dataset_name The name of the repository dataset to use when instantiating the training environment. 
@@ -196,6 +199,11 @@ def temporary_train_env( hps_encoded = encode_sagemaker_parameters(hyperparameters) json.dump(hps_encoded, fp, indent=2, sort_keys=True) + # write train_auxillary_parameters + with paths.train_auxillary_parameters.open(mode="w") as fp: + train_aux_params_encoded = encode_sagemaker_parameters(train_auxillary_parameters) + json.dump(train_aux_params_encoded, fp, indent=2, sort_keys=True) + # save dataset ds_path = materialize_dataset(dataset_name) diff --git a/test/shell/test_shell.py b/test/shell/test_shell.py index c44ad5672d..81ad35b5a0 100644 --- a/test/shell/test_shell.py +++ b/test/shell/test_shell.py @@ -52,7 +52,13 @@ def train_env(listify_dataset) -> ContextManager[TrainEnv]: "num_samples": num_samples, "listify_dataset": listify_dataset, } - with testutil.temporary_train_env(hyperparameters, "constant") as env: + train_auxillary_parameters = { + "num_workers": 2, + "num_batches_shuffle": 8, + "is_cached": True + } + + with testutil.temporary_train_env(hyperparameters, train_auxillary_parameters, "constant") as env: yield env From 5aa93ef81bf883f811786ebee228fd4c03ad15c2 Mon Sep 17 00:00:00 2001 From: Danielle Robinson Date: Sat, 25 Jul 2020 23:48:53 -0700 Subject: [PATCH 2/8] Pass num_workers, num_prefetch and shuffle_buffer_length through the shell --- src/gluonts/dataset/parallelized_loader.py | 36 +++++++++----- src/gluonts/shell/sagemaker/__init__.py | 32 ++++-------- src/gluonts/shell/train.py | 36 +++++++++----- src/gluonts/testutil/dummy_datasets.py | 4 -- test/model/seq2seq/test_model.py | 2 - test/shell/test_shell.py | 57 +++++++++++----------- 6 files changed, 85 insertions(+), 82 deletions(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index 4f9676be70..0708bc5bfc 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -13,14 +13,12 @@ # Standard library imports -import collections import functools import io import itertools import logging import multiprocessing import multiprocessing.queues -import pathlib import pickle import random import sys @@ -30,7 +28,7 @@ from multiprocessing.pool import Pool from multiprocessing.reduction import ForkingPickler from queue import Queue -from typing import Any, Callable, Iterable, Iterator, List, Optional, Union +from typing import Any, Callable, Iterator, List, Optional, Union import mxnet as mx @@ -362,10 +360,9 @@ class ShuffleIter(Iterator[DataEntry]): A wrapper class which takes a serialized iterator as an input and generates a pseudo randomized iterator using the same elements from the input iterator. 
""" - def __init__( self, base_iterator: Iterator[DataEntry], shuffle_buffer_length: int - ): + ) -> None: self.shuffle_buffer: list = [] self.shuffle_buffer_length = shuffle_buffer_length self.base_iterator = base_iterator @@ -418,7 +415,7 @@ def __init__( dataset_len: int, timeout: int, shuffle_buffer_length: Optional[int], - ): + ) -> None: self._worker_pool = worker_pool self._batchify_fn = batchify_fn self._data_buffer: dict = ( @@ -471,6 +468,7 @@ def _push_next(self) -> None: def __next__(self) -> DataBatch: # Try to get a batch, sometimes its possible that an iterator was # exhausted and thus we don't get a new batch + logger = logging.getLogger(__name__) success = False while not success: try: @@ -508,7 +506,7 @@ def __next__(self) -> DataBatch: # or return with the right context straight away return _as_in_context(batch, self._ctx) except multiprocessing.context.TimeoutError: - print( + logger.error( f"Worker timed out after {self._timeout} seconds. This might be caused by " "\n - Slow transform. Please increase timeout to allow slower data loading in each worker. " "\n - Insufficient shared_memory if `timeout` is large enough. " @@ -516,7 +514,9 @@ def __next__(self) -> DataBatch: ) raise except Exception: - print("An unexpected error occurred in the WorkerIterator.") + logger.error( + "An unexpected error occurred in the WorkerIterator." + ) self._worker_pool.terminate() raise return {} @@ -560,7 +560,7 @@ class ParallelDataLoader(object): but will consume more shared_memory. Using smaller number may forfeit the purpose of using multiple worker processes, try reduce `num_workers` in this case. By default it defaults to `num_workers * 2`. - shuffle_buffer_length + shuffle_buffer_length The length of the buffer used to do pseudo shuffle. If not None, the loader will perform pseudo shuffle when generating batches. Note that using a larger buffer will provide more randomized batches, but will make the job require a bit @@ -580,10 +580,11 @@ def __init__( num_prefetch: Optional[int] = None, num_workers: Optional[int] = None, shuffle_buffer_length: Optional[int] = None, - ): + ) -> None: + self.logger = logging.getLogger(__name__) # Some windows error with the ForkingPickler prevents usage currently: if sys.platform == "win32": - logging.warning( + self.logger.warning( "You have set `num_workers` to a non zero value, " "however, currently multiprocessing is not supported on windows and therefore" "`num_workers will be set to 0." @@ -593,7 +594,7 @@ def __init__( if num_workers is not None and num_workers > 0: if isinstance(dataset, FileDataset): if not dataset.cache: - logging.warning( + self.logger.warning( "You have set `num_workers` to a non zero value, " "however, you have not enabled caching for your FileDataset. " "To improve training performance you can enable caching for the FileDataset. " @@ -637,11 +638,17 @@ def __init__( num_workers if num_workers is not None else default_num_workers, self.dataset_len, ) # cannot have more than dataset entries + self.logger.info( + f"gluonts[multiprocessing]: num_workers={self.num_workers}" + ) self.num_prefetch = ( num_prefetch if num_prefetch is not None else 2 * self.num_workers ) + self.logger.info( + f"gluonts[multiprocessing]: num_prefetch={self.num_prefetch}" + ) if self.num_prefetch < self.num_workers: - logging.warning( + self.logger.warning( "You have set `num_prefetch` to less than `num_workers`, which is counter productive." "If you want to reduce load, reduce `num_workers`." 
) @@ -652,6 +659,9 @@ def __init__( # In order to recycle unused but pre-calculated batches from last epoch for training: self.multi_worker_cache: Optional[Iterator[DataBatch]] = None self.shuffle_buffer_length: Optional[int] = shuffle_buffer_length + self.logger.info( + f"gluonts[multiprocessing]: shuffle_buffer_length={self.shuffle_buffer_length}" + ) if self.num_workers > 0: # generate unique ids for processes diff --git a/src/gluonts/shell/sagemaker/__init__.py b/src/gluonts/shell/sagemaker/__init__.py index cc2690d84b..51dc716d0a 100644 --- a/src/gluonts/shell/sagemaker/__init__.py +++ b/src/gluonts/shell/sagemaker/__init__.py @@ -14,13 +14,14 @@ # Standard library imports from distutils.util import strtobool import json +import logging import os from pathlib import Path from pydantic import BaseModel from typing import Dict, Optional # First party imports -from gluonts.dataset.common import FileDataset, ListDataset, MetaData +from gluonts.dataset.common import Dataset, FileDataset, ListDataset, MetaData from gluonts.model.forecast import Config as ForecastConfig from gluonts.support.util import map_dct_values @@ -45,15 +46,8 @@ def __init__(self, path: TrainPaths) -> None: self.hyperparameters = _load_hyperparameters( self.path.hyperparameters, self.channels ) - self.train_auxillary_parameters = _load_train_auxillary_parameters( - self.path.train_auxillary_parameters - ) self.current_host = _get_current_host(self.path.resourceconfig) - self.datasets = _load_datasets( - self.hyperparameters, - self.train_auxillary_parameters, - self.channels, - ) + self.datasets = _load_datasets(self.hyperparameters, self.channels) class ServeEnv: @@ -128,15 +122,6 @@ def _load_hyperparameters(path: Path, channels) -> dict: return hyperparameters -def _load_train_auxillary_parameters(path: Path) -> dict: - with path.open() as json_file: - train_auxillary_parameters = decode_sagemaker_parameters( - json.load(json_file) - ) - - return train_auxillary_parameters - - def _get_current_host(resourceconfig: Path) -> str: if not resourceconfig.exists(): return "local" @@ -147,11 +132,12 @@ def _get_current_host(resourceconfig: Path) -> str: def _load_datasets( - hyperparameters: dict, - channels: Dict[str, Path], -) -> Dict[str, FileDataset]: + hyperparameters: dict, channels: Dict[str, Path] +) -> Dict[str, Dataset]: + logger = logging.getLogger(__name__) freq = hyperparameters["freq"] listify_dataset = strtobool(hyperparameters.get("listify_dataset", "no")) + logger.info(f"gluonts[cached]: listify_dataset = {listify_dataset}") dataset_dict = {} for name in DATASET_NAMES: if name in channels: @@ -161,5 +147,7 @@ def _load_datasets( if listify_dataset else file_dataset ) - + logger.info( + f"gluonts[cached]: Type of {name} dataset is {type(dataset_dict[name])}" + ) return dataset_dict diff --git a/src/gluonts/shell/train.py b/src/gluonts/shell/train.py index 039f3b2796..4d96a8fb12 100644 --- a/src/gluonts/shell/train.py +++ b/src/gluonts/shell/train.py @@ -86,10 +86,12 @@ def run_train_and_test( predictor = forecaster else: predictor = run_train( - forecaster, env.datasets["train"], env.datasets.get("validation"), env.train_auxillary_parameters + forecaster=forecaster, + train_dataset=env.datasets["train"], + validation_dataset=env.datasets.get("validation"), + hyperparameters=env.hyperparameters, ) - predictor.serialize(env.path.model) if "test" in env.datasets: @@ -99,22 +101,30 @@ def run_train_and_test( def run_train( forecaster: Estimator, train_dataset: Dataset, + hyperparameters: dict, 
validation_dataset: Optional[Dataset], - train_auxillary_parameters: dict, ) -> Predictor: - num_workers = ( - int(train_auxillary_parameters["num_workers"]) if "num_workers" in train_auxillary_parameters - else min(4, int(np.ceil(np.sqrt(multiprocessing.cpu_count())))) + num_workers = min( + int(hyperparameters.get("num_workers", 4)), + int(np.ceil(np.sqrt(multiprocessing.cpu_count()))), ) - num_batches_shuffle = ( - int(train_auxillary_parameters["num_batches_shuffle"]) if "num_batches_shuffle" in train_auxillary_parameters + shuffle_buffer_length = ( + int(hyperparameters["shuffle_buffer_length"]) + if "shuffle_buffer_length" in hyperparameters.keys() else None ) - - return forecaster.train(training_data=train_dataset, - validation_data=validation_dataset, - num_workers=num_workers, - num_batches_shuffle=num_batches_shuffle) + num_prefetch = ( + int(hyperparameters["num_prefetch"]) + if "num_prefetch" in hyperparameters.keys() + else None + ) + return forecaster.train( + training_data=train_dataset, + validation_data=validation_dataset, + num_workers=num_workers, + num_prefetch=num_prefetch, + shuffle_buffer_length=shuffle_buffer_length, + ) def run_test( diff --git a/src/gluonts/testutil/dummy_datasets.py b/src/gluonts/testutil/dummy_datasets.py index 3f95292c10..03f3c712e6 100644 --- a/src/gluonts/testutil/dummy_datasets.py +++ b/src/gluonts/testutil/dummy_datasets.py @@ -16,10 +16,6 @@ from random import randint from typing import List, Tuple -# Third-party imports -import numpy as np -import pytest - # First-party imports from gluonts.dataset.common import ListDataset from gluonts.dataset.field_names import FieldName diff --git a/test/model/seq2seq/test_model.py b/test/model/seq2seq/test_model.py index 11bf1c9c11..c5aa507126 100644 --- a/test/model/seq2seq/test_model.py +++ b/test/model/seq2seq/test_model.py @@ -176,10 +176,8 @@ def test_backwards_compatibility(): "use_past_feat_dynamic_real": True, "enable_encoder_dynamic_feature": True, "enable_decoder_dynamic_feature": True, - "num_workers": 0, "scaling": True, "scaling_decoder_dynamic_feature": True, - "num_batches_shuffle": 8, } dataset_train, dataset_test = make_dummy_datasets_with_features( diff --git a/test/shell/test_shell.py b/test/shell/test_shell.py index 81ad35b5a0..ed387cfb30 100644 --- a/test/shell/test_shell.py +++ b/test/shell/test_shell.py @@ -13,17 +13,19 @@ # Standard library imports import json -from typing import ContextManager, Optional +from typing import ContextManager import sys +from distutils.util import strtobool # Third-party imports import numpy as np import pytest # First-party imports -from gluonts.core.component import equals from gluonts.dataset.common import FileDataset, ListDataset +from gluonts.core.component import equals from gluonts.model.trivial.mean import MeanPredictor +from gluonts.model.seq2seq import MQCNNEstimator from gluonts.shell.sagemaker import ServeEnv, TrainEnv from gluonts.shell.train import run_train_and_test @@ -51,14 +53,11 @@ def train_env(listify_dataset) -> ContextManager[TrainEnv]: "prediction_length": prediction_length, "num_samples": num_samples, "listify_dataset": listify_dataset, + "num_workers": 3, + "num_prefetch": 4, + "shuffle_buffer_length": 256, } - train_auxillary_parameters = { - "num_workers": 2, - "num_batches_shuffle": 8, - "is_cached": True - } - - with testutil.temporary_train_env(hyperparameters, train_auxillary_parameters, "constant") as env: + with testutil.temporary_train_env(hyperparameters, "constant") as env: yield env @@ -104,34 +103,36 @@ def 
batch_transform(monkeypatch, train_env): return inference_config -@pytest.mark.parametrize("listify_dataset", [True, False]) +@pytest.mark.parametrize("listify_dataset", ["yes", "no"]) def test_listify_dataset(train_env: TrainEnv, listify_dataset): for dataset_name in train_env.datasets.keys(): assert ( isinstance(train_env.datasets[dataset_name], ListDataset) - if listify_dataset + if strtobool(listify_dataset) else isinstance(train_env.datasets[dataset_name], FileDataset) ) -@pytest.mark.parametrize("listify_dataset", [True, False]) -def test_train_shell(train_env: TrainEnv, caplog) -> None: - run_train_and_test(env=train_env, forecaster_type=MeanPredictor) +@pytest.mark.parametrize("listify_dataset", ["yes", "no"]) +@pytest.mark.parametrize("forecaster_type", [MeanPredictor, MQCNNEstimator]) +def test_train_shell(train_env: TrainEnv, caplog, forecaster_type) -> None: + run_train_and_test(env=train_env, forecaster_type=forecaster_type) - for _, _, line in caplog.record_tuples: - if "#test_score (local, QuantileLoss" in line: - assert line.endswith("0.0") - if "local, wQuantileLoss" in line: - assert line.endswith("0.0") - if "local, Coverage" in line: - assert line.endswith("0.0") - if "MASE" in line or "MSIS" in line: - assert line.endswith("0.0") - if "abs_target_sum" in line: - assert line.endswith("270.0") + if forecaster_type == MeanPredictor: + for _, _, line in caplog.record_tuples: + if "#test_score (local, QuantileLoss" in line: + assert line.endswith("0.0") + if "local, wQuantileLoss" in line: + assert line.endswith("0.0") + if "local, Coverage" in line: + assert line.endswith("0.0") + if "MASE" in line or "MSIS" in line: + assert line.endswith("0.0") + if "abs_target_sum" in line: + assert line.endswith("270.0") -@pytest.mark.parametrize("listify_dataset", [True, False]) +@pytest.mark.parametrize("listify_dataset", ["yes", "no"]) def test_server_shell( train_env: TrainEnv, static_server: "testutil.ServerFacade", caplog ) -> None: @@ -173,7 +174,7 @@ def test_server_shell( assert equals(exp_samples, act_samples) -@pytest.mark.parametrize("listify_dataset", [True, False]) +@pytest.mark.parametrize("listify_dataset", ["yes", "no"]) def test_dynamic_shell( train_env: TrainEnv, dynamic_server: "testutil.ServerFacade", caplog ) -> None: @@ -216,7 +217,7 @@ def test_dynamic_shell( assert equals(exp_samples, act_samples) -@pytest.mark.parametrize("listify_dataset", [True, False]) +@pytest.mark.parametrize("listify_dataset", ["yes", "no"]) def test_dynamic_batch_shell( batch_transform, train_env: TrainEnv, From 7c4cef4880143c8e9b36a00693371a5a160305f8 Mon Sep 17 00:00:00 2001 From: Danielle Robinson Date: Sat, 25 Jul 2020 23:59:53 -0700 Subject: [PATCH 3/8] Fix black --- src/gluonts/dataset/parallelized_loader.py | 11 +++++------ src/gluonts/shell/sagemaker/path.py | 1 - src/gluonts/testutil/shell.py | 12 ++---------- test/shell/test_shell.py | 2 +- 4 files changed, 8 insertions(+), 18 deletions(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index 0708bc5bfc..8b962fff6b 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -84,7 +84,7 @@ def reduce_ndarray(data): def _is_stackable( - arrays: List[Union[np.ndarray, mx.nd.NDArray, Any]], axis: int = 0, + arrays: List[Union[np.ndarray, mx.nd.NDArray, Any]], axis: int = 0 ) -> bool: """ Check if elements are scalars, have too few dimensions, or their @@ -97,7 +97,7 @@ def _is_stackable( def _pad_arrays( - data: 
List[Union[np.ndarray, mx.nd.NDArray]], axis: int = 0, + data: List[Union[np.ndarray, mx.nd.NDArray]], axis: int = 0 ) -> List[Union[np.ndarray, mx.nd.NDArray]]: assert isinstance(data[0], (np.ndarray, mx.nd.NDArray)) is_mx = isinstance(data[0], mx.nd.NDArray) @@ -237,9 +237,7 @@ def _sequential_sample_generator( cyclic: bool, ) -> Iterator[DataEntry]: while True: - yield from transformation( - data_it=dataset, is_train=is_train, - ) + yield from transformation(data_it=dataset, is_train=is_train) # Dont cycle if not training time if not cyclic: return @@ -360,6 +358,7 @@ class ShuffleIter(Iterator[DataEntry]): A wrapper class which takes a serialized iterator as an input and generates a pseudo randomized iterator using the same elements from the input iterator. """ + def __init__( self, base_iterator: Iterator[DataEntry], shuffle_buffer_length: int ) -> None: @@ -688,7 +687,7 @@ def __iter__(self) -> Iterator[DataBatch]: self.cycle_num += 1 if self.num_workers == 0: generator = _sequential_sample_generator( - self.dataset, self.transformation, self.is_train, self.cyclic, + self.dataset, self.transformation, self.is_train, self.cyclic ) if self.shuffle_buffer_length is not None: generator = ShuffleIter( diff --git a/src/gluonts/shell/sagemaker/path.py b/src/gluonts/shell/sagemaker/path.py index 2190dae3ed..b21bc12cab 100644 --- a/src/gluonts/shell/sagemaker/path.py +++ b/src/gluonts/shell/sagemaker/path.py @@ -25,7 +25,6 @@ def __init__(self, base: Path = Path("/opt/ml")) -> None: self.output: Path = self.base / "output" self.hyperparameters: Path = self.config / "hyperparameters.json" - self.train_auxillary_parameters: Path = self.config / "trainauxparameters.json" self.inputdataconfig: Path = self.config / "inputdataconfig.json" self.resourceconfig: Path = self.config / "resourceconfig.json" diff --git a/src/gluonts/testutil/shell.py b/src/gluonts/testutil/shell.py index d85498073d..98f77d2e9b 100644 --- a/src/gluonts/testutil/shell.py +++ b/src/gluonts/testutil/shell.py @@ -166,11 +166,11 @@ def temporary_server( @contextmanager # type: ignore def temporary_train_env( - hyperparameters: Dict[str, Any], train_auxillary_parameters: Dict[str, Any], dataset_name: str + hyperparameters: Dict[str, Any], dataset_name: str ) -> ContextManager[TrainEnv]: """ A context manager that instantiates a training environment from a given - combination of `hyperparameters`, `train_auxillary_parameters` and `dataset_name` in a temporary + combination of `hyperparameters` and `dataset_name` in a temporary directory and removes the directory on exit. Parameters @@ -178,9 +178,6 @@ def temporary_train_env( hyperparameters The hyperparameters to use when instantiating the training environment. - train_auxillary_parameters - The train_auxillary_parameters to use when instantiating the - training environment. dataset_name The name of the repository dataset to use when instantiating the training environment. 
@@ -199,11 +196,6 @@ def temporary_train_env( hps_encoded = encode_sagemaker_parameters(hyperparameters) json.dump(hps_encoded, fp, indent=2, sort_keys=True) - # write train_auxillary_parameters - with paths.train_auxillary_parameters.open(mode="w") as fp: - train_aux_params_encoded = encode_sagemaker_parameters(train_auxillary_parameters) - json.dump(train_aux_params_encoded, fp, indent=2, sort_keys=True) - # save dataset ds_path = materialize_dataset(dataset_name) diff --git a/test/shell/test_shell.py b/test/shell/test_shell.py index ed387cfb30..a0dd7107cd 100644 --- a/test/shell/test_shell.py +++ b/test/shell/test_shell.py @@ -22,8 +22,8 @@ import pytest # First-party imports -from gluonts.dataset.common import FileDataset, ListDataset from gluonts.core.component import equals +from gluonts.dataset.common import FileDataset, ListDataset from gluonts.model.trivial.mean import MeanPredictor from gluonts.model.seq2seq import MQCNNEstimator from gluonts.shell.sagemaker import ServeEnv, TrainEnv From 9d333974eb7f1b3da49ee9f6a53f006810f71845 Mon Sep 17 00:00:00 2001 From: Danielle Robinson Date: Sun, 26 Jul 2020 00:02:42 -0700 Subject: [PATCH 4/8] Fix spacing --- src/gluonts/dataset/parallelized_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index 8b962fff6b..cb44f118b2 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -559,7 +559,7 @@ class ParallelDataLoader(object): but will consume more shared_memory. Using smaller number may forfeit the purpose of using multiple worker processes, try reduce `num_workers` in this case. By default it defaults to `num_workers * 2`. - shuffle_buffer_length + shuffle_buffer_length The length of the buffer used to do pseudo shuffle. If not None, the loader will perform pseudo shuffle when generating batches. 
Note that using a larger buffer will provide more randomized batches, but will make the job require a bit From a7a9cc2be65f4207b4d8be10d02b034640ab6592 Mon Sep 17 00:00:00 2001 From: Danielle Robinson Date: Sun, 26 Jul 2020 08:15:09 -0700 Subject: [PATCH 5/8] Revert back to shuffle_buffer_length --- src/gluonts/model/estimator.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/gluonts/model/estimator.py b/src/gluonts/model/estimator.py index aa4c26dab5..b63340d323 100644 --- a/src/gluonts/model/estimator.py +++ b/src/gluonts/model/estimator.py @@ -259,14 +259,9 @@ def train( validation_data: Optional[Dataset] = None, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, - num_batches_shuffle: Optional[int] = None, + shuffle_buffer_length: Optional[int] = None, **kwargs, ) -> Predictor: - shuffle_buffer_length = ( - self.trainer.batch_size * num_batches_shuffle - if num_batches_shuffle is not None - else None - ) return self.train_model( training_data, validation_data, From 741ab2b518e0afa0299344f67338bb3d6ff06677 Mon Sep 17 00:00:00 2001 From: Danielle Robinson Date: Mon, 27 Jul 2020 08:31:12 -0700 Subject: [PATCH 6/8] Decreasing number of training epochs in the smoke test for MQCNN so that it finishes within the timeout limit --- test/shell/test_shell.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/shell/test_shell.py b/test/shell/test_shell.py index a0dd7107cd..d7af20dde7 100644 --- a/test/shell/test_shell.py +++ b/test/shell/test_shell.py @@ -56,6 +56,7 @@ def train_env(listify_dataset) -> ContextManager[TrainEnv]: "num_workers": 3, "num_prefetch": 4, "shuffle_buffer_length": 256, + "epochs": 3, } with testutil.temporary_train_env(hyperparameters, "constant") as env: yield env From 784abf5b709f992f7a706afc49b52650e9fea61f Mon Sep 17 00:00:00 2001 From: Aaron Spieler <25365592+AaronSpieler@users.noreply.github.com> Date: Mon, 27 Jul 2020 18:06:26 +0200 Subject: [PATCH 7/8] Default to num_workers 2. --- src/gluonts/shell/train.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/gluonts/shell/train.py b/src/gluonts/shell/train.py index 4d96a8fb12..dc3c4e2e3f 100644 --- a/src/gluonts/shell/train.py +++ b/src/gluonts/shell/train.py @@ -104,10 +104,7 @@ def run_train( hyperparameters: dict, validation_dataset: Optional[Dataset], ) -> Predictor: - num_workers = min( - int(hyperparameters.get("num_workers", 4)), - int(np.ceil(np.sqrt(multiprocessing.cpu_count()))), - ) + num_workers = int(hyperparameters.get("num_workers", 2)) shuffle_buffer_length = ( int(hyperparameters["shuffle_buffer_length"]) if "shuffle_buffer_length" in hyperparameters.keys() From d12cd512f7519bf932721b07a713fdab5633ae96 Mon Sep 17 00:00:00 2001 From: Danielle Robinson Date: Mon, 27 Jul 2020 14:30:38 -0700 Subject: [PATCH 8/8] Setting default num_workers = None to be set in the parallelized_loader as 0 so the user will need to turn multiprocessing explicilty on --- src/gluonts/dataset/parallelized_loader.py | 4 ++-- src/gluonts/shell/train.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index cb44f118b2..0f0807ab92 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -512,9 +512,9 @@ def __next__(self) -> DataBatch: "\n Please consider to reduce `num_workers` or increase shared_memory in system." 
) raise - except Exception: + except Exception as e: logger.error( - "An unexpected error occurred in the WorkerIterator." + f"An unexpected error occurred in the WorkerIterator: {e}." ) self._worker_pool.terminate() raise diff --git a/src/gluonts/shell/train.py b/src/gluonts/shell/train.py index dc3c4e2e3f..259d7214db 100644 --- a/src/gluonts/shell/train.py +++ b/src/gluonts/shell/train.py @@ -104,7 +104,11 @@ def run_train( hyperparameters: dict, validation_dataset: Optional[Dataset], ) -> Predictor: - num_workers = int(hyperparameters.get("num_workers", 2)) + num_workers = ( + int(hyperparameters["num_workers"]) + if "num_workers" in hyperparameters.keys() + else None + ) shuffle_buffer_length = ( int(hyperparameters["shuffle_buffer_length"]) if "shuffle_buffer_length" in hyperparameters.keys()
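
Usage sketch (illustrative, not part of the patch series): after these patches the dataloader options are plain SageMaker hyperparameters that run_train parses and forwards to Estimator.train. The snippet below mirrors that parsing from src/gluonts/shell/train.py in its final form; the concrete hyperparameter values are only examples, and the commented-out train call assumes the usual forecaster/train_dataset/validation_dataset objects from the shell.

    # Hypothetical hyperparameters.json contents for a SageMaker training job.
    # The keys are the ones run_train looks for; SageMaker stores values as strings.
    hyperparameters = {
        "freq": "H",
        "prediction_length": "24",
        "num_workers": "3",              # worker processes for the parallel data loader
        "num_prefetch": "4",             # batches to prefetch (defaults to num_workers * 2)
        "shuffle_buffer_length": "256",  # pseudo-shuffle buffer; omit to disable shuffling
    }

    # Same behavior as run_train after the final patch: every option is optional and
    # defaults to None, so multiprocessing must be turned on explicitly by the user.
    num_workers = (
        int(hyperparameters["num_workers"])
        if "num_workers" in hyperparameters
        else None
    )
    num_prefetch = (
        int(hyperparameters["num_prefetch"])
        if "num_prefetch" in hyperparameters
        else None
    )
    shuffle_buffer_length = (
        int(hyperparameters["shuffle_buffer_length"])
        if "shuffle_buffer_length" in hyperparameters
        else None
    )

    print(num_workers, num_prefetch, shuffle_buffer_length)

    # The shell then forwards these to the estimator, exactly as run_train does:
    #     predictor = forecaster.train(
    #         training_data=train_dataset,
    #         validation_data=validation_dataset,
    #         num_workers=num_workers,
    #         num_prefetch=num_prefetch,
    #         shuffle_buffer_length=shuffle_buffer_length,
    #     )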