Use window_size_bytes: auto to specify automatic windowing #3076

Merged · 6 commits · Feb 14, 2023
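
This PR replaces the boolean auto_window flag with a single window_size_bytes option that accepts an integer, "auto", or nothing at all. As exercised by the integration test further down, the option lives in the backend's loader section. A minimal sketch of how a user might enable automatic windowing (the feature definitions and the "type": "ray" key are illustrative assumptions, not taken from this PR):

```python
from ludwig.api import LudwigModel

# Hypothetical minimal model config; only the backend "loader" section below is
# what this PR actually exercises.
config = {
    "input_features": [{"name": "in_column", "type": "number"}],
    "output_features": [{"name": "out_column", "type": "binary"}],
}

backend_config = {
    "type": "ray",  # assumption: selecting the Ray backend by type
    "loader": {
        # "auto" sizes each streaming window from the dataset and object store
        # sizes; an integer is used as-is; omitting the key disables windowing.
        "window_size_bytes": "auto",
    },
}

model = LudwigModel(config, backend=backend_config)
```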
16 changes: 4 additions & 12 deletions ludwig/backend/ray.py
@@ -400,19 +400,13 @@ def train(
         }
 
         dataset = {"train": training_set.ds}
-        stream_window_size = {
-            "train": training_set.get_window_size_bytes(self.data_loader_kwargs.get("window_size_bytes", None))
-        }
+        stream_window_size = {"train": training_set.window_size_bytes}
         if validation_set is not None:
             dataset["val"] = validation_set.ds
-            stream_window_size["val"] = validation_set.get_window_size_bytes(
-                self.data_loader_kwargs.get("window_size_bytes", None)
-            )
+            stream_window_size["val"] = validation_set.window_size_bytes
         if test_set is not None:
             dataset["test"] = test_set.ds
-            stream_window_size["test"] = test_set.get_window_size_bytes(
-                self.data_loader_kwargs.get("window_size_bytes", None)
-            )
+            stream_window_size["test"] = test_set.window_size_bytes
 
         with create_runner(**self.trainer_kwargs) as runner:
             trainer_results = runner.run(
@@ -623,9 +617,7 @@ def batch_evaluation(
         with create_runner(**self.trainer_kwargs) as runner:
             # Collect eval metrics by distributing work across nodes / gpus with Horovod
             datasets = {"eval": dataset.ds}
-            stream_window_size = {
-                "eval": dataset.get_window_size_bytes(self.data_loader_kwargs.get("window_size_bytes", None))
-            }
+            stream_window_size = {"eval": dataset.window_size_bytes}
             predictor_kwargs = {**self.predictor_kwargs, "collect_predictions": False}
             eval_results = runner.run(
                 lambda config: eval_fn(**config),
41 changes: 21 additions & 20 deletions ludwig/data/dataset/ray.py
@@ -19,7 +19,7 @@
 import queue
 import threading
 from functools import lru_cache
-from typing import Dict, Iterator, Optional, Union
+from typing import Dict, Iterator, Literal, Optional, Union
 
 import numpy as np
 import pandas as pd
@@ -62,9 +62,13 @@ def read_remote_parquet(path: str):
 class RayDataset(Dataset):
     """Wrapper around ray.data.Dataset.
 
-    Attributes:
-        auto_window: If True and the dataset is larger than available memory,
-            automatically set window size to `<available memory> // 5`.
+    Args:
+        df: The data to wrap
+        features: Feature-level config indexed by feature name
+        training_set_metadata: Additional training set information
+        backend: The local/distributed compute coordinator
+        window_size_bytes: The requested size of a dataset window in bytes. If "auto", sets the window size relative to
+            the dataset size and object store size. If not specified, no windowing will occur.
     """
 
     def __init__(
@@ -73,7 +77,7 @@ def __init__(
         features: Dict[str, FeatureConfigDict],
         training_set_metadata: TrainingSetMetadataDict,
         backend: Backend,
-        auto_window: bool = False,
+        window_size_bytes: Optional[Union[int, Literal["auto"]]] = None,
     ):
         self.df_engine = backend.df_engine
         self.ds = self.df_engine.to_ray_dataset(df) if not isinstance(df, str) else read_remote_parquet(df)
@@ -82,16 +86,17 @@ def __init__(
         self.data_hdf5_fp = training_set_metadata.get(DATA_TRAIN_HDF5_FP)
         self.data_parquet_fp = training_set_metadata.get(DATA_TRAIN_PARQUET_FP)
         self._processed_data_fp = df if isinstance(df, str) else None
-        self.auto_window = auto_window
+        self.window_size_bytes = self.get_window_size_bytes(window_size_bytes)
 
-    def get_window_size_bytes(self, window_size_bytes: Optional[int] = None) -> int:
-        # If user has specified a window size, use it as is
-        if window_size_bytes:
+    def get_window_size_bytes(self, window_size_bytes: Optional[Union[int, Literal["auto"]]] = None) -> int:
Review comment on lines -89 to +91 (Contributor):
Really nice use of literal, we should use that more in Ludwig

"""Return this dataset's window size in bytes, or translate auto-windowing into bytes."""
# If user has specified a window size, use it as-is.
if isinstance(window_size_bytes, int):
return window_size_bytes

# If the user does not supply a window size and the dataset is large,
# If the user requests auto window sizing and the dataset is large,
# set the window size to `<available memory> // 5`.
if self.auto_window and window_size_bytes is None:
elif window_size_bytes == "auto":
ds_memory_size = self.in_memory_size_bytes
cluster_memory_size = ray.cluster_resources()["object_store_memory"]
if ds_memory_size > cluster_memory_size // 5:
@@ -100,8 +105,8 @@ def get_window_size_bytes(self, window_size_bytes: Optional[int] = None) -> int:
"In-memory dataset size is greater than 20%% of object store memory. "
"Enabling windowed shuffling of data to prevent chances of OOMs. "
)
window_size_bytes = int(cluster_memory_size // 5)
return window_size_bytes
return int(cluster_memory_size // 5)

# By default, set to -1 so that an infinite window size
# will be used which effectively results in bulk data ingestion
return -1
@@ -168,15 +173,11 @@ def create(
         dataset: Union[str, DataFrame],
         config: ModelConfigDict,
         training_set_metadata: TrainingSetMetadataDict,
-        auto_window: bool = False,
     ) -> "RayDataset":
-        """Create a new Ray dataset with config.
-
-        Args:
-            auto_window: If True, enable autosizing of data windows for large datasets.
-        """
+        """Create a new Ray dataset with config."""
+        window_size_bytes = self.backend._data_loader_kwargs.get("window_size_bytes", None)
         return RayDataset(
-            dataset, get_proc_features(config), training_set_metadata, self.backend, auto_window=auto_window
+            dataset, get_proc_features(config), training_set_metadata, self.backend, window_size_bytes=window_size_bytes
         )
 
     def save(
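
In isolation, the sizing rule introduced above reduces to three cases: an explicit integer is used verbatim, "auto" compares the dataset against 20% of the Ray object store, and anything else falls back to bulk ingestion (-1). A minimal standalone sketch of that rule, with illustrative sizes rather than a real cluster (this is not Ludwig's actual class, just the same arithmetic):

```python
from typing import Literal, Optional, Union

GB = 1024**3


def resolve_window_size_bytes(
    window_size_bytes: Optional[Union[int, Literal["auto"]]],
    dataset_size_bytes: int,
    object_store_bytes: int,
) -> int:
    """Simplified mirror of RayDataset.get_window_size_bytes."""
    if isinstance(window_size_bytes, int):
        # An explicit user-provided size always wins.
        return window_size_bytes
    if window_size_bytes == "auto" and dataset_size_bytes > object_store_bytes // 5:
        # Dataset exceeds 20% of the object store: window it at one fifth of the store.
        return int(object_store_bytes // 5)
    # No windowing requested, or the dataset is small: -1 means bulk ingestion.
    return -1


# Worked example with a hypothetical 10 GB object store:
print(resolve_window_size_bytes("auto", 3 * GB, 10 * GB))         # 2147483648 (2 GB window)
print(resolve_window_size_bytes("auto", 1 * GB, 10 * GB))         # -1 (small dataset, no windowing)
print(resolve_window_size_bytes(512 * 1024**2, 3 * GB, 10 * GB))  # 536870912 (user value used as-is)
print(resolve_window_size_bytes(None, 3 * GB, 10 * GB))           # -1 (windowing disabled)
```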
22 changes: 10 additions & 12 deletions tests/integration_tests/test_ray.py
@@ -15,7 +15,7 @@
 import copy
 import os
 import tempfile
-from typing import Optional
+from typing import Literal, Optional, Union
 
 import numpy as np
 import pandas as pd
@@ -987,13 +987,12 @@ def num_partitions(self):
         return 100
 
     def create_dataset_pipeline(
-        self, size: int, auto_window: bool = True, window_size_bytes: Optional[int] = None
+        self, size: int, window_size_bytes: Optional[Union[int, Literal["auto"]]] = None
     ) -> "DatasetPipeline":
         """Create a dataset of specified size to test auto-sizing.
 
         Args:
             size: Total size of the dataset in bytes
-            auto_window: Flag determining whether autosizing is enabled
             window_size_bytes: Pass to override the auto_window size
 
         Returns:
@@ -1015,15 +1014,14 @@
"output_features": [{"name": "out_column", "type": "binary"}],
TRAINER: {"epochs": 1, BATCH_SIZE: 128},
}
backend_config = {**RAY_BACKEND_CONFIG}
backend_config = copy.deepcopy(RAY_BACKEND_CONFIG)
backend_config["loader"] = {"window_size_bytes": window_size_bytes}
Review comment on lines +1017 to +1018 (Contributor):
Just curious - are we always guaranteed to have the loader key in the backend config? If not, do we need to add a default value for this in the schema (possibly None)?

Reply (Contributor Author):
Under the hood, this update uses backend._data_loader_kwargs, which the RayBackend initializer sets with self._data_loader_kwargs = loader or {}. We don't actually rely on the existence of the loader key, but rather on the backend's handling of its own config.
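
A minimal sketch of that fallback behavior, assuming only the `loader or {}` idiom quoted above (the backend stand-in is illustrative, not Ludwig's real class):

```python
from typing import Optional


class FakeRayBackend:
    """Illustrative stand-in for the relevant fragment of RayBackend.__init__."""

    def __init__(self, loader: Optional[dict] = None):
        # A missing or None "loader" section collapses to an empty dict.
        self._data_loader_kwargs = loader or {}


# Without a loader section, .get() falls back to None, so no windowing is requested.
print(FakeRayBackend()._data_loader_kwargs.get("window_size_bytes", None))  # None
# With a loader section, the configured value flows through unchanged.
print(FakeRayBackend({"window_size_bytes": "auto"})._data_loader_kwargs.get("window_size_bytes", None))  # auto
```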

backend_config["preprocessor_kwargs"] = {"num_cpu": 1}
model = LudwigModel(config, backend=backend_config)

# Create a dataset using the model backend to ensure it
# is initialized correctly.
ds = model.backend.dataset_manager.create(
df, config=model.config, training_set_metadata={}, auto_window=auto_window
)
ds = model.backend.dataset_manager.create(df, config=model.config, training_set_metadata={})

# To window without using a training session, we configure `DataParallelIngestSpec` to use the specified window
# size and turn off other features (e.g., shuffle) that may incur computational overhead.
@@ -1032,7 +1030,7 @@
             split=False,
             transform=False,
             use_stream_api=True,
-            stream_window_size=ds.get_window_size_bytes(window_size_bytes=window_size_bytes),
+            stream_window_size=ds.window_size_bytes,
             global_shuffle=False,
         )
         spec = DataParallelIngestSpec({"train": dataset_config})
@@ -1055,27 +1053,27 @@ def test_small_dataset(self, ray_cluster_2cpu):
         Without automatic window sizing, the number of blocks in the pipeline should match the number of partitions in
         the Dask dataframe.
         """
-        pipe = self.create_dataset_pipeline(self.auto_window_size // 2)
+        pipe = self.create_dataset_pipeline(self.auto_window_size // 2, window_size_bytes="auto")
         window = next(self.window_gen(pipe))
         assert window.num_blocks() == self.num_partitions
 
     def test_large_dataset(self, ray_cluster_2cpu):
         """A large dataset should trigger windowing."""
-        pipe = self.create_dataset_pipeline(self.auto_window_size * 2)
+        pipe = self.create_dataset_pipeline(self.auto_window_size * 2, window_size_bytes="auto")
        for i, window in enumerate(self.window_gen(pipe)):
             assert window.num_blocks() < self.num_partitions
             if i > 100:
                 break
 
     def test_window_autosizing_disabled(self, ray_cluster_2cpu):
         """If window autosizing is disabled, no datasets should be windowed."""
-        pipe = self.create_dataset_pipeline(self.auto_window_size * 2, auto_window=False)
+        pipe = self.create_dataset_pipeline(self.auto_window_size * 2, window_size_bytes=None)
         window = next(self.window_gen(pipe))
         assert window.num_blocks() == self.num_partitions
 
     def test_user_window_size(self, ray_cluster_2cpu):
         """If the user supplies a window size, do not autosize."""
-        auto_pipe = self.create_dataset_pipeline(self.auto_window_size * 2)
+        auto_pipe = self.create_dataset_pipeline(self.auto_window_size * 2, window_size_bytes="auto")
         user_pipe = self.create_dataset_pipeline(self.auto_window_size * 2, window_size_bytes=self.auto_window_size * 4)
         windows = zip(self.window_gen(auto_pipe), self.window_gen(user_pipe))
 