Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes empty partitions after dropping rows and splitting datasets #2328

Merged
merged 34 commits into from
Aug 4, 2022
Merged
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
dd83127
Custom to_dask() implementation that also passes meta during dd.DataF…
arnavgarg1 Jul 27, 2022
3ba39cd
Using 100 rows instead
arnavgarg1 Jul 28, 2022
55d3019
Pin Ray nightly version
geoffreyangus Jul 28, 2022
0a1477e
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Jul 28, 2022
05fb78a
fix link
geoffreyangus Jul 28, 2022
ddb4ada
pin torch to 07/26
geoffreyangus Jul 28, 2022
adea85c
cleanup
geoffreyangus Jul 28, 2022
9506182
Removes empty partitions after dropping rows and splitting datasets
geoffreyangus Jul 28, 2022
590cddb
remove extraneous comment in known_divisions
geoffreyangus Jul 28, 2022
9a81617
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 28, 2022
d5402f5
add unit test
geoffreyangus Jul 29, 2022
d94ad85
Merge branch 'remove-empty-partitions' of https://github.com/ludwig-a…
geoffreyangus Jul 29, 2022
83ea53b
upgrade ray pinned version to enable parquet partition filtering
geoffreyangus Jul 29, 2022
0f712f4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 29, 2022
6f0ac1d
Merge branch 'custom_to_dask' of https://github.com/ludwig-ai/ludwig
geoffreyangus Jul 29, 2022
dad7605
added preliminary check for empty partitions to improve speed
geoffreyangus Jul 29, 2022
d3c2a5a
downgrade Ray to ensure TensorDtypes are not inferred during Ray Data…
geoffreyangus Jul 29, 2022
8586c70
merge
geoffreyangus Jul 29, 2022
51dfa85
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 29, 2022
6116a71
Merge branch 'pin-ray-nightly' into remove-empty-partitions
geoffreyangus Jul 29, 2022
566b755
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 29, 2022
883d008
add NoneType check to split
geoffreyangus Jul 29, 2022
c03bf4f
unpin torch
geoffreyangus Aug 1, 2022
289e974
Merge branch 'pin-ray-nightly' into remove-empty-partitions
geoffreyangus Aug 1, 2022
459255a
move persist call to this PR
geoffreyangus Aug 1, 2022
825fe72
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Aug 2, 2022
f9d715e
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Aug 3, 2022
d77ee41
Merge branch 'master' into remove-empty-partitions
geoffreyangus Aug 3, 2022
aa221b0
revert to to_dask()
geoffreyangus Aug 3, 2022
724d202
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Aug 3, 2022
fb53d72
Merge branch 'master' into remove-empty-partitions
geoffreyangus Aug 3, 2022
d079969
reverted custom to_dask and isolated ray into DaskEngine methods
geoffreyangus Aug 3, 2022
6efa162
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Aug 3, 2022
5cd4d49
Merge branch 'master' into remove-empty-partitions
geoffreyangus Aug 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions ludwig/data/dataframe/dask.py
Original file line number Diff line number Diff line change
@@ -20,9 +20,7 @@
import dask
import dask.array as da
import dask.dataframe as dd
import ray.data
from dask.diagnostics import ProgressBar
from ray.util.dask import ray_dask_get

from ludwig.data.dataframe.base import DataFrameEngine
from ludwig.utils.data_utils import split_by_slices
@@ -39,6 +37,8 @@ def set_scheduler(scheduler):

class DaskEngine(DataFrameEngine):
def __init__(self, parallelism=None, persist=True, _use_ray=True, **kwargs):
from ray.util.dask import ray_dask_get

self._parallelism = parallelism
self._persist = persist
if _use_ray:
@@ -86,6 +86,8 @@ def map_partitions(self, series, map_fn, meta=None):
return series.map_partitions(map_fn, meta=meta)

def map_batches(self, series, map_fn):
    """Apply ``map_fn`` to pandas-format batches of ``series`` via Ray Datasets.

    The Dask collection is round-tripped through Ray Datasets so that the
    batch-wise mapping runs on Ray, then converted back to Dask.
    """
    # Imported lazily so this module stays importable without Ray installed.
    import ray.data

    return (
        ray.data.from_dask(series)
        .map_batches(map_fn, batch_format="pandas")
        .to_dask()
    )
@@ -114,6 +116,23 @@ def split(self, df, probabilities):
slices = df.partitions
return split_by_slices(slices, n, probabilities)

def remove_empty_partitions(self, df):
    """Return ``df`` with all zero-length partitions dropped.

    Empty partitions can appear after row-dropping or splitting; some
    downstream consumers choke on them. If no partition is empty, ``df``
    is returned unchanged (fast path that avoids rebuilding the graph).

    Reference: https://stackoverflow.com/questions/47812785/remove-empty-partitions-in-dask
    """
    # One pass to get the length of every partition.
    partition_lens = list(df.map_partitions(len).compute())
    # Fast path: nothing to remove. Generator avoids building a throwaway list.
    if all(n > 0 for n in partition_lens):
        return df

    # Keep only the delayed objects backing non-empty partitions.
    nonempty = [part for part, n in zip(df.to_delayed(), partition_lens) if n > 0]
    # df._meta is the canonical empty frame carrying the schema, so the
    # reassembled DataFrame keeps the same columns/dtypes as the input.
    return dd.from_delayed(nonempty, meta=df._meta)

def to_parquet(self, df, path, index=False):
with ProgressBar():
df.to_parquet(
3 changes: 3 additions & 0 deletions ludwig/data/dataframe/modin.py
Original file line number Diff line number Diff line change
@@ -59,6 +59,9 @@ def reduce_objects(self, series, reduce_fn):
def split(self, df, probabilities):
    """Split ``df`` row-wise into one slice per entry of ``probabilities``."""
    num_rows = len(df)
    return split_by_slices(df.iloc, num_rows, probabilities)

def remove_empty_partitions(self, df):
    """No-op for this engine: ``df`` is returned unchanged."""
    return df

def to_parquet(self, df, path, index=False):
    """Write ``df`` to Parquet at ``path`` with the pyarrow engine.

    ``index`` controls whether the DataFrame index is written (default False).
    """
    write_kwargs = {"engine": "pyarrow", "index": index}
    df.to_parquet(path, **write_kwargs)

3 changes: 3 additions & 0 deletions ludwig/data/dataframe/pandas.py
Original file line number Diff line number Diff line change
@@ -61,6 +61,9 @@ def reduce_objects(self, series, reduce_fn):
def split(self, df, probabilities):
    """Split ``df`` row-wise into one slice per entry of ``probabilities``."""
    total_rows = len(df)
    return split_by_slices(df.iloc, total_rows, probabilities)

def remove_empty_partitions(self, df):
    """No-op for this engine: ``df`` is returned unchanged."""
    return df

def to_parquet(self, df, path, index=False):
    """Write ``df`` to Parquet at ``path`` with the pyarrow engine.

    ``index`` controls whether the DataFrame index is written (default False).
    """
    parquet_opts = {"engine": "pyarrow", "index": index}
    df.to_parquet(path, **parquet_opts)

6 changes: 6 additions & 0 deletions ludwig/data/preprocessing.py
Original file line number Diff line number Diff line change
@@ -1187,6 +1187,12 @@ def build_dataset(
col_name_to_dtype[col_name] = col.dtype
dataset = dataset.astype(col_name_to_dtype)

# Persist the completed dataset with no NaNs
dataset = backend.df_engine.persist(dataset)

# Remove partitions that are empty after removing NaNs
dataset = backend.df_engine.remove_empty_partitions(dataset)

return dataset, metadata


3 changes: 3 additions & 0 deletions ludwig/data/split.py
Original file line number Diff line number Diff line change
@@ -217,4 +217,7 @@ def split_dataset(
"Encountered an empty training set while splitting data. Please double check the preprocessing split "
"configuration."
)

# Remove partitions that are empty after splitting
datasets = [None if dataset is None else backend.df_engine.remove_empty_partitions(dataset) for dataset in datasets]
return datasets
33 changes: 32 additions & 1 deletion tests/integration_tests/test_preprocessing.py
Original file line number Diff line number Diff line change
@@ -124,7 +124,6 @@ def test_dask_known_divisions(feature_fn, csv_filename, tmpdir):
input_features = [feature_fn(os.path.join(tmpdir, "generated_output"))]
output_features = [category_feature(vocab_size=5, reduce_input="sum")]

# num_examples=100 and npartitions=2 to ensure the test is not flaky, by having non-empty post-split datasets.
data_csv = generate_data(input_features, output_features, os.path.join(tmpdir, csv_filename), num_examples=100)
data_df = dd.from_pandas(pd.read_csv(data_csv), npartitions=2)
assert data_df.known_divisions
@@ -146,6 +145,38 @@ def test_dask_known_divisions(feature_fn, csv_filename, tmpdir):
)


@pytest.mark.distributed
def test_drop_empty_partitions(csv_filename, tmpdir):
    """End-to-end check that preprocessing on the ray backend yields no empty Dask partitions."""
    import dask.dataframe as dd

    input_features = [image_feature(os.path.join(tmpdir, "generated_output"))]
    output_features = [category_feature(vocab_size=5, reduce_input="sum")]

    # num_examples and npartitions set such that each post-split DataFrame has >1 samples, but empty partitions.
    data_csv = generate_data(input_features, output_features, os.path.join(tmpdir, csv_filename), num_examples=25)
    data_df = dd.from_pandas(pd.read_csv(data_csv), npartitions=10)

    config = {
        "input_features": input_features,
        "output_features": output_features,
        "trainer": {
            "epochs": 2,
        },
    }

    backend = "ray"
    with init_backend(backend):
        model = LudwigModel(config, backend=backend)
        # skip_save_processed_input=True keeps the test from writing cache artifacts to disk.
        train_set, val_set, test_set, _ = model.preprocess(
            data_df,
            skip_save_processed_input=True,
        )
        # Every partition of every split must be non-empty after preprocessing.
        for dataset in [train_set, val_set, test_set]:
            df = dataset.ds.to_dask()
            for partition in df.partitions:
                assert len(partition) > 0, "empty partitions found in dataset"


@pytest.mark.parametrize("generate_images_as_numpy", [False, True])
def test_read_image_from_path(tmpdir, csv_filename, generate_images_as_numpy):
input_features = [image_feature(os.path.join(tmpdir, "generated_output"), save_as_numpy=generate_images_as_numpy)]