Add row_group_size argument to Dataset.to_parquet #218

Merged (6 commits) on Mar 16, 2023
merlin/io/dask.py (21 additions, 1 deletion)

@@ -67,6 +67,7 @@ def _write_output_partition(
num_threads,
cpu,
suffix,
row_group_size,
):
df_size = len(df)
out_files_per_proc = out_files_per_proc or 1
@@ -85,6 +86,8 @@
num_threads=num_threads,
cpu=cpu,
suffix=suffix,
fs=fs,
row_group_size=row_group_size,
)
writer.set_col_names(labels=label_names, cats=cat_names, conts=cont_names)
writer_cache[processed_path] = writer
@@ -136,6 +139,7 @@ def _write_partitioned(
output_format,
num_threads,
cpu,
row_group_size,
):
# Logic copied from cudf/cudf/io/parquet.py
data_cols = df.columns.drop(partition_cols)
@@ -152,6 +156,7 @@
num_threads=num_threads,
cpu=cpu,
fns=fns,
row_group_size=row_group_size,
)
writer.set_col_names(labels=label_names, cats=cat_names, conts=cont_names)

@@ -176,6 +181,7 @@ def _write_subgraph(
num_threads,
cpu,
suffix,
row_group_size,
):
fns = fns if isinstance(fns, (tuple, list)) else (fns,)
writer = writer_factory(
@@ -187,6 +193,8 @@
num_threads=num_threads,
cpu=cpu,
fns=[fn + suffix for fn in fns],
fs=fs,
row_group_size=row_group_size,
)
writer.set_col_names(labels=label_names, cats=cat_names, conts=cont_names)

@@ -250,12 +258,21 @@ def _ddf_to_dataset(
num_threads,
cpu,
suffix="",
row_group_size=None,
partition_on=None,
schema=None,
):
# Construct graph for Dask-based dataset write
token = tokenize(
ddf, shuffle, out_files_per_proc, cat_names, cont_names, label_names, suffix, partition_on
ddf,
shuffle,
out_files_per_proc,
cat_names,
cont_names,
label_names,
suffix,
partition_on,
row_group_size,
)
name = "write-processed-" + token
write_name = name + "-partition" + token
@@ -284,6 +301,7 @@ def _ddf_to_dataset(
output_format,
num_threads,
cpu,
row_group_size,
)
dsk[name] = (
_write_metadata_files,
@@ -315,6 +333,7 @@ def _ddf_to_dataset(
num_threads,
cpu,
suffix,
row_group_size,
)
dsk[name] = (
_write_metadata_files,
@@ -342,6 +361,7 @@ def _ddf_to_dataset(
num_threads,
cpu,
suffix,
row_group_size,
)
task_list.append(key)
dsk[name] = (lambda x: x, task_list)
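A note on the dask.py changes above: `row_group_size` is included in the arguments passed to `tokenize`, so the keys of the Dask write graph change whenever the value changes and task results produced with one setting are never reused for another. A minimal, self-contained sketch of that behavior using `dask.base.tokenize` (the placeholder inputs below are illustrative, not the actual graph inputs):

    from dask.base import tokenize

    # tokenize produces a deterministic hash of its inputs; including
    # row_group_size means different values yield different graph keys.
    key_default = tokenize("ddf-placeholder", "shuffle", None)  # row_group_size=None
    key_custom = tokenize("ddf-placeholder", "shuffle", 5000)   # row_group_size=5000
    assert key_default != key_custom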
merlin/io/dataset.py (8 additions, 0 deletions)

@@ -662,6 +662,7 @@ def to_parquet(
preserve_files=False,
output_files=None,
out_files_per_proc=None,
row_group_size=None,
num_threads=0,
dtypes=None,
cats=None,
@@ -717,6 +718,12 @@
argument. If `method="subgraph"`, the total number of files is determined
by `output_files` (and `out_files_per_proc` must be 1 if a dictionary is
specified).
row_group_size : integer
Maximum number of rows to include in each Parquet row-group. By default,
the maximum row-group size will be chosen by the backend Parquet engine
(cudf or pyarrow). Note that cudf currently prohibits this value from
being less than `5000` rows. If smaller row-groups are necessary, try
calling `to_cpu()` before writing to disk.
num_threads : integer
Number of IO threads to use for writing the output dataset.
For `0` (default), no dedicated IO threads will be used.
@@ -922,6 +929,7 @@ def to_parquet(
num_threads,
self.cpu,
suffix=suffix,
row_group_size=row_group_size,
partition_on=partition_on,
schema=schema if write_hugectr_keyset else None,
)
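The docstring above documents the new argument on `Dataset.to_parquet`; a minimal usage sketch (the pandas DataFrame and output path below are placeholders, not part of the change):

    import pandas as pd
    import merlin.io

    # Wrap an in-memory DataFrame and cap each Parquet row-group at 10,000 rows.
    # cudf enforces a minimum of 5000 rows per row-group, so smaller values
    # require a CPU-backed dataset (e.g. after calling to_cpu()).
    df = pd.DataFrame({"a": range(100_000)})
    dataset = merlin.io.Dataset(df)
    dataset.to_parquet("/tmp/merlin_output", row_group_size=10_000)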
merlin/io/parquet.py (6 additions, 3 deletions)

@@ -1042,11 +1042,13 @@ def _bytesio_to_disk(self):


class GPUParquetWriter(BaseParquetWriter):
def __init__(self, out_dir, **kwargs):
def __init__(self, out_dir, row_group_size=None, **kwargs):
super().__init__(out_dir, **kwargs)
# Passing index=False when creating ParquetWriter
# to avoid bug: https://github.com/rapidsai/cudf/issues/7011
self.pwriter_kwargs = {"compression": None, "index": False}
if row_group_size:
self.pwriter_kwargs.update({"row_group_size_rows": row_group_size})

@property
def _pwriter(self):
@@ -1083,10 +1085,11 @@ def _close_writers(self):


class CPUParquetWriter(BaseParquetWriter):
def __init__(self, out_dir, **kwargs):
def __init__(self, out_dir, row_group_size=None, **kwargs):
super().__init__(out_dir, **kwargs)
self.md_collectors = {}
self.pwriter_kwargs = {"compression": None}
self._row_group_size = row_group_size

@property
def _pwriter(self):
@@ -1099,7 +1102,7 @@ def _get_row_group_size(self, df):
# Make sure our `row_group_size` argument (which corresponds
# to the number of rows in each row-group) will produce
# row-groups ~128MB in size.
if not hasattr(self, "_row_group_size"):
if self._row_group_size is None:
row_size = df.memory_usage(deep=True).sum() / max(len(df), 1)
self._row_group_size = math.ceil(128_000_000 / row_size)
return self._row_group_size
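When `row_group_size` is not supplied to the CPU writer, the fallback logic above keeps the existing behavior of targeting row-groups of roughly 128 MB. A small sketch of the same arithmetic outside the writer (the sample frame and resulting numbers are illustrative only):

    import math

    import pandas as pd

    # Estimate the average row size in bytes, then choose the row count that
    # keeps each row-group near 128 MB -- mirroring _get_row_group_size.
    df = pd.DataFrame({"a": range(10_000), "b": [0.5] * 10_000})
    row_size = df.memory_usage(deep=True).sum() / max(len(df), 1)
    row_group_size = math.ceil(128_000_000 / row_size)
    print(row_size, row_group_size)  # roughly 16 bytes/row -> ~8,000,000 rows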
merlin/io/writer_factory.py (7 additions, 3 deletions)

@@ -30,11 +30,13 @@ def writer_factory(
cpu=False,
fns=None,
suffix=None,
fs=None,
**kwargs, # Format-specific arguments
):
if output_format is None:
return None

writer_cls, fs = _writer_cls_factory(output_format, output_path, cpu=cpu)
writer_cls, fs = _writer_cls_factory(output_format, output_path, cpu=cpu, fs=fs)
return writer_cls(
output_path,
num_out_files=out_files_per_proc,
@@ -46,10 +48,11 @@
cpu=cpu,
fns=fns,
suffix=suffix,
**kwargs, # Format-specific arguments
)


def _writer_cls_factory(output_format, output_path, cpu=None):
def _writer_cls_factory(output_format, output_path, cpu=None, fs=None):
if output_format == "parquet" and cpu:
writer_cls = CPUParquetWriter
elif output_format == "parquet":
@@ -59,5 +62,6 @@ def _writer_cls_factory(output_format, output_path, cpu=None):
else:
raise ValueError("Output format not yet supported.")

fs = get_fs_token_paths(output_path)[0]
if fs is None:
fs = get_fs_token_paths(output_path)[0]
return writer_cls, fs
tests/unit/io/test_io.py (21 additions, 0 deletions)

@@ -437,6 +437,27 @@ def test_to_parquet_output_files(tmpdir, datasets, output_files, out_files_per_proc):
assert len(ddf0) == len(ddf1)


@pytest.mark.parametrize("row_group_size", [5000, 10000])
@pytest.mark.parametrize("cpu", [True, False])
def test_to_parquet_row_group_size(tmpdir, cpu, row_group_size):
# Test that row_group_size argument is satisfied.
# NOTE: cuDF prohibits row_group_size_rows<5000
outdir = str(tmpdir)
ddf0 = dd.from_pandas(
(pd if cpu else cudf).DataFrame({"a": range(50_000)}),
npartitions=2,
)
dataset = merlin.io.Dataset(ddf0)
dataset.to_parquet(
outdir,
output_files=1,
row_group_size=row_group_size,
)

result = dd.read_parquet(outdir, split_row_groups=True)
assert all(len(part) <= row_group_size for part in result.partitions)


@pytest.mark.parametrize("engine", ["csv", "parquet"])
def test_validate_dataset(datasets, engine):
with warnings.catch_warnings():