Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controls row groups and empty tables #1782

Merged
merged 8 commits into from
Sep 8, 2024

Conversation

rudolfix
Copy link
Collaborator

@rudolfix rudolfix commented Sep 3, 2024

Description

See commit list and docs

@rudolfix rudolfix added the bug Something isn't working label Sep 3, 2024
@rudolfix rudolfix requested a review from sh-rp September 3, 2024 12:32
@rudolfix rudolfix self-assigned this Sep 3, 2024
Copy link

netlify bot commented Sep 3, 2024

Deploy Preview for dlt-hub-docs canceled.

Name Link
🔨 Latest commit f31e686
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/66de080d5cc9ab0008aa8bc6

Copy link
Collaborator

@willi-mueller willi-mueller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewing the docs first to make sure I understood the topic.

dlt/common/data_writers/buffered.py Outdated Show resolved Hide resolved
docs/website/docs/dlt-ecosystem/file-formats/parquet.md Outdated Show resolved Hide resolved
docs/website/docs/dlt-ecosystem/file-formats/parquet.md Outdated Show resolved Hide resolved
docs/website/docs/dlt-ecosystem/file-formats/parquet.md Outdated Show resolved Hide resolved
docs/website/docs/dlt-ecosystem/file-formats/parquet.md Outdated Show resolved Hide resolved
```
Mind that we must hold the tables in memory. 1 000 000 rows in example above may take quite large amount of it.

`row_group_size` has limited utility with `pyarrow` writer. It will split large tables into many groups if set below item buffer size.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sentence is not clear to me yet. Is the instruction to the user something like the following?

Suggested change
`row_group_size` has limited utility with `pyarrow` writer. It will split large tables into many groups if set below item buffer size.
For the `pyarrow` parquet writer, ensure to have`row_group_size >= buffer_max_items`. Otherwise, your destination might have more row groups than optimal.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohhh so actually the reverse is true. row_group_size < buffer_max_item to have any effect. this is the core of the problem I'm fixing here. pyarrow will create row group of size of parquet table being written or smaller. btw. other, well desgined implementations allow to write batches to the same groups. not here

rudolfix and others added 5 commits September 3, 2024 16:14
Co-authored-by: Willi Müller <willi.mueller@posteo.de>
Co-authored-by: Willi Müller <willi.mueller@posteo.de>
Co-authored-by: Willi Müller <willi.mueller@posteo.de>
Co-authored-by: Willi Müller <willi.mueller@posteo.de>
Co-authored-by: Willi Müller <willi.mueller@posteo.de>
Copy link
Collaborator

@willi-mueller willi-mueller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great fix! I put some ideas that would help me read the code more easily and docs refactoring.
Just one blocking comment on a naming. Otherwise fine!

Comment on lines 120 to 125
# flush if max buffer exceeded, the second path of the expression prevents empty data frames to pile up in the buffer
if (
self._buffered_items_count >= self.buffer_max_items
or len(self._buffered_items) >= self.buffer_max_items
):
self._flush_items()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we eliminate the comment by refactoring it to a method?

Suggested change
# flush if max buffer exceeded, the second path of the expression prevents empty data frames to pile up in the buffer
if (
self._buffered_items_count >= self.buffer_max_items
or len(self._buffered_items) >= self.buffer_max_items
):
self._flush_items()
self.flush_if_max_buffer_exceeded()

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, _update_row_count(item) might be neat.

```
Mind that `dlt` holds the tables in memory. Thus, 1,000,000 rows in the example above may consume a significant amount of RAM.

`row_group_size` has limited utility with `pyarrow` writer. It will split large tables into many groups if set below item buffer size.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it now, thanks! Maybe we can give a recommendation like this:

Suggested change
`row_group_size` has limited utility with `pyarrow` writer. It will split large tables into many groups if set below item buffer size.
Setting `row_group_size` has limited utility with the `pyarrow` parquet writer because large source tables can end up fragmented into too many groups.
Thus, we recommend setting `row_group_size < buffer_max_items` only when the write_disposition is `"replace"`.
For all other write dispositions, we recommend the default `row_group_size` to avoid fragmentation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

row groups do not map to write dispositions like this... I think this is only relevant to advanced users that optimize their parquet files for a particular query engine...

elif isinstance(row, pyarrow.RecordBatch):
self.writer.write_batch(row, row_group_size=self.parquet_row_group_size)
self.items_count += row.num_rows
if isinstance(row, pyarrow.RecordBatch):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking: I find this surprising How can a row be a RecordBatch or Table? How can a row have num_rows?
Could we call it item like in the docs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right! this is how this class evolved, we started with lists of values to insert and now we deal with tables and other objects. I can rename to items and type properly

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Full typing requires using Generic classes, which is a good idea but we have no time to do it now

Comment on lines 479 to 499
# concat batches and tables into a single one, preserving order
# pyarrow writer starts a row group for each item it writes (even with 0 rows)
# it also converts batches into tables internally. by creating a single table
# we allow the user rudimentary control over row group size via max buffered items
batches = []
tables = []
for row in rows:
if not self.writer:
self.writer = self._create_writer(row.schema)
if isinstance(row, pyarrow.Table):
self.writer.write_table(row, row_group_size=self.parquet_row_group_size)
elif isinstance(row, pyarrow.RecordBatch):
self.writer.write_batch(row, row_group_size=self.parquet_row_group_size)
self.items_count += row.num_rows
if isinstance(row, pyarrow.RecordBatch):
batches.append(row)
elif isinstance(row, pyarrow.Table):
if batches:
tables.append(pyarrow.Table.from_batches(batches))
batches = []
tables.append(row)
else:
raise ValueError(f"Unsupported type {type(row)}")
# count rows that got written
self.items_count += row.num_rows
if batches:
tables.append(pyarrow.Table.from_batches(batches))

table = pyarrow.concat_tables(tables, promote_options="none")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would find it easier to understand if we extract this into a method: self._concat_items(items)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, moved that to libs

dlt/common/data_writers/writers.py Outdated Show resolved Hide resolved
@rudolfix
Copy link
Collaborator Author

rudolfix commented Sep 8, 2024

@willi-mueller I fixed most of the suggested code changes and some docs changes... I think we are good for now. thanks, good review :)

@rudolfix rudolfix force-pushed the feat/controls-row-groups-and-empty-tables branch from 1d54e1f to f31e686 Compare September 8, 2024 20:24
@rudolfix rudolfix merged commit 84f9fa7 into devel Sep 8, 2024
49 of 57 checks passed
@rudolfix rudolfix deleted the feat/controls-row-groups-and-empty-tables branch September 8, 2024 21:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants