From 8f40b9402d77eb9649713a7534ea785ba11c1ff1 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Fri, 3 Apr 2020 21:27:47 +0200 Subject: [PATCH 01/20] Dataset iterations are now cache aligned and switched to default Pool method. --- src/gluonts/dataset/common.py | 14 +++++++++----- src/gluonts/dataset/jsonl.py | 13 +++++++++---- src/gluonts/dataset/parallelized_loader.py | 8 ++++++-- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/gluonts/dataset/common.py b/src/gluonts/dataset/common.py index df197d25b0..df181b484c 100644 --- a/src/gluonts/dataset/common.py +++ b/src/gluonts/dataset/common.py @@ -255,16 +255,20 @@ def __init__( ) -> None: self.process = ProcessDataEntry(freq, one_dim_target) self.list_data = list(data_iter) - # TODO: implement caching here def __iter__(self) -> Iterator[DataEntry]: source_name = "list_data" + chunk_size = int(len(self.list_data) / util.MPWorkerInfo.num_workers) for row_number, data in enumerate(self.list_data): # The dataset is equally distributed among the workers - if not ( - row_number % util.MPWorkerInfo.num_workers - == util.MPWorkerInfo.worker_id - ): + lower_bound = util.MPWorkerInfo.worker_id * chunk_size + upper_bound = ( + (util.MPWorkerInfo.worker_id + 1) * chunk_size + if util.MPWorkerInfo.worker_id + 1 + != util.MPWorkerInfo.num_workers + else np.inf + ) + if not lower_bound <= row_number < upper_bound: continue data = self.process(data) diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index b82c3205db..c2a4c644db 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -18,6 +18,7 @@ # Third-party imports import ujson as json +import numpy as np # First-party imports from gluonts.core.exception import GluonTSDataError @@ -61,13 +62,17 @@ def __init__(self, path) -> None: # TODO: implement caching here def __iter__(self): + chunk_size = int(self.__len__() / MPWorkerInfo.num_workers) with open(self.path) as jsonl_file: for line_number, raw in enumerate(jsonl_file): # The dataset is equally distributed among the workers - if not ( - line_number % MPWorkerInfo.num_workers - == MPWorkerInfo.worker_id - ): + lower_bound = MPWorkerInfo.worker_id * chunk_size + upper_bound = ( + (MPWorkerInfo.worker_id + 1) * chunk_size + if MPWorkerInfo.worker_id + 1 != MPWorkerInfo.num_workers + else np.inf + ) + if not lower_bound <= line_number < upper_bound: continue span = Span(path=self.path, line=line_number) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index f9cbb0a3a3..a527d914c6 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -253,8 +253,9 @@ def _worker_fn( else: # the second time without being able to provide a batch we want to delay calling them again # on fist exhaustion they should not be delayed, since they need to indicate depletion + # dont make the penalty to high, since that delays rescheduling of non empty iterators if _WorkerData.iterator_exhausted_indicator: - time.sleep(0.1) + time.sleep(0.05) else: _WorkerData.iterator_exhausted_indicator = True success = False @@ -528,7 +529,10 @@ def __init__( for i in range(self.num_workers): self.worker_id_queue.put(i) - self.worker_pool = multiprocessing.get_context("spawn").Pool( + # Use multiprocessing.get_context("spawn").Pool to check whether + # implementation `clean`, i.e no unix forking magic required, + # Otherwise use recommended defaults + self.worker_pool = multiprocessing.Pool( self.num_workers, 
initializer=_worker_initializer, initargs=[ From c7aad219b917f6b0768deb25fd5840dabb1d31fe Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Mon, 6 Apr 2020 17:09:20 +0200 Subject: [PATCH 02/20] Added caching support for FileDataset. --- src/gluonts/dataset/common.py | 27 +++++++++++++---- src/gluonts/dataset/jsonl.py | 55 +++++++++++++++++++++-------------- test/dataset/test_loader.py | 7 ++++- 3 files changed, 60 insertions(+), 29 deletions(-) diff --git a/src/gluonts/dataset/common.py b/src/gluonts/dataset/common.py index df181b484c..f2f42cb010 100644 --- a/src/gluonts/dataset/common.py +++ b/src/gluonts/dataset/common.py @@ -185,26 +185,39 @@ class FileDataset(Dataset): Must be a valid Pandas frequency. one_dim_target Whether to accept only univariate target time series. + cache + Indicates whether the dataset should be cached or not. """ def __init__( - self, path: Path, freq: str, one_dim_target: bool = True, + self, + path: Path, + freq: str, + one_dim_target: bool = True, + cache: Optional[bool] = False, ) -> None: + self.cache = cache self.path = path self.process = ProcessDataEntry(freq, one_dim_target=one_dim_target) self._len = None + if not self.files(): raise OSError(f"no valid file found in {path}") + # necessary, in order to preserve the cached datasets, in case caching was enabled + self._json_line_files = [ + jsonl.JsonLinesFile(path=path, cache=cache) + for path in self.files() + ] + def __iter__(self) -> Iterator[DataEntry]: - for path in self.files(): - for line in jsonl.JsonLinesFile(path=path): + for json_line_file in self._json_line_files: + for line in json_line_file: data = self.process(line.content) data["source"] = SourceContext( source=line.span.path, row=line.span.line ) yield data - self._burnt_in = True def __len__(self): if self._len is None: @@ -254,13 +267,15 @@ def __init__( one_dim_target: bool = True, ) -> None: self.process = ProcessDataEntry(freq, one_dim_target) - self.list_data = list(data_iter) + self.list_data = list(data_iter) # dataset always cached def __iter__(self) -> Iterator[DataEntry]: source_name = "list_data" + # Basic idea is to split the dataset into roughly equally sized segments + # with lower and upper bound, where each worker is assigned one segment chunk_size = int(len(self.list_data) / util.MPWorkerInfo.num_workers) + for row_number, data in enumerate(self.list_data): - # The dataset is equally distributed among the workers lower_bound = util.MPWorkerInfo.worker_id * chunk_size upper_bound = ( (util.MPWorkerInfo.worker_id + 1) * chunk_size diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index c2a4c644db..3cf4f36bfb 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -14,7 +14,7 @@ # Standard library imports import functools from pathlib import Path -from typing import NamedTuple +from typing import NamedTuple, Optional # Third-party imports import ujson as json @@ -56,32 +56,43 @@ class JsonLinesFile: JSON Lines file. 
""" - def __init__(self, path) -> None: + def __init__(self, path: Path, cache: Optional[bool] = False) -> None: self.path = path + self.cache = cache self._len = None - # TODO: implement caching here + self._data_cache: list = [] def __iter__(self): - chunk_size = int(self.__len__() / MPWorkerInfo.num_workers) - with open(self.path) as jsonl_file: - for line_number, raw in enumerate(jsonl_file): - # The dataset is equally distributed among the workers - lower_bound = MPWorkerInfo.worker_id * chunk_size - upper_bound = ( - (MPWorkerInfo.worker_id + 1) * chunk_size - if MPWorkerInfo.worker_id + 1 != MPWorkerInfo.num_workers - else np.inf - ) - if not lower_bound <= line_number < upper_bound: - continue - - span = Span(path=self.path, line=line_number) - try: - yield Line(json.loads(raw), span=span) - except ValueError: - raise GluonTSDataError( - f"Could not read json line {line_number}, {raw}" + # Basic idea is to split the dataset into roughly equally sized segments + # with lower and upper bound, where each worker is assigned one segment + segment_size = int(self.__len__() / MPWorkerInfo.num_workers) + + if not self.cache or (self.cache and not self._data_cache): + with open(self.path) as jsonl_file: + for line_number, raw in enumerate(jsonl_file): + lower_bound = MPWorkerInfo.worker_id * segment_size + upper_bound = ( + (MPWorkerInfo.worker_id + 1) * segment_size + if MPWorkerInfo.worker_id + 1 + != MPWorkerInfo.num_workers + else np.inf ) + if not lower_bound <= line_number < upper_bound: + continue + + span = Span(path=self.path, line=line_number) + try: + parsed_line = Line(json.loads(raw), span=span) + if self.cache: + self._data_cache.append(parsed_line) + yield parsed_line + except ValueError: + raise GluonTSDataError( + f"Could not read json line {line_number}, {raw}" + ) + else: + for i in range(len(self._data_cache)): + yield self._data_cache[i] def __len__(self): if self._len is None: diff --git a/test/dataset/test_loader.py b/test/dataset/test_loader.py index 57ca1b7cfc..607639eb09 100644 --- a/test/dataset/test_loader.py +++ b/test/dataset/test_loader.py @@ -67,6 +67,10 @@ def load_file_dataset(path: Path, freq: str) -> Iterator[Any]: return iter(FileDataset(path, freq)) +def load_file_dataset_cached(path: Path, freq: str) -> Iterator[Any]: + return iter(FileDataset(path, freq, cache=True)) + + def load_file_dataset_numpy(path: Path, freq: str) -> Iterator[Any]: for item in FileDataset(path, freq): item["start"] = pd.Timestamp(item["start"]) @@ -86,7 +90,7 @@ def load_list_dataset(path: Path, freq: str) -> Iterator[Any]: return iter(ListDataset(lines, freq)) -@pytest.mark.skip() +# @pytest.mark.skip() def test_io_speed() -> None: exp_size = 250 act_size = 0 @@ -111,6 +115,7 @@ def test_io_speed() -> None: ("JsonLinesFile", load_json_lines_file, 20000), ("ListDataset", load_list_dataset, 500), ("FileDataset", load_file_dataset, 500), + ("FileDatasetCached", load_file_dataset_cached, 500), ("FileDatasetNumpy", load_file_dataset_numpy, 500), ("ParsedDataset", load_parsed_dataset, 500), ] From 73483b392633e2022c559b2a6815c04723921e14 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Mon, 6 Apr 2020 19:20:46 +0200 Subject: [PATCH 03/20] Refactoring. 
--- src/gluonts/dataset/loader.py | 26 ++- src/gluonts/dataset/parallelized_loader.py | 259 ++++++++++++++------- 2 files changed, 187 insertions(+), 98 deletions(-) diff --git a/src/gluonts/dataset/loader.py b/src/gluonts/dataset/loader.py index 47ad2a5d5a..d4169fefd6 100644 --- a/src/gluonts/dataset/loader.py +++ b/src/gluonts/dataset/loader.py @@ -67,13 +67,14 @@ def __init__( dataset: Dataset, *, transform: Transformation, + cyclic: bool, is_train: bool, batch_size: int, ctx: mx.Context, - dtype: DType = np.float32, - cyclic: bool = False, + dtype: Optional[DType] = np.float32, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, + num_batches_for_shuffling: Optional[int] = None, **kwargs ) -> None: self.batch_size = batch_size @@ -82,6 +83,9 @@ def __init__( self.is_train = is_train self.transform = transform self.cyclic = cyclic + self.num_worker = num_workers + self.num_prefetch = num_prefetch + self.num_batches_for_shuffling = num_batches_for_shuffling self.parallel_data_loader = ParallelDataLoader( dataset=dataset, @@ -89,10 +93,11 @@ def __init__( cyclic=self.cyclic, is_train=self.is_train, batch_size=self.batch_size, - ctx=ctx, + ctx=self.ctx, dtype=self.dtype, - num_workers=num_workers, - num_prefetch=num_prefetch, + num_workers=self.num_worker, + num_prefetch=self.num_prefetch, + num_batches_for_shuffling=self.num_batches_for_shuffling, **kwargs, ) @@ -144,9 +149,9 @@ def __init__( num_batches_per_epoch: int, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, - dtype: DType = np.float32, - shuffle_for_training: bool = True, - num_batches_for_shuffling: int = 10, # TODO: this does not work currently + dtype: Optional[DType] = np.float32, + shuffle_for_training: Optional[bool] = True, + num_batches_for_shuffling: Optional[int] = 8, **kwargs ) -> None: assert dataset, "empty dataset" @@ -162,6 +167,7 @@ def __init__( cyclic=True, num_workers=num_workers, num_prefetch=num_prefetch, + num_batches_for_shuffling=num_batches_for_shuffling, **kwargs, ) @@ -189,7 +195,7 @@ def __init__( ctx: mx.Context, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, - dtype: DType = np.float32, + dtype: Optional[DType] = np.float32, **kwargs ) -> None: super().__init__( @@ -216,7 +222,7 @@ def __init__( ctx: mx.Context, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, - dtype: DType = np.float32, + dtype: Optional[DType] = np.float32, **kwargs ) -> None: super().__init__( diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index a527d914c6..4164c23f4d 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -134,7 +134,7 @@ def stack( ) -def default_batchify_fn( +def _batchify_fn( data: List[dict], dtype: DType, multi_processing: bool, @@ -184,7 +184,11 @@ class _WorkerData: # indicates which cycle the iterator has been reset last iterator_latest_reset_cycle: Optional[int] = 0 # indicates whether the iterator was previously depleted - iterator_exhausted_indicator: Optional[bool] = None + iterator_exhausted_indicator: Optional[bool] = False + # is used to cached transformed_samples in case num_batches_for_shuffling > 1 + iterator_transformed_samples: Optional[Iterable] = None + # tracks how many batches have been retrieved from the + iterator_transformed_samples_counter: Optional[int] = 0 def _worker_initializer( @@ -223,6 +227,7 @@ def _worker_fn( dtype: DType, is_train: bool, shuffle: bool, + num_batches_for_shuffling: 
int, cyclic: bool, cycle_num: int, ): @@ -235,20 +240,57 @@ def _worker_fn( ): _worker_reset_iterator(is_train, cyclic, cycle_num) - assert isinstance( - _WorkerData.dataset_iterator, Iterable - ), f"Dataset not Iterable: {type(_WorkerData.dataset_iterator)}." - transformed_data = list( - itertools.islice(_WorkerData.dataset_iterator, batch_size) - ) - - if shuffle: - random.shuffle(transformed_data) + # retrieve the samples that will be batched + batch_samples = None + if num_batches_for_shuffling == 1: + assert isinstance( + _WorkerData.dataset_iterator, Iterable + ), f"Dataset not Iterable: {type(_WorkerData.dataset_iterator)}." + transformed_samples = list( + itertools.islice(_WorkerData.dataset_iterator, batch_size) + ) + if shuffle: + random.shuffle(transformed_samples) + batch_samples = transformed_samples + elif num_batches_for_shuffling > 1: + # if we haven't yet retrieved batches from the current num_batches_for_shuffling*batch_size samples chunk + if _WorkerData.iterator_transformed_samples_counter == 0: + assert isinstance( + _WorkerData.dataset_iterator, Iterable + ), f"Dataset not Iterable: {type(_WorkerData.dataset_iterator)}." + transformed_samples = list( + itertools.islice( + _WorkerData.dataset_iterator, + batch_size * num_batches_for_shuffling, + ) + ) + random.shuffle(transformed_samples) + _WorkerData.iterator_transformed_samples = iter( + transformed_samples + ) + assert isinstance(_WorkerData.iterator_transformed_samples, Iterable) + batch_samples = list( + itertools.islice( + _WorkerData.iterator_transformed_samples, batch_size + ) + ) + # drive the counter, and reset to 0 if all expected batches have been retrieved + assert isinstance( + _WorkerData.iterator_transformed_samples_counter, int + ) + _WorkerData.iterator_transformed_samples_counter = ( + _WorkerData.iterator_transformed_samples_counter + 1 + ) % batch_size + else: + raise AssertionError( + f"Invalid value for num_batches_for_shuffling encountered: {num_batches_for_shuffling}." 
+ ) - if transformed_data: + # batch the samples, if there were any + if batch_samples: success = True batch = batchify_fn( - data=transformed_data, dtype=dtype, multi_processing=True + data=batch_samples, dtype=dtype, multi_processing=True ) else: # the second time without being able to provide a batch we want to delay calling them again @@ -298,43 +340,44 @@ def __init__( num_workers: int, batch_size: int, shuffle: bool, + num_batches_for_shuffling: int, cyclic: bool, cycle_num: int, num_prefetch: int, - worker_fn: Callable = _worker_fn, - dataset_len: int = None, - timeout: int = 120, + worker_fn: Callable, + dataset_len: int, + timeout: int, ): self._worker_pool = worker_pool self._batchify_fn = batchify_fn self._data_buffer: dict = ( {} - ) # Its a dictionary with {index: data} structure in our case + ) # Its a dictionary with {request_id: data_batch} structure in our case self._rcvd_idx = 0 self._sent_idx = 0 self._worker_fn = worker_fn self._timeout = timeout - self.is_train = is_train - self.dtype = dtype - self.ctx = ctx - self.cyclic = cyclic - self.num_workers = num_workers - self.batch_size = batch_size - self.shuffle = shuffle - self.dataset_len = dataset_len - + self._is_train = is_train + self._dtype = dtype + self._ctx = ctx + self._cyclic = cyclic + self._cycle_num = cycle_num # in case of cyclic=False iterators can be exhausted self._exhausted_iterators: set = set() - - # pre-fetch - self.cycle_num = cycle_num - self.num_prefetch = num_prefetch - for i in range(self.num_prefetch): + self._num_workers = num_workers + self._batch_size = batch_size + self._shuffle = shuffle + self._num_batches_for_shuffling = num_batches_for_shuffling + self._dataset_len = dataset_len + + # pre-fetch batches + self._num_prefetch = num_prefetch + for i in range(self._num_prefetch): self._push_next() def __len__(self): - return self.dataset_len + return self._dataset_len def _push_next(self): """Assign next batch workload to workers.""" @@ -343,13 +386,14 @@ def _push_next(self): async_ret = self._worker_pool.apply_async( self._worker_fn, ( - self.batch_size, + self._batch_size, self._batchify_fn, - self.dtype, - self.is_train, - self.shuffle, - self.cyclic, - self.cycle_num, + self._dtype, + self._is_train, + self._shuffle, + self._num_batches_for_shuffling, + self._cyclic, + self._cycle_num, ), ) self._data_buffer[self._sent_idx] = async_ret @@ -362,29 +406,30 @@ def __next__(self): while not success: try: self._push_next() + if self._rcvd_idx == self._sent_idx: assert ( not self._data_buffer ), "Data buffer should be empty at this moment" raise StopIteration - assert ( self._rcvd_idx < self._sent_idx ), "rcvd_idx must be smaller than sent_idx" assert ( self._rcvd_idx in self._data_buffer ), "fatal error with _push_next, rcvd_idx missing" - ret = self._data_buffer.pop(self._rcvd_idx) + ret = self._data_buffer.pop(self._rcvd_idx) got = ret.get(self._timeout) self._rcvd_idx += 1 - success, dataset_id, batch = pickle.loads(got) + # retrieve the batch from shared memory along with metadata + success, worker_id, batch = pickle.loads(got) # If iterator exhausted/empty if not success: - self._exhausted_iterators.add(dataset_id) - if self.num_workers == len(self._exhausted_iterators): + self._exhausted_iterators.add(worker_id) + if self._num_workers == len(self._exhausted_iterators): # No more batches to be generated return [] else: @@ -392,7 +437,7 @@ def __next__(self): else: # either pin to cpu memory (with ctx=context.cpu_pinned(self.pin_device_id)), # or return with the right context straight 
away - return _as_in_context(batch, self.ctx) + return _as_in_context(batch, self._ctx) except multiprocessing.context.TimeoutError: print( f"Worker timed out after {self._timeout} seconds. This might be caused by " @@ -402,6 +447,7 @@ def __next__(self): ) raise except Exception: + print("An unexpected error occurred in the WorkerIterator.") self._worker_pool.terminate() raise @@ -435,6 +481,10 @@ class ParallelDataLoader(object): The dataset from which to load data. transformation A transformation to apply to each entry in the dataset. + cyclic + Whether the dataset in question should be cycled. + is_train + Whether the dataset in question is used for training. batch_size Size of mini-batch. ctx @@ -443,8 +493,9 @@ class ParallelDataLoader(object): Floating point type to use. shuffle Whether to shuffle the samples. - sampler - The sampler to use. Either specify sampler or shuffle, not both. + num_batches_for_shuffling + The number of batches among which samples are shuffled. So for example if num_batches_for_shuffling = 8 + then the next num_batches_for_shuffling * 8 samples will be shuffled and then batched. num_workers The number of multiprocessing workers to use for data preprocessing. By default 0, in which case no multiprocessing will be utilized. @@ -465,20 +516,33 @@ def __init__( cyclic: bool, is_train: bool, batch_size: int, - shuffle: bool = False, - batchify_fn: Callable = None, - ctx: mx.Context = None, - dtype: DType = np.float32, + ctx: mx.Context, + dtype: Optional[DType] = np.float32, + shuffle: Optional[bool] = False, + num_batches_for_shuffling: Optional[int] = None, num_prefetch: Optional[int] = None, num_workers: Optional[int] = None, ): # Some windows error with the ForkingPickler prevents usage currently: if sys.platform == "win32": logging.warning( - "You have set `num_workers` for to a non zero value, " - "however, currently multiprocessing is not supported on windows." + "You have set `num_workers` to a non zero value, " + "however, currently multiprocessing is not supported on windows and therefore" + "`num_workers will be set to 0." ) num_workers = 0 + assert ( + batch_size > 0 + ), "Batch size has to be a strictly positive integer." + assert ( + num_batches_for_shuffling is None or num_batches_for_shuffling >= 1 + ), "Number of batches for shuffling has to be an integer >= 1." + assert ( + num_workers is None or 0 <= num_workers + ), "Num workers has to be >= 0." + assert ( + num_prefetch is None or num_prefetch >= 0 + ), "Num workers has to be >= 0." self.dataset = dataset self.dataset_len = None @@ -487,37 +551,43 @@ def __init__( self.dataset_len = len(dataset) else: self.dataset_len = len(list(dataset)) + self.transformation = transformation # indicates that we want to cycle through the dataset self.cyclic = cyclic # indicates the current cycle, needed for resetting iterators at each cycle self.cycle_num = 0 - - self.dtype = dtype self.is_train = is_train - self.transformation = transformation - self.ctx = ctx self.batch_size = batch_size - self.shuffle = shuffle + self.ctx = ctx - assert ( - num_workers is None or num_workers <= self.dataset_len - ), "Cannot have more workers than dataset entries currently." 
+ self.dtype = dtype + self.shuffle = shuffle + self.num_batches_for_shuffling = ( + num_batches_for_shuffling + if num_batches_for_shuffling is not None + else 1 + ) # TODO: switch to default multiprocessing.cpu_count() here default_num_workers = 0 - self.num_workers = max( - 0, + self.num_workers = ( num_workers if num_workers is not None - else min(self.dataset_len, default_num_workers), + else min( + self.dataset_len, default_num_workers + ) # cannot have more than dataset entries ) - self.num_prefetch = max( - 0, - num_prefetch if num_prefetch is not None else 2 * self.num_workers, + self.num_prefetch = ( + num_prefetch if num_prefetch is not None else 2 * self.num_workers ) + if self.num_prefetch < self.num_workers: + logging.warning( + "You have set `num_prefetch` to less than `num_workers`, which is counter productive." + "If you want to reduce load, reduce `num_workers`." + ) self.worker_pool = None - # In order to set unique IDs to workers: self.worker_manager = None + # In order to set unique IDs to workers: self.worker_id_queue = None # In order to recycle unused but pre-calculated batches from last epoch for training: self.multi_worker_cache = None @@ -543,11 +613,6 @@ def __init__( ], ) - if batchify_fn is None: - self.batchify_fn = default_batchify_fn - else: - self.batchify_fn = batchify_fn - def __iter__(self): self.cycle_num += 1 if self.num_workers == 0: @@ -558,23 +623,39 @@ def __iter__(self): def same_process_iter(): while True: # take the next batch size elements - sample_batch = list( - itertools.islice(generator, self.batch_size) - ) - - # terminate if no more batches to be dealt with - if len(sample_batch) == 0: - return - - # make them into a single batch - batch = self.batchify_fn( - data=sample_batch, - multi_processing=False, - dtype=self.dtype, - single_process_ctx=self.ctx, + transformed_samples = list( + itertools.islice( + generator, + self.batch_size * self.num_batches_for_shuffling, + ) ) - yield batch + # shuffle data if appropriate and prepare for batching + if self.shuffle: + random.shuffle(transformed_samples) + transformed_samples_iterator = iter(transformed_samples) + + # batch the samples + for i in range(self.num_batches_for_shuffling): + batch_samples = list( + itertools.islice( + transformed_samples_iterator, self.batch_size + ) + ) + + # terminate if no more batches to be dealt with + if len(batch_samples) == 0: + return + + # make them into a single batch + batch = _batchify_fn( + data=batch_samples, + multi_processing=False, + dtype=self.dtype, + single_process_ctx=self.ctx, + ) + + yield batch return same_process_iter() else: @@ -586,7 +667,8 @@ def same_process_iter(): num_workers=self.num_workers, batch_size=self.batch_size, shuffle=self.shuffle, - batchify_fn=self.batchify_fn, + num_batches_for_shuffling=self.num_batches_for_shuffling, + batchify_fn=_batchify_fn, dtype=self.dtype, ctx=self.ctx, is_train=self.is_train, @@ -595,6 +677,7 @@ def same_process_iter(): num_prefetch=self.num_prefetch, dataset_len=self.dataset_len, cycle_num=self.cycle_num, + timeout=120, ) if self.cyclic: self.multi_worker_cache = iter(multi_worker) From b2563c9fd41627508207f357515cbda41d782dc9 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Mon, 6 Apr 2020 20:16:34 +0200 Subject: [PATCH 04/20] Added missing documentation in train loader. 
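For reference, a typical construction of the training loader with the options documented here would look roughly like the sketch below. The dataset content is a toy example and `transformation` is a hypothetical placeholder (any gluonts.transform.Transformation, for instance whatever an estimator's create_transformation() returns); this is a sketch, not the canonical usage.

    import numpy as np
    import mxnet as mx

    from gluonts.dataset.common import ListDataset
    from gluonts.dataset.loader import TrainDataLoader

    dataset = ListDataset(
        [{"start": "2020-01-01", "target": np.random.rand(100)}], freq="H"
    )
    transformation = ...  # hypothetical placeholder: any Transformation

    train_loader = TrainDataLoader(
        dataset=dataset,
        transform=transformation,
        batch_size=32,
        ctx=mx.cpu(),
        num_batches_per_epoch=50,
        num_workers=2,                # two worker processes
        shuffle_for_training=True,
        num_batches_for_shuffling=8,  # shuffle within roughly 8 * 32 samples
    )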
--- src/gluonts/dataset/loader.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/gluonts/dataset/loader.py b/src/gluonts/dataset/loader.py index d4169fefd6..f89aedd5bb 100644 --- a/src/gluonts/dataset/loader.py +++ b/src/gluonts/dataset/loader.py @@ -137,7 +137,12 @@ class TrainDataLoader(DataLoader): multiple worker processes, try reduce `num_workers` in this case. By default it defaults to `num_workers * 2`. dtype - Floating point type to use. + Floating point type to use. Default is np.float32. + shuffle_for_training + Whether to shuffle the samples. + num_batches_for_shuffling + The number of batches among which samples are shuffled. So for example if num_batches_for_shuffling = 8 + then the next num_batches_for_shuffling * 8 samples will be shuffled and then batched. """ def __init__( From 09ecc26fea1d516c72f8004ceafb525f65c8399e Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Mon, 6 Apr 2020 20:54:04 +0200 Subject: [PATCH 05/20] Added more return types. --- src/gluonts/dataset/common.py | 1 + src/gluonts/dataset/loader.py | 4 +- src/gluonts/dataset/parallelized_loader.py | 53 +++++++++++++--------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/src/gluonts/dataset/common.py b/src/gluonts/dataset/common.py index f2f42cb010..a956ce9738 100644 --- a/src/gluonts/dataset/common.py +++ b/src/gluonts/dataset/common.py @@ -42,6 +42,7 @@ # Dictionary used for data flowing through the transformations. DataEntry = Dict[str, Any] +DataBatch = Dict[str, Any] # TODO: change this maybe to typing_extensions.Protocol # A Dataset is an iterable of DataEntry. diff --git a/src/gluonts/dataset/loader.py b/src/gluonts/dataset/loader.py index f89aedd5bb..07312b99cd 100644 --- a/src/gluonts/dataset/loader.py +++ b/src/gluonts/dataset/loader.py @@ -22,12 +22,10 @@ # First-party imports from gluonts.core.component import DType -from gluonts.dataset.common import DataEntry, Dataset +from gluonts.dataset.common import DataEntry, Dataset, DataBatch from gluonts.dataset.parallelized_loader import ParallelDataLoader from gluonts.transform import Transformation -DataBatch = Dict[str, Any] - class DataLoader(Iterable[DataEntry]): """ diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index 4164c23f4d..6ee9d8f1d6 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -23,13 +23,14 @@ import sys import time from collections import Sized -from typing import Callable, Iterable, Optional, List +from multiprocessing.managers import SyncManager +from typing import Callable, Iterable, Optional, List, Iterator import multiprocessing import multiprocessing.queues from multiprocessing.reduction import ForkingPickler from multiprocessing.pool import Pool -from multiprocessing import Queue +from queue import Queue try: import multiprocessing.resource_sharer @@ -44,7 +45,7 @@ # First-party imports from gluonts.core.component import DType -from gluonts.dataset.common import Dataset +from gluonts.dataset.common import Dataset, DataEntry, DataBatch from gluonts.transform import Transformation from gluonts.dataset.util import MPWorkerInfo @@ -139,7 +140,7 @@ def _batchify_fn( dtype: DType, multi_processing: bool, single_process_ctx: Optional[mx.Context] = None, -): +) -> DataBatch: """reduce the list of dictionaries to a single dictionary, where values referenced by identical key are reduced using the stack function""" return { @@ -153,7 +154,7 @@ def _batchify_fn( } -def 
_as_in_context(batch: dict, ctx: mx.Context): +def _as_in_context(batch: dict, ctx: mx.Context) -> DataBatch: """Move data into new context, should only be in main process.""" assert ( not MPWorkerInfo.worker_process @@ -180,13 +181,13 @@ class _WorkerData: # current dataset iterator in form of a transformation applied to the dataset transformation: Optional[Transformation] = None # replicate transformation - dataset_iterator: Optional[Iterable] = None + dataset_iterator: Optional[Iterator[DataEntry]] = None # indicates which cycle the iterator has been reset last iterator_latest_reset_cycle: Optional[int] = 0 # indicates whether the iterator was previously depleted iterator_exhausted_indicator: Optional[bool] = False # is used to cached transformed_samples in case num_batches_for_shuffling > 1 - iterator_transformed_samples: Optional[Iterable] = None + iterator_transformed_samples: Optional[Iterator[DataEntry]] = None # tracks how many batches have been retrieved from the iterator_transformed_samples_counter: Optional[int] = 0 @@ -196,7 +197,7 @@ def _worker_initializer( transformation: Transformation, num_workers: int, worker_id_queue: Queue, -): +) -> None: """Initialier for processing pool.""" _WorkerData.dataset = dataset @@ -212,7 +213,9 @@ def _worker_initializer( ) -def _sequential_sample_generator(dataset, transformation, is_train, cyclic): +def _sequential_sample_generator( + dataset, transformation, is_train, cyclic +) -> Iterator[DataEntry]: while True: for sample in transformation(data_it=dataset, is_train=is_train): yield sample @@ -313,7 +316,7 @@ def _worker_fn( # needed because some iterators are not cyclic def _worker_reset_iterator( is_train: bool, cyclic: bool, cycle_num: int, -): +) -> None: """Initialize or reset iterators of workers.""" _WorkerData.dataset_iterator = _sequential_sample_generator( @@ -379,7 +382,7 @@ def __init__( def __len__(self): return self._dataset_len - def _push_next(self): + def _push_next(self) -> None: """Assign next batch workload to workers.""" # Optimally one would want to task worker that have none depleted iterators, # however, this does not seem to be possible with a worker pool @@ -399,7 +402,7 @@ def _push_next(self): self._data_buffer[self._sent_idx] = async_ret self._sent_idx += 1 - def __next__(self): + def __next__(self) -> DataBatch: # Try to get a batch, sometimes its possible that an iterator was # exhausted and thus we don't get a new batch success = False @@ -431,7 +434,7 @@ def __next__(self): self._exhausted_iterators.add(worker_id) if self._num_workers == len(self._exhausted_iterators): # No more batches to be generated - return [] + return {} else: self._push_next() else: @@ -450,11 +453,12 @@ def __next__(self): print("An unexpected error occurred in the WorkerIterator.") self._worker_pool.terminate() raise + return {} - def __iter__(self): + def __iter__(self) -> Iterator[DataBatch]: while True: next_batch = next(self) - if len(next_batch) == 0: + if not next_batch: return yield next_batch @@ -545,7 +549,7 @@ def __init__( ), "Num workers has to be >= 0." self.dataset = dataset - self.dataset_len = None + self.dataset_len: int if isinstance(dataset, Sized): assert isinstance(dataset, Sized) self.dataset_len = len(dataset) @@ -585,12 +589,12 @@ def __init__( "You have set `num_prefetch` to less than `num_workers`, which is counter productive." "If you want to reduce load, reduce `num_workers`." 
) - self.worker_pool = None - self.worker_manager = None + self.worker_pool: Optional[Pool] = None + self.worker_manager: Optional[SyncManager] = None # In order to set unique IDs to workers: - self.worker_id_queue = None + self.worker_id_queue: Optional[Queue] = None # In order to recycle unused but pre-calculated batches from last epoch for training: - self.multi_worker_cache = None + self.multi_worker_cache: Optional[Iterator[DataBatch]] = None if self.num_workers > 0: # generate unique ids for processes @@ -613,7 +617,7 @@ def __init__( ], ) - def __iter__(self): + def __iter__(self) -> Iterator[DataBatch]: self.cycle_num += 1 if self.num_workers == 0: generator = _sequential_sample_generator( @@ -659,6 +663,11 @@ def same_process_iter(): return same_process_iter() else: + # assertions due to Mypy + assert isinstance(self.worker_pool, Pool) + assert isinstance(self.shuffle, bool) + assert isinstance(self.dtype, DType) + # multi-worker takes care of asynchronously preparing batches # only cache multi_worker for cyclic datasets if self.multi_worker_cache is None: @@ -687,7 +696,7 @@ def same_process_iter(): # (cycle num is irrelevant for cyclic datasets, and rest of the arguments stays same between epochs) return self.multi_worker_cache - def __len__(self): + def __len__(self) -> int: return self.dataset_len def __del__(self): From 6e71dc6c1200c1023782b85bb8fcfa401a4802a9 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Mon, 6 Apr 2020 20:59:21 +0200 Subject: [PATCH 06/20] mend --- src/gluonts/dataset/parallelized_loader.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index 6ee9d8f1d6..dc908804aa 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -521,8 +521,8 @@ def __init__( is_train: bool, batch_size: int, ctx: mx.Context, - dtype: Optional[DType] = np.float32, - shuffle: Optional[bool] = False, + dtype: Optional[DType] = None, + shuffle: Optional[bool] = None, num_batches_for_shuffling: Optional[int] = None, num_prefetch: Optional[int] = None, num_workers: Optional[int] = None, @@ -564,8 +564,8 @@ def __init__( self.batch_size = batch_size self.ctx = ctx - self.dtype = dtype - self.shuffle = shuffle + self.dtype = dtype if dtype is not None else np.float32 + self.shuffle = shuffle if shuffle is not None else False self.num_batches_for_shuffling = ( num_batches_for_shuffling if num_batches_for_shuffling is not None @@ -663,10 +663,8 @@ def same_process_iter(): return same_process_iter() else: - # assertions due to Mypy + # to prevent Mypy complaints assert isinstance(self.worker_pool, Pool) - assert isinstance(self.shuffle, bool) - assert isinstance(self.dtype, DType) # multi-worker takes care of asynchronously preparing batches # only cache multi_worker for cyclic datasets From f98d8a826e6ad9afbabb295ef184dd89b7720bf8 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Tue, 7 Apr 2020 19:14:01 +0200 Subject: [PATCH 07/20] Fixed bug regarding num_batches_for_sampling. 
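The counter that tracks how many batches have been served from the current shuffled chunk has to wrap after num_batches_for_shuffling batches, not after batch_size batches, otherwise the chunk is refilled at the wrong cadence. A small standalone illustration of the intended cadence (made-up numbers, not loader code):

    batch_size, num_batches_for_shuffling = 32, 8

    counter = 0
    refill_steps = []
    for step in range(24):              # serve 24 batches
        if counter == 0:
            refill_steps.append(step)   # a fresh shuffled chunk is drawn here
        # this commit fixes the modulus: previously it was `batch_size`
        counter = (counter + 1) % num_batches_for_shuffling

    print(refill_steps)                 # [0, 8, 16]: one refill every 8 batches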
--- src/gluonts/dataset/jsonl.py | 2 +- src/gluonts/dataset/parallelized_loader.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index 3cf4f36bfb..73eb7b1f0c 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -65,7 +65,7 @@ def __init__(self, path: Path, cache: Optional[bool] = False) -> None: def __iter__(self): # Basic idea is to split the dataset into roughly equally sized segments # with lower and upper bound, where each worker is assigned one segment - segment_size = int(self.__len__() / MPWorkerInfo.num_workers) + segment_size = int(len(self) / MPWorkerInfo.num_workers) if not self.cache or (self.cache and not self._data_cache): with open(self.path) as jsonl_file: diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index dc908804aa..435d1835b9 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -283,7 +283,7 @@ def _worker_fn( ) _WorkerData.iterator_transformed_samples_counter = ( _WorkerData.iterator_transformed_samples_counter + 1 - ) % batch_size + ) % num_batches_for_shuffling else: raise AssertionError( f"Invalid value for num_batches_for_shuffling encountered: {num_batches_for_shuffling}." @@ -474,7 +474,6 @@ def __del__(self): pass -# TODO: think about how a multiprocessing.Manager() would complement this implementation class ParallelDataLoader(object): """ Loads data from a dataset and returns mini-batches of data. From 50bffcd323f468587d9cf04d05303bde3fd9e7d2 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Tue, 7 Apr 2020 20:20:11 +0200 Subject: [PATCH 08/20] Reverting back to modulo based segmentation for code readability. 
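For comparison, the two partitioning schemes used in this series can be sketched side by side (a standalone sketch with made-up helper names, mirroring the modulo rule restored here and the segment-based variant it replaces):

    from typing import List

    def modulo_split(num_rows: int, num_workers: int, worker_id: int) -> List[int]:
        # each worker takes every num_workers-th row, starting at its own id
        return [row for row in range(num_rows) if row % num_workers == worker_id]

    def segment_split(num_rows: int, num_workers: int, worker_id: int) -> List[int]:
        # each worker takes one contiguous segment; the last worker absorbs the remainder
        segment_size = int(num_rows / num_workers)
        lower = worker_id * segment_size
        upper = (
            (worker_id + 1) * segment_size
            if worker_id + 1 != num_workers
            else float("inf")
        )
        return [row for row in range(num_rows) if lower <= row < upper]

    # 10 rows split across 3 workers:
    # modulo_split(10, 3, 0)  -> [0, 3, 6, 9]
    # segment_split(10, 3, 0) -> [0, 1, 2]   (worker 2 absorbs rows 6..9)

Both give every worker a disjoint subset of roughly equal size; the modulo rule is simply easier to read.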
--- src/gluonts/dataset/common.py | 17 +++++------------ src/gluonts/dataset/jsonl.py | 17 +++++------------ 2 files changed, 10 insertions(+), 24 deletions(-) diff --git a/src/gluonts/dataset/common.py b/src/gluonts/dataset/common.py index a956ce9738..ef7b7bc63e 100644 --- a/src/gluonts/dataset/common.py +++ b/src/gluonts/dataset/common.py @@ -272,19 +272,12 @@ def __init__( def __iter__(self) -> Iterator[DataEntry]: source_name = "list_data" - # Basic idea is to split the dataset into roughly equally sized segments - # with lower and upper bound, where each worker is assigned one segment - chunk_size = int(len(self.list_data) / util.MPWorkerInfo.num_workers) - for row_number, data in enumerate(self.list_data): - lower_bound = util.MPWorkerInfo.worker_id * chunk_size - upper_bound = ( - (util.MPWorkerInfo.worker_id + 1) * chunk_size - if util.MPWorkerInfo.worker_id + 1 - != util.MPWorkerInfo.num_workers - else np.inf - ) - if not lower_bound <= row_number < upper_bound: + # Split the dataset into roughly equally sized segments + if ( + row_number % util.MPWorkerInfo.num_workers + != util.MPWorkerInfo.worker_id + ): continue data = self.process(data) diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index 73eb7b1f0c..00a7b76f9c 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -63,21 +63,14 @@ def __init__(self, path: Path, cache: Optional[bool] = False) -> None: self._data_cache: list = [] def __iter__(self): - # Basic idea is to split the dataset into roughly equally sized segments - # with lower and upper bound, where each worker is assigned one segment - segment_size = int(len(self) / MPWorkerInfo.num_workers) - if not self.cache or (self.cache and not self._data_cache): with open(self.path) as jsonl_file: for line_number, raw in enumerate(jsonl_file): - lower_bound = MPWorkerInfo.worker_id * segment_size - upper_bound = ( - (MPWorkerInfo.worker_id + 1) * segment_size - if MPWorkerInfo.worker_id + 1 - != MPWorkerInfo.num_workers - else np.inf - ) - if not lower_bound <= line_number < upper_bound: + # Split the dataset into roughly equally sized segments + if ( + line_number % MPWorkerInfo.num_workers + != MPWorkerInfo.worker_id + ): continue span = Span(path=self.path, line=line_number) From 4c38ef989c82eeabce5fbe7a5ffbcfa882b47936 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Tue, 7 Apr 2020 22:01:54 +0200 Subject: [PATCH 09/20] Massively simplified worker_fn due to simplified logic of num_batches_for_schuffling --- src/gluonts/dataset/common.py | 17 ++- src/gluonts/dataset/jsonl.py | 17 ++- src/gluonts/dataset/parallelized_loader.py | 145 +++++++++----------- test/dataset/test_multiprocessing_loader.py | 3 + 4 files changed, 90 insertions(+), 92 deletions(-) diff --git a/src/gluonts/dataset/common.py b/src/gluonts/dataset/common.py index ef7b7bc63e..66bc52633f 100644 --- a/src/gluonts/dataset/common.py +++ b/src/gluonts/dataset/common.py @@ -272,12 +272,19 @@ def __init__( def __iter__(self) -> Iterator[DataEntry]: source_name = "list_data" + # Basic idea is to split the dataset into roughly equally sized segments + # with lower and upper bound, where each worker is assigned one segment + segment_size = int(len(self) / util.MPWorkerInfo.num_workers) + for row_number, data in enumerate(self.list_data): - # Split the dataset into roughly equally sized segments - if ( - row_number % util.MPWorkerInfo.num_workers - != util.MPWorkerInfo.worker_id - ): + lower_bound = util.MPWorkerInfo.worker_id * segment_size + upper_bound = ( + 
(util.MPWorkerInfo.worker_id + 1) * segment_size + if util.MPWorkerInfo.worker_id + 1 + != util.MPWorkerInfo.num_workers + else np.inf + ) + if not lower_bound <= row_number < upper_bound: continue data = self.process(data) diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index 00a7b76f9c..73eb7b1f0c 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -63,14 +63,21 @@ def __init__(self, path: Path, cache: Optional[bool] = False) -> None: self._data_cache: list = [] def __iter__(self): + # Basic idea is to split the dataset into roughly equally sized segments + # with lower and upper bound, where each worker is assigned one segment + segment_size = int(len(self) / MPWorkerInfo.num_workers) + if not self.cache or (self.cache and not self._data_cache): with open(self.path) as jsonl_file: for line_number, raw in enumerate(jsonl_file): - # Split the dataset into roughly equally sized segments - if ( - line_number % MPWorkerInfo.num_workers - != MPWorkerInfo.worker_id - ): + lower_bound = MPWorkerInfo.worker_id * segment_size + upper_bound = ( + (MPWorkerInfo.worker_id + 1) * segment_size + if MPWorkerInfo.worker_id + 1 + != MPWorkerInfo.num_workers + else np.inf + ) + if not lower_bound <= line_number < upper_bound: continue span = Span(path=self.path, line=line_number) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index 435d1835b9..b9316746ed 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -186,10 +186,6 @@ class _WorkerData: iterator_latest_reset_cycle: Optional[int] = 0 # indicates whether the iterator was previously depleted iterator_exhausted_indicator: Optional[bool] = False - # is used to cached transformed_samples in case num_batches_for_shuffling > 1 - iterator_transformed_samples: Optional[Iterator[DataEntry]] = None - # tracks how many batches have been retrieved from the - iterator_transformed_samples_counter: Optional[int] = 0 def _worker_initializer( @@ -214,10 +210,33 @@ def _worker_initializer( def _sequential_sample_generator( - dataset, transformation, is_train, cyclic + dataset: Dataset, + transformation: Transformation, + is_train: bool, + cyclic: bool, + num_batches_for_shuffling: int, ) -> Iterator[DataEntry]: + # Approximate shuffling `num_batches_for_shuffling` probabilistically + def skip_data_iter(data: Dataset): + for data_entry in data: + if ( + random.randint(1, num_batches_for_shuffling) + == num_batches_for_shuffling + ): + yield data_entry + + # sanity check + assert ( + cyclic or num_batches_for_shuffling == 1 + ), "num_batches_for_shuffling only makes sense in the context of cyclic datasets" + while True: - for sample in transformation(data_it=dataset, is_train=is_train): + for sample in transformation( + data_it=skip_data_iter(dataset) + if num_batches_for_shuffling > 1 + else dataset, + is_train=is_train, + ): yield sample # Dont cycle if not training time if not cyclic: @@ -241,53 +260,19 @@ def _worker_fn( if (_WorkerData.iterator_latest_reset_cycle < cycle_num) and ( _WorkerData.iterator_latest_reset_cycle == 0 or not cyclic ): - _worker_reset_iterator(is_train, cyclic, cycle_num) + _worker_reset_iterator( + is_train, cyclic, cycle_num, num_batches_for_shuffling + ) # retrieve the samples that will be batched - batch_samples = None - if num_batches_for_shuffling == 1: - assert isinstance( - _WorkerData.dataset_iterator, Iterable - ), f"Dataset not Iterable: 
{type(_WorkerData.dataset_iterator)}." - transformed_samples = list( - itertools.islice(_WorkerData.dataset_iterator, batch_size) - ) - if shuffle: - random.shuffle(transformed_samples) - batch_samples = transformed_samples - elif num_batches_for_shuffling > 1: - # if we haven't yet retrieved batches from the current num_batches_for_shuffling*batch_size samples chunk - if _WorkerData.iterator_transformed_samples_counter == 0: - assert isinstance( - _WorkerData.dataset_iterator, Iterable - ), f"Dataset not Iterable: {type(_WorkerData.dataset_iterator)}." - transformed_samples = list( - itertools.islice( - _WorkerData.dataset_iterator, - batch_size * num_batches_for_shuffling, - ) - ) - random.shuffle(transformed_samples) - _WorkerData.iterator_transformed_samples = iter( - transformed_samples - ) - assert isinstance(_WorkerData.iterator_transformed_samples, Iterable) - batch_samples = list( - itertools.islice( - _WorkerData.iterator_transformed_samples, batch_size - ) - ) - # drive the counter, and reset to 0 if all expected batches have been retrieved - assert isinstance( - _WorkerData.iterator_transformed_samples_counter, int - ) - _WorkerData.iterator_transformed_samples_counter = ( - _WorkerData.iterator_transformed_samples_counter + 1 - ) % num_batches_for_shuffling - else: - raise AssertionError( - f"Invalid value for num_batches_for_shuffling encountered: {num_batches_for_shuffling}." - ) + assert isinstance( + _WorkerData.dataset_iterator, Iterable + ), f"Dataset not Iterable: {type(_WorkerData.dataset_iterator)}." + batch_samples = list( + itertools.islice(_WorkerData.dataset_iterator, batch_size) + ) + if shuffle: + random.shuffle(batch_samples) # batch the samples, if there were any if batch_samples: @@ -315,7 +300,10 @@ def _worker_fn( # needed because some iterators are not cyclic def _worker_reset_iterator( - is_train: bool, cyclic: bool, cycle_num: int, + is_train: bool, + cyclic: bool, + cycle_num: int, + num_batches_for_shuffling: int, ) -> None: """Initialize or reset iterators of workers.""" @@ -324,6 +312,7 @@ def _worker_reset_iterator( transformation=_WorkerData.transformation, is_train=is_train, cyclic=cyclic, + num_batches_for_shuffling=num_batches_for_shuffling, ) assert isinstance(_WorkerData.iterator_latest_reset_cycle, int) _WorkerData.iterator_latest_reset_cycle = cycle_num @@ -620,45 +609,37 @@ def __iter__(self) -> Iterator[DataBatch]: self.cycle_num += 1 if self.num_workers == 0: generator = _sequential_sample_generator( - self.dataset, self.transformation, self.is_train, self.cyclic + self.dataset, + self.transformation, + self.is_train, + self.cyclic, + self.num_batches_for_shuffling, ) def same_process_iter(): while True: # take the next batch size elements - transformed_samples = list( - itertools.islice( - generator, - self.batch_size * self.num_batches_for_shuffling, - ) + batch_samples = list( + itertools.islice(generator, self.batch_size,) ) # shuffle data if appropriate and prepare for batching if self.shuffle: - random.shuffle(transformed_samples) - transformed_samples_iterator = iter(transformed_samples) - - # batch the samples - for i in range(self.num_batches_for_shuffling): - batch_samples = list( - itertools.islice( - transformed_samples_iterator, self.batch_size - ) - ) - - # terminate if no more batches to be dealt with - if len(batch_samples) == 0: - return - - # make them into a single batch - batch = _batchify_fn( - data=batch_samples, - multi_processing=False, - dtype=self.dtype, - single_process_ctx=self.ctx, - ) - - yield batch + 
random.shuffle(batch_samples) + + # terminate if no more batches to be dealt with + if len(batch_samples) == 0: + return + + # make them into a single batch + batch = _batchify_fn( + data=batch_samples, + multi_processing=False, + dtype=self.dtype, + single_process_ctx=self.ctx, + ) + + yield batch return same_process_iter() else: diff --git a/test/dataset/test_multiprocessing_loader.py b/test/dataset/test_multiprocessing_loader.py index d4f9894bf7..ce1725fd68 100644 --- a/test/dataset/test_multiprocessing_loader.py +++ b/test/dataset/test_multiprocessing_loader.py @@ -271,6 +271,7 @@ def test_training_loader_soft_constraint_01() -> None: num_workers=NUM_WORKERS_MP, # This is the crucial difference ctx=current_context(), num_batches_per_epoch=int(3 * exp_num_batches), + num_batches_for_shuffling=1, ) # give all the workers a little time to get ready, so they can start at the same time @@ -312,6 +313,7 @@ def test_training_loader_soft_constraint_02() -> None: num_workers=NUM_WORKERS_MP, # This is the crucial difference ctx=current_context(), num_batches_per_epoch=int(0.5 * exp_num_batches), + num_batches_for_shuffling=1, ) # multi-processed validation dataset @@ -348,6 +350,7 @@ def test_training_loader_soft_constraint_03() -> None: num_workers=1, # This is the crucial difference ctx=current_context(), num_batches_per_epoch=int(3 * exp_num_batches), + num_batches_for_shuffling=1, ) # multi-processed validation dataset From d75202eb1a95e81d769554c27ef574b03bf4b679 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Tue, 7 Apr 2020 22:06:03 +0200 Subject: [PATCH 10/20] mend --- src/gluonts/dataset/parallelized_loader.py | 120 ++++++++++----------- 1 file changed, 58 insertions(+), 62 deletions(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index b9316746ed..e943c70508 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -172,43 +172,6 @@ def _as_in_context(batch: dict, ctx: mx.Context) -> DataBatch: return batch -# Each process has its own copy, so other processes can't interfere -class _WorkerData: - """Contain the current data that the worker is using.""" - - # dataset replica - dataset: Optional[Dataset] = None - # current dataset iterator in form of a transformation applied to the dataset - transformation: Optional[Transformation] = None - # replicate transformation - dataset_iterator: Optional[Iterator[DataEntry]] = None - # indicates which cycle the iterator has been reset last - iterator_latest_reset_cycle: Optional[int] = 0 - # indicates whether the iterator was previously depleted - iterator_exhausted_indicator: Optional[bool] = False - - -def _worker_initializer( - dataset: Dataset, - transformation: Transformation, - num_workers: int, - worker_id_queue: Queue, -) -> None: - """Initialier for processing pool.""" - - _WorkerData.dataset = dataset - _WorkerData.transformation = transformation - - # get unique worker id - worker_id = int(worker_id_queue.get()) - multiprocessing.current_process().name = f"worker_{worker_id}" - - # propagate worker information - MPWorkerInfo.set_worker_info( - num_workers=num_workers, worker_id=worker_id, worker_process=True - ) - - def _sequential_sample_generator( dataset: Dataset, transformation: Transformation, @@ -243,6 +206,64 @@ def skip_data_iter(data: Dataset): return +# Each process has its own copy, so other processes can't interfere +class _WorkerData: + """Contain the current data that the worker is using.""" + + # dataset replica + 
dataset: Dataset + # current dataset iterator in form of a transformation applied to the dataset + transformation: Transformation + # replicate transformation + dataset_iterator: Iterator[DataEntry] + # indicates which cycle the iterator has been reset last + iterator_latest_reset_cycle: int = 0 + # indicates whether the iterator was previously depleted + iterator_exhausted_indicator: bool = False + + +# needed because some iterators are not cyclic +def _worker_reset_iterator( + is_train: bool, + cyclic: bool, + cycle_num: int, + num_batches_for_shuffling: int, +) -> None: + """Initialize or reset iterators of workers.""" + + _WorkerData.dataset_iterator = _sequential_sample_generator( + dataset=_WorkerData.dataset, + transformation=_WorkerData.transformation, + is_train=is_train, + cyclic=cyclic, + num_batches_for_shuffling=num_batches_for_shuffling, + ) + + _WorkerData.iterator_latest_reset_cycle = cycle_num + _WorkerData.iterator_exhausted_indicator = False + + +def _worker_initializer( + dataset: Dataset, + transformation: Transformation, + num_workers: int, + worker_id_queue: Queue, +) -> None: + """Initialier for processing pool.""" + + _WorkerData.dataset = dataset + _WorkerData.transformation = transformation + + # get unique worker id + worker_id = int(worker_id_queue.get()) + multiprocessing.current_process().name = f"worker_{worker_id}" + + # propagate worker information + MPWorkerInfo.set_worker_info( + num_workers=num_workers, worker_id=worker_id, worker_process=True + ) + + def _worker_fn( batch_size: int, batchify_fn: Callable, @@ -256,7 +277,6 @@ def _worker_fn( """Function for processing data in worker process.""" # initialize, or reset the iterator at each cycle - assert isinstance(_WorkerData.iterator_latest_reset_cycle, int) if (_WorkerData.iterator_latest_reset_cycle < cycle_num) and ( _WorkerData.iterator_latest_reset_cycle == 0 or not cyclic ): @@ -265,9 +285,6 @@ def _worker_fn( ) # retrieve the samples that will be batched - assert isinstance( - _WorkerData.dataset_iterator, Iterable - ), f"Dataset not Iterable: {type(_WorkerData.dataset_iterator)}." batch_samples = list( itertools.islice(_WorkerData.dataset_iterator, batch_size) ) @@ -298,27 +315,6 @@ def _worker_fn( return buf.getvalue() -# needed because some iterators are not cyclic -def _worker_reset_iterator( - is_train: bool, - cyclic: bool, - cycle_num: int, - num_batches_for_shuffling: int, -) -> None: - """Initialize or reset iterators of workers.""" - - _WorkerData.dataset_iterator = _sequential_sample_generator( - dataset=_WorkerData.dataset, - transformation=_WorkerData.transformation, - is_train=is_train, - cyclic=cyclic, - num_batches_for_shuffling=num_batches_for_shuffling, - ) - assert isinstance(_WorkerData.iterator_latest_reset_cycle, int) - _WorkerData.iterator_latest_reset_cycle = cycle_num - _WorkerData.iterator_exhausted_indicator = False - - class _MultiWorkerIter(object): """Internal multi-worker iterator for DataLoader.""" From e50813b54f047dcadb710b4b992a9fb47a85f55a Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Tue, 7 Apr 2020 22:49:06 +0200 Subject: [PATCH 11/20] User warning in case of mp but not caching. 
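The warning points at a setting the user controls: when a FileDataset is iterated from several worker processes, every epoch re-reads and re-parses the JSON Lines files unless caching is enabled. Roughly (a sketch; the path and frequency below are made up):

    from pathlib import Path

    from gluonts.dataset.common import FileDataset

    dataset = FileDataset(
        path=Path("datasets/my_dataset/train"),  # hypothetical path
        freq="H",
        cache=True,  # keep parsed json lines in memory after the first pass
    )

With cache left at its default of False and num_workers > 0, the loader now emits the warning added in this commit instead.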
--- src/gluonts/dataset/parallelized_loader.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index e943c70508..ab4fe56aff 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -45,7 +45,7 @@ # First-party imports from gluonts.core.component import DType -from gluonts.dataset.common import Dataset, DataEntry, DataBatch +from gluonts.dataset.common import Dataset, DataEntry, DataBatch, FileDataset from gluonts.transform import Transformation from gluonts.dataset.util import MPWorkerInfo @@ -519,6 +519,15 @@ def __init__( "`num_workers will be set to 0." ) num_workers = 0 + # Make the user aware of ways to improve training performance: + if num_workers is not None and num_workers > 0: + if isinstance(dataset, FileDataset): + if not dataset.cache: + logging.warning( + "You have set `num_workers` to a non zero value, " + "however, you have not enabled caching for your FileDataset. " + "To improve training performance you can enable caching for the FileDataset. " + ) assert ( batch_size > 0 ), "Batch size has to be a strictly positive integer." From 9ba3b3ae3e218b622f2b42e9dad99cd22f917b6a Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Thu, 16 Apr 2020 10:46:19 +0200 Subject: [PATCH 12/20] Minor refactoring. --- src/gluonts/dataset/common.py | 2 +- src/gluonts/dataset/loader.py | 12 ++++++------ src/gluonts/dataset/parallelized_loader.py | 13 +++++++------ 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/gluonts/dataset/common.py b/src/gluonts/dataset/common.py index 66bc52633f..4396eaf0e2 100644 --- a/src/gluonts/dataset/common.py +++ b/src/gluonts/dataset/common.py @@ -195,7 +195,7 @@ def __init__( path: Path, freq: str, one_dim_target: bool = True, - cache: Optional[bool] = False, + cache: bool = False, ) -> None: self.cache = cache self.path = path diff --git a/src/gluonts/dataset/loader.py b/src/gluonts/dataset/loader.py index 07312b99cd..8022740648 100644 --- a/src/gluonts/dataset/loader.py +++ b/src/gluonts/dataset/loader.py @@ -69,7 +69,7 @@ def __init__( is_train: bool, batch_size: int, ctx: mx.Context, - dtype: Optional[DType] = np.float32, + dtype: DType = np.float32, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, num_batches_for_shuffling: Optional[int] = None, @@ -152,9 +152,9 @@ def __init__( num_batches_per_epoch: int, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, - dtype: Optional[DType] = np.float32, - shuffle_for_training: Optional[bool] = True, - num_batches_for_shuffling: Optional[int] = 8, + dtype: DType = np.float32, + shuffle_for_training: bool = True, + num_batches_for_shuffling: int = 8, **kwargs ) -> None: assert dataset, "empty dataset" @@ -198,7 +198,7 @@ def __init__( ctx: mx.Context, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, - dtype: Optional[DType] = np.float32, + dtype: DType = np.float32, **kwargs ) -> None: super().__init__( @@ -225,7 +225,7 @@ def __init__( ctx: mx.Context, num_workers: Optional[int] = None, num_prefetch: Optional[int] = None, - dtype: Optional[DType] = np.float32, + dtype: DType = np.float32, **kwargs ) -> None: super().__init__( diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index ab4fe56aff..3065aef025 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -188,10 
+188,11 @@ def skip_data_iter(data: Dataset): ): yield data_entry - # sanity check + # sanity check: since `num_batches_for_shuffling` works by skipping + # entries, this should not be used for non cyclic datasets assert ( cyclic or num_batches_for_shuffling == 1 - ), "num_batches_for_shuffling only makes sense in the context of cyclic datasets" + ), "Setting num_batches_for_shuffling >= 1 only makes sense in the context of cyclic datasets currently." while True: for sample in transformation( @@ -505,8 +506,8 @@ def __init__( is_train: bool, batch_size: int, ctx: mx.Context, - dtype: Optional[DType] = None, - shuffle: Optional[bool] = None, + dtype: DType = np.float32, + shuffle: bool = False, num_batches_for_shuffling: Optional[int] = None, num_prefetch: Optional[int] = None, num_workers: Optional[int] = None, @@ -557,8 +558,8 @@ def __init__( self.batch_size = batch_size self.ctx = ctx - self.dtype = dtype if dtype is not None else np.float32 - self.shuffle = shuffle if shuffle is not None else False + self.dtype = dtype + self.shuffle = shuffle self.num_batches_for_shuffling = ( num_batches_for_shuffling if num_batches_for_shuffling is not None From f717eebf103f5948f160b6d6007735c2e81e1b41 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Thu, 16 Apr 2020 15:42:33 +0200 Subject: [PATCH 13/20] Smaller reformatting. --- src/gluonts/dataset/jsonl.py | 4 ++-- src/gluonts/dataset/loader.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index 73eb7b1f0c..da931f5da6 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -14,7 +14,7 @@ # Standard library imports import functools from pathlib import Path -from typing import NamedTuple, Optional +from typing import NamedTuple # Third-party imports import ujson as json @@ -56,7 +56,7 @@ class JsonLinesFile: JSON Lines file. """ - def __init__(self, path: Path, cache: Optional[bool] = False) -> None: + def __init__(self, path: Path, cache: bool = False) -> None: self.path = path self.cache = cache self._len = None diff --git a/src/gluonts/dataset/loader.py b/src/gluonts/dataset/loader.py index 8022740648..b96fe3fd0f 100644 --- a/src/gluonts/dataset/loader.py +++ b/src/gluonts/dataset/loader.py @@ -81,7 +81,7 @@ def __init__( self.is_train = is_train self.transform = transform self.cyclic = cyclic - self.num_worker = num_workers + self.num_workers = num_workers self.num_prefetch = num_prefetch self.num_batches_for_shuffling = num_batches_for_shuffling @@ -93,7 +93,7 @@ def __init__( batch_size=self.batch_size, ctx=self.ctx, dtype=self.dtype, - num_workers=self.num_worker, + num_workers=self.num_workers, num_prefetch=self.num_prefetch, num_batches_for_shuffling=self.num_batches_for_shuffling, **kwargs, From f5839a897273893d7e7c490e3b04d5a6e53d6cbd Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Thu, 16 Apr 2020 15:58:27 +0200 Subject: [PATCH 14/20] Updated doc. --- src/gluonts/dataset/loader.py | 4 ++-- src/gluonts/dataset/parallelized_loader.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/gluonts/dataset/loader.py b/src/gluonts/dataset/loader.py index b96fe3fd0f..0b749bdcc5 100644 --- a/src/gluonts/dataset/loader.py +++ b/src/gluonts/dataset/loader.py @@ -139,8 +139,8 @@ class TrainDataLoader(DataLoader): shuffle_for_training Whether to shuffle the samples. num_batches_for_shuffling - The number of batches among which samples are shuffled. 
So for example if num_batches_for_shuffling = 8 - then the next num_batches_for_shuffling * 8 samples will be shuffled and then batched. + The effective number of batches among which samples are shuffled. If num_batches_for_shuffling = 8 and + batch_size = 8 then the next batch will be randomly sampled from about 64 samples. """ def __init__( diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index 7e732eb7a0..fb45f2cbb6 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -479,8 +479,8 @@ class ParallelDataLoader(object): shuffle Whether to shuffle the samples. num_batches_for_shuffling - The number of batches among which samples are shuffled. So for example if num_batches_for_shuffling = 8 - then the next num_batches_for_shuffling * 8 samples will be shuffled and then batched. + The effective number of batches among which samples are shuffled. If num_batches_for_shuffling = 8 and + batch_size = 8 then the next batch will be randomly sampled from about 64 samples. num_workers The number of multiprocessing workers to use for data preprocessing. By default 0, in which case no multiprocessing will be utilized. From 957113d84b8307d4b80ac961256d22b73d3d025a Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Thu, 16 Apr 2020 17:31:05 +0200 Subject: [PATCH 15/20] Simplified segmenting, test fix. --- src/gluonts/dataset/common.py | 7 ++----- src/gluonts/dataset/jsonl.py | 9 ++++----- test/dataset/test_loader.py | 5 +++-- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/gluonts/dataset/common.py b/src/gluonts/dataset/common.py index 4396eaf0e2..19a6c855d9 100644 --- a/src/gluonts/dataset/common.py +++ b/src/gluonts/dataset/common.py @@ -278,11 +278,8 @@ def __iter__(self) -> Iterator[DataEntry]: for row_number, data in enumerate(self.list_data): lower_bound = util.MPWorkerInfo.worker_id * segment_size - upper_bound = ( - (util.MPWorkerInfo.worker_id + 1) * segment_size - if util.MPWorkerInfo.worker_id + 1 - != util.MPWorkerInfo.num_workers - else np.inf + upper_bound = min( + len(self), (util.MPWorkerInfo.worker_id + 1) * segment_size ) if not lower_bound <= row_number < upper_bound: continue diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index da931f5da6..45a78693ee 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -67,15 +67,14 @@ def __iter__(self): # with lower and upper bound, where each worker is assigned one segment segment_size = int(len(self) / MPWorkerInfo.num_workers) + print("dataset_len:, ", len(self)) + if not self.cache or (self.cache and not self._data_cache): with open(self.path) as jsonl_file: for line_number, raw in enumerate(jsonl_file): lower_bound = MPWorkerInfo.worker_id * segment_size - upper_bound = ( - (MPWorkerInfo.worker_id + 1) * segment_size - if MPWorkerInfo.worker_id + 1 - != MPWorkerInfo.num_workers - else np.inf + upper_bound = min( + len(self), (MPWorkerInfo.worker_id + 1) * segment_size ) if not lower_bound <= line_number < upper_bound: continue diff --git a/test/dataset/test_loader.py b/test/dataset/test_loader.py index 607639eb09..a34cf146a7 100644 --- a/test/dataset/test_loader.py +++ b/test/dataset/test_loader.py @@ -160,8 +160,9 @@ def test_loader_multivariate() -> None: tmp_path = Path(tmp_folder) lines = [ - """{"start": "2014-09-07", "target": [[1, 2, 3]]}""", - """{"start": "2014-09-07", "target": [[-1, -2, 3], [2, 4, 81]]}""", + """{"start": "2014-09-07", "target": [[1, 2, 
3]]} + {"start": "2014-09-07", "target": [[-1, -2, 3], [2, 4, 81]]} + """, ] with open(tmp_path / "dataset.json", "w") as f: f.write("\n".join(lines)) From 23d09db035df4bdca414693c2b96963b9b9d2d0d Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Thu, 16 Apr 2020 17:41:16 +0200 Subject: [PATCH 16/20] Yield from improvement. --- src/gluonts/dataset/jsonl.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index 45a78693ee..e60219132a 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -90,8 +90,7 @@ def __iter__(self): f"Could not read json line {line_number}, {raw}" ) else: - for i in range(len(self._data_cache)): - yield self._data_cache[i] + yield from self._data_cache def __len__(self): if self._len is None: From 33ef3e4cbcc613df4282ad0a5bcbbf5ee0929125 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Thu, 16 Apr 2020 19:04:48 +0200 Subject: [PATCH 17/20] Dataset Coverage Test Explicit. --- src/gluonts/dataset/common.py | 7 +++++-- src/gluonts/dataset/jsonl.py | 7 +++++-- test/dataset/test_common.py | 2 +- test/dataset/test_multiprocessing_loader.py | 9 ++++++++- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/gluonts/dataset/common.py b/src/gluonts/dataset/common.py index 19a6c855d9..e86452d38e 100644 --- a/src/gluonts/dataset/common.py +++ b/src/gluonts/dataset/common.py @@ -278,8 +278,11 @@ def __iter__(self) -> Iterator[DataEntry]: for row_number, data in enumerate(self.list_data): lower_bound = util.MPWorkerInfo.worker_id * segment_size - upper_bound = min( - len(self), (util.MPWorkerInfo.worker_id + 1) * segment_size + upper_bound = ( + (util.MPWorkerInfo.worker_id + 1) * segment_size + if util.MPWorkerInfo.worker_id + 1 + != util.MPWorkerInfo.num_workers + else len(self) ) if not lower_bound <= row_number < upper_bound: continue diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index e60219132a..a2858ad102 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -73,8 +73,11 @@ def __iter__(self): with open(self.path) as jsonl_file: for line_number, raw in enumerate(jsonl_file): lower_bound = MPWorkerInfo.worker_id * segment_size - upper_bound = min( - len(self), (MPWorkerInfo.worker_id + 1) * segment_size + upper_bound = ( + (MPWorkerInfo.worker_id + 1) * segment_size + if MPWorkerInfo.worker_id + 1 + != MPWorkerInfo.num_workers + else len(self) ) if not lower_bound <= line_number < upper_bound: continue diff --git a/test/dataset/test_common.py b/test/dataset/test_common.py index 9d5f35dce6..9e2f609510 100644 --- a/test/dataset/test_common.py +++ b/test/dataset/test_common.py @@ -14,7 +14,7 @@ import pytest import pandas as pd -from gluonts.dataset.common import ProcessStartField +from gluonts.dataset.common import ProcessStartField, ListDataset @pytest.mark.parametrize( diff --git a/test/dataset/test_multiprocessing_loader.py b/test/dataset/test_multiprocessing_loader.py index ce1725fd68..083ba4aa0b 100644 --- a/test/dataset/test_multiprocessing_loader.py +++ b/test/dataset/test_multiprocessing_loader.py @@ -48,9 +48,12 @@ CONTEXT_LEN = 7 SPLITTING_SAMPLE_PROBABILITY = 1 # crucial for the ValidationDataLoader test CD_NUM_STEPS = 14 -CD_NUM_TIME_SERIES = 50 # too small and batch test might fail +CD_NUM_TIME_SERIES = 47 # too small and batch test might fail CD_MAX_LEN_MULTIPLICATION_FACTOR = 3 +# NEEDED FOR SEGMENTATION COVERAGE TEST: +assert CD_NUM_TIME_SERIES % NUM_WORKERS_MP != 0 + # CACHED DATA 
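The switch away from the min(len(self), ...) bound, together with picking CD_NUM_TIME_SERIES so that it is not divisible by NUM_WORKERS_MP, is about coverage: with the min() bound the remainder rows after the last full segment are assigned to no worker at all, which is exactly what the new assertion is meant to catch. A small illustrative check of that property (the helper names below are ad hoc, not part of the test suite):

def segment_bounds(worker_id: int, num_workers: int, n: int):
    # mirrors the iterator logic: contiguous segments, last worker takes the rest
    segment = n // num_workers
    lower = worker_id * segment
    upper = (worker_id + 1) * segment if worker_id + 1 != num_workers else n
    return lower, upper


def covered_rows(num_workers: int, n: int):
    covered = set()
    for w in range(num_workers):
        lower, upper = segment_bounds(w, num_workers, n)
        covered.update(range(lower, upper))
    return covered


# 47 series over 4 workers: segment size 11, last worker gets rows 33..46
assert covered_rows(4, 47) == set(range(47))

# a min()-based upper bound would have capped the last worker at 44, silently
# dropping rows 44..46 whenever the dataset size is not divisible
assert min(47, 4 * (47 // 4)) == 44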
_data_cache = None @@ -162,6 +165,10 @@ def test_validation_loader_equivalence() -> None: # ASSERTIONS: + assert len(list_dataset.list_data) == len( + get_transformation_counts(mp_val_data_loader_result_01) + ), "The dataloaders do not cover the whole dataset. Check that each time series was assigned at least one worker." + assert get_transformation_counts( mp_val_data_loader_result_01 ) == get_transformation_counts( From 314cdbe176931533a9de32e6883f787fe7503908 Mon Sep 17 00:00:00 2001 From: Aaron Spieler <25365592+AaronSpieler@users.noreply.github.com> Date: Thu, 16 Apr 2020 22:09:46 +0200 Subject: [PATCH 18/20] removed print --- src/gluonts/dataset/jsonl.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/gluonts/dataset/jsonl.py b/src/gluonts/dataset/jsonl.py index a2858ad102..b2479dccb9 100644 --- a/src/gluonts/dataset/jsonl.py +++ b/src/gluonts/dataset/jsonl.py @@ -67,8 +67,6 @@ def __iter__(self): # with lower and upper bound, where each worker is assigned one segment segment_size = int(len(self) / MPWorkerInfo.num_workers) - print("dataset_len:, ", len(self)) - if not self.cache or (self.cache and not self._data_cache): with open(self.path) as jsonl_file: for line_number, raw in enumerate(jsonl_file): From 5e21cfc5ec20b72ebddc464dbf4aacd29d33c3db Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Thu, 16 Apr 2020 22:32:36 +0200 Subject: [PATCH 19/20] Removed unused import. --- src/gluonts/dataset/parallelized_loader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index fb45f2cbb6..d51d464b97 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -39,7 +39,6 @@ # Third-party imports import numpy as np -import pandas as pd from mxnet import nd, context import mxnet as mx From e6582e72055ee0919c7c965a7aac64d96ec81bd3 Mon Sep 17 00:00:00 2001 From: Aaron Spieler Date: Fri, 17 Apr 2020 11:36:58 +0200 Subject: [PATCH 20/20] Disabling windows mp evaluation, lowering required JSonLine throughput. 
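The guard added to the evaluator below follows a generic pattern: use a multiprocessing pool only when workers were requested and the platform is not Windows, otherwise fall back to sequential evaluation. The commit does not spell out the Windows failure mode, so this sketch only mirrors the shape of the guard; run_evaluation, evaluate_chunk and square are placeholder names, not the Evaluator API.

import multiprocessing
import sys


def square(x):
    # stand-in for the per-series metric computation
    return x * x


def run_evaluation(chunks, evaluate_chunk, num_workers: int):
    if num_workers > 0 and sys.platform != "win32":
        with multiprocessing.Pool(processes=num_workers) as pool:
            return pool.map(evaluate_chunk, chunks)
    # sequential fallback, also taken when num_workers == 0
    return [evaluate_chunk(chunk) for chunk in chunks]


if __name__ == "__main__":
    print(run_evaluation([1, 2, 3], square, num_workers=2))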
--- src/gluonts/dataset/parallelized_loader.py | 2 +- src/gluonts/evaluation/_base.py | 4 +++- test/dataset/test_loader.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/gluonts/dataset/parallelized_loader.py b/src/gluonts/dataset/parallelized_loader.py index d51d464b97..ce19ca4603 100644 --- a/src/gluonts/dataset/parallelized_loader.py +++ b/src/gluonts/dataset/parallelized_loader.py @@ -621,7 +621,7 @@ def same_process_iter(): while True: # take the next batch size elements batch_samples = list( - itertools.islice(generator, self.batch_size,) + itertools.islice(generator, self.batch_size) ) # shuffle data if appropriate and prepare for batching diff --git a/src/gluonts/evaluation/_base.py b/src/gluonts/evaluation/_base.py index 2e68c86e51..56a3ca1b68 100644 --- a/src/gluonts/evaluation/_base.py +++ b/src/gluonts/evaluation/_base.py @@ -15,6 +15,8 @@ import logging import multiprocessing import re +import sys + from collections import Sized from functools import lru_cache from itertools import chain, tee @@ -161,7 +163,7 @@ def __call__( total=num_series, desc="Running evaluation", ) as it, np.errstate(invalid="ignore"): - if self.num_workers > 0: + if self.num_workers > 0 and not sys.platform == "win32": mp_pool = multiprocessing.Pool( initializer=_worker_init(self), processes=self.num_workers ) diff --git a/test/dataset/test_loader.py b/test/dataset/test_loader.py index a34cf146a7..97402ea1b5 100644 --- a/test/dataset/test_loader.py +++ b/test/dataset/test_loader.py @@ -112,7 +112,7 @@ def test_io_speed() -> None: ("baseline", baseline, 100_000), # ('json.loads', load_json, xxx), ("ujson.loads", load_ujson, 20000), - ("JsonLinesFile", load_json_lines_file, 20000), + ("JsonLinesFile", load_json_lines_file, 10000), ("ListDataset", load_list_dataset, 500), ("FileDataset", load_file_dataset, 500), ("FileDatasetCached", load_file_dataset_cached, 500),
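The lowered JsonLinesFile figure in test_io_speed reads as a throughput bound: parse a synthetic JSON Lines file and require a minimum processing rate. A rough, self-contained sketch of that kind of measurement, using plain json instead of ujson and helper names that are not the ones in the test:

import json
import tempfile
import time
from pathlib import Path


def write_dummy_jsonl(path: Path, num_lines: int) -> None:
    with open(path, "w") as jsonl_file:
        for _ in range(num_lines):
            jsonl_file.write('{"start": "2014-09-07", "target": [1, 2, 3]}\n')


def lines_per_second(path: Path) -> float:
    start = time.perf_counter()
    with open(path) as jsonl_file:
        count = sum(1 for line in jsonl_file if json.loads(line))
    return count / (time.perf_counter() - start)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "dataset.json"
        write_dummy_jsonl(path, 20_000)
        # the loader test pairs each reader with what appears to be a minimum
        # rate; this patch lowers the JsonLinesFile figure from 20000 to 10000
        print(f"{lines_per_second(path):,.0f} lines/s")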