[time series transformers] update dataloader API (#1135)
* update dataloader API

* revert comment

* add back Cached transform
kashif authored May 23, 2023
1 parent c03f5b7 commit 2411c89
Showing 4 changed files with 98 additions and 136 deletions.
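In short, the posts drop the hand-rolled `torch.utils.data.DataLoader` wrapping (`IterableDataset`, `IterableSlice`, `PseudoShuffled`) in favour of GluonTS's `as_stacked_batches` helper. A minimal sketch of the new training-side pattern, assuming `transformed_data`, `instance_splitter`, and `TRAINING_INPUT_NAMES` are defined as in the posts (the batching values below are illustrative only):

```python
import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches

# cache the transformed dataset in memory and cycle over it indefinitely
stream = Cyclic(Cached(transformed_data)).stream()
training_instances = instance_splitter.apply(stream, is_train=True)

# stack instances into dict-of-tensor batches; shuffling and epoch length are
# handled by as_stacked_batches itself instead of a separate DataLoader
train_batches = as_stacked_batches(
    training_instances,
    batch_size=256,                  # illustrative value
    shuffle_buffer_length=1024,      # illustrative value
    field_names=TRAINING_INPUT_NAMES,
    output_type=torch.tensor,
    num_batches_per_epoch=100,       # illustrative value
)
```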
58 changes: 23 additions & 35 deletions informer.md
@@ -621,18 +621,17 @@ def create_instance_splitter(
)
```

## Create PyTorch DataLoaders
## Create DataLoaders

Next, it's time to create PyTorch DataLoaders, which allow us to have batches of (input, output) pairs - or in other words (`past_values`, `future_values`).
Next, it's time to create the DataLoaders, which allow us to have batches of (input, output) pairs - or in other words (`past_values`, `future_values`).


```python
from typing import Iterable

from torch.utils.data import DataLoader

from gluonts.itertools import Cached, Cyclic, IterableSlice, PseudoShuffled
from gluonts.torch.util import IterableDataset
import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
@@ -668,34 +667,23 @@ def create_train_dataloader
transformed_data = Cached(transformed_data)

# we initialize a Training instance
instance_splitter = create_instance_splitter(config, "train") + SelectFields(
TRAINING_INPUT_NAMES
)
instance_splitter = create_instance_splitter(config, "train")

# the instance splitter will sample a window of
# context length + lags + prediction length (from all the possible transformed time series, 1 in our case)
# randomly from within the target time series and return an iterator.
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(
Cyclic(transformed_data)
if shuffle_buffer_length is None
else PseudoShuffled(
Cyclic(transformed_data),
shuffle_buffer_length=shuffle_buffer_length,
)
stream, is_train=True
)

# from the training instances iterator we now return a Dataloader which will
# continue to sample random windows for as long as it is called
# to return batch_size of the appropriate tensors ready for training!
return IterableSlice(
iter(
DataLoader(
IterableDataset(training_instances),
batch_size=batch_size,
**kwargs,
)
),
num_batches_per_epoch,

return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
```
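A usage sketch for the function above (not part of the diff itself; names such as `config`, `freq`, and `train_dataset` are assumed to be defined earlier in the post). The returned iterable yields dictionaries of stacked tensors keyed by the training input names:

```python
# illustrative call; batch_size and num_batches_per_epoch are arbitrary here
train_dataloader = create_train_dataloader(
    config=config,
    freq=freq,
    data=train_dataset,
    batch_size=256,
    num_batches_per_epoch=100,
)

# inspect one batch: each entry is a stacked tensor of shape (batch_size, ...)
batch = next(iter(train_dataloader))
for name, tensor in batch.items():
    print(name, tensor.shape, tensor.dtype)
```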

@@ -725,16 +713,16 @@ def create_test_dataloader(

# we create a Test Instance splitter which will sample the very last
# context window seen during training only for the encoder.
instance_sampler = create_instance_splitter(config, "test") + SelectFields(
PREDICTION_INPUT_NAMES
)
instance_sampler = create_instance_splitter(config, "test")

# we apply the transformations in test mode
testing_instances = instance_sampler.apply(transformed_data, is_train=False)

# This returns a Dataloader which will go over the dataset once.
return DataLoader(
IterableDataset(testing_instances), batch_size=batch_size, **kwargs

return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
```
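Correspondingly, a sketch of how the test loader might be consumed for inference, assuming a trained `model` (e.g. `InformerForPrediction`), a `device`, and `test_dataset` from the rest of the post; the test loader goes over the dataset exactly once:

```python
test_dataloader = create_test_dataloader(
    config=config,
    freq=freq,
    data=test_dataset,
    batch_size=64,
)

model.eval()
forecasts = []
for batch in test_dataloader:
    # generate() consumes only the encoder-side ("past_*") inputs plus the
    # known future time features, and samples future values autoregressively
    outputs = model.generate(
        past_values=batch["past_values"].to(device),
        past_time_features=batch["past_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
    )
    forecasts.append(outputs.sequences.cpu().numpy())
```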

63 changes: 28 additions & 35 deletions time-series-transformers.md
@@ -475,25 +475,27 @@ def create_instance_splitter(
)
```

## Create PyTorch DataLoaders
## Create DataLoaders

Next, it's time to create PyTorch DataLoaders, which allow us to have batches of (input, output) pairs - or in other words (`past_values`, `future_values`).
Next, it's time to create the DataLoaders, which allow us to have batches of (input, output) pairs - or in other words (`past_values`, `future_values`).


```python
from gluonts.itertools import Cyclic, IterableSlice, PseudoShuffled
from gluonts.torch.util import IterableDataset
from torch.utils.data import DataLoader

from typing import Iterable

import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
num_batches_per_epoch: int,
shuffle_buffer_length: Optional[int] = None,
cache_data: bool = True,
**kwargs,
) -> Iterable:
PREDICTION_INPUT_NAMES = [
@@ -515,36 +517,27 @@ def create_train_dataloader(

transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=True)
if cache_data:
transformed_data = Cached(transformed_data)

# we initialize a Training instance
instance_splitter = create_instance_splitter(config, "train") + SelectFields(
TRAINING_INPUT_NAMES
)
instance_splitter = create_instance_splitter(config, "train")

# the instance splitter will sample a window of
# context length + lags + prediction length (from the 366 possible transformed time series)
# randomly from within the target time series and return an iterator.
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(
Cyclic(transformed_data)
if shuffle_buffer_length is None
else PseudoShuffled(
Cyclic(transformed_data),
shuffle_buffer_length=shuffle_buffer_length,
)
stream, is_train=True
)

# from the training instances iterator we now return a Dataloader which will
# continue to sample random windows for as long as it is called
# to return batch_size of the appropriate tensors ready for training!
return IterableSlice(
iter(
DataLoader(
IterableDataset(training_instances),
batch_size=batch_size,
**kwargs,
)
),
num_batches_per_epoch,

return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
```
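For intuition, `as_stacked_batches` is doing roughly the following (a simplified stand-in, not the actual GluonTS implementation): pull `batch_size` instance dictionaries at a time, stack each requested field across the instances, and convert the result to the requested output type:

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List

import numpy as np
import torch


def naive_stacked_batches(
    instances: Iterable[dict],
    batch_size: int,
    field_names: List[str],
    num_batches_per_epoch: int,
) -> Iterator[Dict[str, torch.Tensor]]:
    """Simplified stand-in for gluonts.dataset.loader.as_stacked_batches."""
    it = iter(instances)
    for _ in range(num_batches_per_epoch):
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        # stack every requested field across the instances in this chunk
        yield {
            name: torch.tensor(np.stack([inst[name] for inst in chunk]))
            for name in field_names
        }
```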

@@ -574,16 +567,16 @@ def create_test_dataloader(

# we create a Test Instance splitter which will sample the very last
# context window seen during training only for the encoder.
instance_sampler = create_instance_splitter(config, "test") + SelectFields(
PREDICTION_INPUT_NAMES
)
instance_sampler = create_instance_splitter(config, "test")

# we apply the transformations in test mode
testing_instances = instance_sampler.apply(transformed_data, is_train=False)

# This returns a Dataloader which will go over the dataset once.
return DataLoader(
IterableDataset(testing_instances), batch_size=batch_size, **kwargs

return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
```
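Relatedly, the commit message notes that the `Cached` transform is added back in the training path above: it materializes the transformed dataset on the first pass so later epochs skip recomputing the transformation, while `Cyclic` repeats it indefinitely. An illustrative toy example of the intended semantics (not from the posts):

```python
from itertools import islice

from gluonts.itertools import Cached, Cyclic

data = Cached(range(3))          # items are computed once, then served from memory
stream = Cyclic(data).stream()   # ...and cycled over forever
print(list(islice(stream, 7)))   # expected: [0, 1, 2, 0, 1, 2, 0]
```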

54 changes: 21 additions & 33 deletions zh/informer.md
@@ -625,10 +625,9 @@ def create_instance_splitter(
```python
from typing import Iterable

from torch.utils.data import DataLoader

from gluonts.itertools import Cached, Cyclic, IterableSlice, PseudoShuffled
from gluonts.torch.util import IterableDataset
import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
@@ -664,34 +663,23 @@ def create_train_dataloader(
transformed_data = Cached(transformed_data)

# we initialize a Training instance
instance_splitter = create_instance_splitter(config, "train") + SelectFields(
TRAINING_INPUT_NAMES
)
instance_splitter = create_instance_splitter(config, "train")

# the instance splitter will sample a window of
# context length + lags + prediction length (from all the possible transformed time series, 1 in our case)
# randomly from within the target time series and return an iterator.
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(
Cyclic(transformed_data)
if shuffle_buffer_length is None
else PseudoShuffled(
Cyclic(transformed_data),
shuffle_buffer_length=shuffle_buffer_length,
)
stream, is_train=True
)

# from the training instances iterator we now return a Dataloader which will
# continue to sample random windows for as long as it is called
# to return batch_size of the appropriate tensors ready for training!
return IterableSlice(
iter(
DataLoader(
IterableDataset(training_instances),
batch_size=batch_size,
**kwargs,
)
),
num_batches_per_epoch,

return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
```

@@ -721,16 +709,16 @@ def create_test_dataloader(

# we create a Test Instance splitter which will sample the very last
# context window seen during training only for the encoder.
instance_sampler = create_instance_splitter(config, "test") + SelectFields(
PREDICTION_INPUT_NAMES
)
instance_sampler = create_instance_splitter(config, "test")

# we apply the transformations in test mode
testing_instances = instance_sampler.apply(transformed_data, is_train=False)

# This returns a Dataloader which will go over the dataset once.
return DataLoader(
IterableDataset(testing_instances), batch_size=batch_size, **kwargs

return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
```

59 changes: 26 additions & 33 deletions zh/time-series-transformers.md
@@ -463,19 +463,21 @@ def create_instance_splitter(


```python
from gluonts.itertools import Cyclic, IterableSlice, PseudoShuffled
from gluonts.torch.util import IterableDataset
from torch.utils.data import DataLoader

from typing import Iterable

import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
num_batches_per_epoch: int,
shuffle_buffer_length: Optional[int] = None,
cache_data: bool = True,
**kwargs,
) -> Iterable:
PREDICTION_INPUT_NAMES = [
@@ -497,36 +499,27 @@ def create_train_dataloader(

transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=True)
if cache_data:
transformed_data = Cached(transformed_data)

# we initialize a Training instance
instance_splitter = create_instance_splitter(config, "train") + SelectFields(
TRAINING_INPUT_NAMES
)
instance_splitter = create_instance_splitter(config, "train")

# the instance splitter will sample a window of
# context length + lags + prediction length (from the 366 possible transformed time series)
# randomly from within the target time series and return an iterator.
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(
Cyclic(transformed_data)
if shuffle_buffer_length is None
else PseudoShuffled(
Cyclic(transformed_data),
shuffle_buffer_length=shuffle_buffer_length,
)
stream, is_train=True
)

# from the training instances iterator we now return a Dataloader which will
# continue to sample random windows for as long as it is called
# to return batch_size of the appropriate tensors ready for training!
return IterableSlice(
iter(
DataLoader(
IterableDataset(training_instances),
batch_size=batch_size,
**kwargs,
)
),
num_batches_per_epoch,

return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
```

@@ -556,16 +549,16 @@ def create_test_dataloader(

# we create a Test Instance splitter which will sample the very last
# context window seen during training only for the encoder.
instance_sampler = create_instance_splitter(config, "test") + SelectFields(
PREDICTION_INPUT_NAMES
)
instance_sampler = create_instance_splitter(config, "test")

# we apply the transformations in test mode
testing_instances = instance_sampler.apply(transformed_data, is_train=False)

# This returns a Dataloader which will go over the dataset once.
return DataLoader(
IterableDataset(testing_instances), batch_size=batch_size, **kwargs

return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
```

