Allow setting write.parquet.row-group-limit #1016

Merged: 5 commits, Aug 8, 2024
1 change: 1 addition & 0 deletions mkdocs/docs/configuration.md
@@ -32,6 +32,7 @@ Iceberg tables support table properties to configure table behavior.
| -------------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, the codec's default level is used |
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |

@Fokko @sungwy Thanks, I believe this has resolved my issue #1012 as well.

However, I would like to point out that this option already exists in the doc, right after `write.parquet.dict-size-bytes`. The UI doesn't allow me to leave a comment there, so please expand the collapsed area to see it.

Additionally, I'm curious why the default value used this time is significantly larger than the previous one.


Thank you for flagging this @zhongyujiang - I'll remove the second entry below, which still has the older default value.

To my understanding, the new value is the correct default; it matches the default in the PyArrow ParquetWriter: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html

| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
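
For context, the `write.parquet.row-group-limit` property documented above can also be set on an existing table, not only at create time as the new integration test below does. A minimal sketch, assuming a configured catalog named `default` and an existing table `default.events` (both names are illustrative):

```python
from pyiceberg.catalog import load_catalog

# Assumed names: a catalog called "default" and a table "default.events".
catalog = load_catalog("default")
tbl = catalog.load_table("default.events")

# Cap each Parquet row group written from now on at 10,000 rows.
with tbl.transaction() as tx:
    tx.set_properties({"write.parquet.row-group-limit": "10000"})
```

Subsequent writes such as `tbl.append(...)` pick the limit up from the table properties at write time.
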
4 changes: 2 additions & 2 deletions pyiceberg/io/pyarrow.py
@@ -2197,8 +2197,8 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
    row_group_size = property_as_int(
        properties=table_metadata.properties,
-       property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-       default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+       property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
+       default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
    )

    def write_parquet(task: WriteTask) -> DataFile:
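
The write path above resolves the limit through `property_as_int`, falling back to the new default when the table property is unset. Below is a self-contained sketch of that lookup pattern; the helper is re-implemented here for illustration and is not the library's own code:

```python
from typing import Dict, Optional


def property_as_int(properties: Dict[str, str], property_name: str, default: Optional[int] = None) -> Optional[int]:
    # Table properties are stored as strings; coerce to int, or fall back to the default.
    value = properties.get(property_name)
    return int(value) if value is not None else default


props = {"write.parquet.row-group-limit": "20000"}
assert property_as_int(props, "write.parquet.row-group-limit", default=1048576) == 20000
assert property_as_int({}, "write.parquet.row-group-limit", default=1048576) == 1048576
```
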
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
@@ -176,7 +176,7 @@ class TableProperties:
    PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB

    PARQUET_ROW_GROUP_LIMIT = "write.parquet.row-group-limit"
-   PARQUET_ROW_GROUP_LIMIT_DEFAULT = 128 * 1024 * 1024  # 128 MB
+   PARQUET_ROW_GROUP_LIMIT_DEFAULT = 1048576

    PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
    PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
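
The new default of 1,048,576 rows is the same upper bound PyArrow applies per row group when writing Parquet, as noted in the review comment above. A quick way to see the limit in action with plain PyArrow (the `/tmp` path is just an example location):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"number": list(range(10))})

# row_group_size is an upper bound on rows per row group; with a limit of 1,
# ten rows are written out as ten row groups.
pq.write_table(table, "/tmp/small_row_groups.parquet", row_group_size=1)

metadata = pq.read_metadata("/tmp/small_row_groups.parquet")
assert metadata.num_row_groups == 10
```
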
35 changes: 35 additions & 0 deletions tests/integration/test_reads.py
@@ -50,6 +50,7 @@
    BinaryType,
    BooleanType,
    IntegerType,
+   LongType,
    NestedField,
    StringType,
    TimestampType,
@@ -670,6 +671,40 @@ def another_task() -> None:


@pytest.mark.integration
def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:
    from pyiceberg.table import TableProperties

    table_name = "default.test_small_row_groups"
    try:
        session_catalog.drop_table(table_name)
    except NoSuchTableError:
        pass  # Just to make sure that the table doesn't exist

    tbl = session_catalog.create_table(
        table_name,
        Schema(
            NestedField(1, "number", LongType()),
        ),
        properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
    )

    # Write 10 row groups, which should end up as 10 batches
    entries = 10
    tbl.append(
        pa.Table.from_pylist(
            [
                {
                    "number": number,
                }
                for number in range(entries)
            ],
        )
    )

    batches = list(tbl.scan().to_arrow_batch_reader())
    assert len(batches) == entries


@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
identifier = "default.test_table_scan_default_to_large_types"