
feat(output-formats): add support for to_parquet_dir #9781

Merged · 17 commits · Aug 8, 2024
39 changes: 39 additions & 0 deletions ibis/backends/__init__.py
@@ -473,6 +473,45 @@ def to_parquet(
for batch in batch_reader:
writer.write_batch(batch)

@util.experimental
def to_parquet_dir(
self,
expr: ir.Table,
path: str | Path,
*,
params: Mapping[ir.Scalar, Any] | None = None,
**kwargs: Any,
) -> None:
"""Write the results of executing the given expression to a parquet file in a directory.

This method is eager and will execute the associated expression
immediately.

Parameters
----------
expr
The ibis expression to execute and persist to parquet.
path
A string or Path to the directory where the parquet files will be written.
params
Mapping of scalar parameter expressions to values.
**kwargs
Additional keyword arguments passed to pyarrow.dataset.write_dataset

https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

"""
self._import_pyarrow()
import pyarrow.dataset as ds

dir_path = Path(path)

# by default write_dataset creates the directory
with expr.to_pyarrow_batches(params=params) as batch_reader:
ds.write_dataset(
batch_reader, base_dir=dir_path, format="parquet", **kwargs
)

@util.experimental
def to_csv(
self,
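For orientation, a minimal usage sketch of the new backend method. This is not part of the PR: the DuckDB connection, the `penguins.csv` file, and the row-cap values are assumptions for illustration. The `max_rows_per_*` keywords are forwarded to `pyarrow.dataset.write_dataset`, as the docstring above describes; note that pyarrow requires `max_rows_per_file` to be at least `max_rows_per_group`, so the test below sets both.

```python
import ibis

# Hypothetical example: any backend that inherits this default
# implementation should behave the same way.
con = ibis.duckdb.connect()  # in-memory DuckDB connection
t = con.read_csv("penguins.csv")  # assumes this file exists locally

# Write the table as one or more parquet files under out_dir/.
# Extra keyword arguments are forwarded to pyarrow.dataset.write_dataset;
# capping rows per file/group forces multiple files for larger tables.
con.to_parquet_dir(t, "out_dir", max_rows_per_file=3000, max_rows_per_group=3000)
```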
2 changes: 1 addition & 1 deletion ibis/backends/pyspark/__init__.py
@@ -1286,7 +1286,7 @@
df = df.write.format(format)
for k, v in (options or {}).items():
df = df.option(k, v)
- df.save(path)
+ df.save(os.fspath(path))
return None
sq = df.writeStream.format(format)
sq = sq.option("path", os.fspath(path))
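Why the change above: Spark's `DataFrameWriter.save` expects a string path, while callers may now pass a `pathlib.Path`, and `os.fspath` normalizes both cases. A small illustration, not from the PR:

```python
import os
from pathlib import Path

# os.fspath returns str paths unchanged and converts Path objects to str,
# so APIs that expect plain strings receive one either way.
assert os.fspath("out_dir") == "out_dir"
assert os.fspath(Path("out_dir")) == "out_dir"
```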
25 changes: 25 additions & 0 deletions ibis/backends/tests/test_export.py
@@ -215,6 +215,31 @@
)


def test_table_to_parquet_dir(tmp_path, backend, awards_players):
outparquet_dir = tmp_path / "out"

if backend.name() == "pyspark":
# pyspark already writes more than one file
awards_players.to_parquet_dir(outparquet_dir)

else:
# the max_* kwargs force pyarrow to write more than one parquet file
awards_players.to_parquet_dir(
outparquet_dir, max_rows_per_file=3000, max_rows_per_group=3000
)

parquet_files = sorted(
outparquet_dir.glob("*.parquet"),
key=lambda path: int(path.with_suffix("").name.split("-")[1]),
)

df_list = [pd.read_parquet(file) for file in parquet_files]
df = pd.concat(df_list).reset_index(drop=True)

backend.assert_frame_equal(
awards_players.to_pandas().fillna(pd.NA), df.fillna(pd.NA)
)


@pytest.mark.notimpl(
["duckdb"],
reason="cannot inline WriteOptions objects",
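The numeric sort key in the test relies on pyarrow's default `basename_template` of `part-{i}.parquet`; sorting by the integer after the dash avoids the lexicographic trap where `part-10` would sort before `part-2`. A quick sketch of that assumption, with made-up file names:

```python
from pathlib import Path

# pyarrow.dataset.write_dataset names files part-0.parquet, part-1.parquet, ...
files = [Path("part-10.parquet"), Path("part-2.parquet"), Path("part-0.parquet")]

# Strip the suffix, take the integer after the dash, and sort numerically.
numeric = sorted(files, key=lambda p: int(p.with_suffix("").name.split("-")[1]))
assert [f.name for f in numeric] == ["part-0.parquet", "part-2.parquet", "part-10.parquet"]
```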
31 changes: 31 additions & 0 deletions ibis/expr/types/core.py
@@ -604,6 +604,37 @@ def to_parquet(
"""
self._find_backend(use_default=True).to_parquet(self, path, params=params, **kwargs)

@experimental
def to_parquet_dir(
self,
path: str | Path,
*,
params: Mapping[ir.Scalar, Any] | None = None,
**kwargs: Any,
) -> None:
"""Write the results of executing the given expression to a parquet file in a directory.

This method is eager and will execute the associated expression
immediately.

Parameters
----------
path
A string or Path to the directory where the parquet files will be written.
params
Mapping of scalar parameter expressions to values.
**kwargs
Additional keyword arguments passed to pyarrow.dataset.write_dataset

https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

"""
self._find_backend(use_default=True).to_parquet_dir(self, path, params=params, **kwargs)

@experimental
def to_csv(
self,
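Finally, a sketch of the expression-level entry point, assuming the default backend resolves (e.g. to DuckDB for a memtable); the table contents and parameter value are made up for illustration. `params` behaves as in `to_parquet`, binding scalar parameters at execution time:

```python
import ibis

# In-memory table for illustration.
t = ibis.memtable({"a": [1, 2, 3, 4], "b": ["w", "x", "y", "z"]})

# A scalar parameter whose value is supplied at execution time.
cutoff = ibis.param("int64")
expr = t.filter(t.a > cutoff)

# Resolves the default backend and writes parquet file(s) into out_dir/.
expr.to_parquet_dir("out_dir", params={cutoff: 2})
```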