-
Notifications
You must be signed in to change notification settings - Fork 430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add support for multiple partition columns and filters in to_pyarrow_dataset() and OR filters in write_datalake() #1722
base: main
Are you sure you want to change the base?
Conversation
… to_pyarrow_dataset() - Partitions with multiple columns can be passed as lists of tuples in DNF format - Multiple partition filters can be passed
- Add tests for various filter/partition scenarios which can be passted to to_pyarrow_dataset()
If you are using polars lazy api, have you considered just writing the filters in Polars instead? They should be pushed down into the PyArrow dataset automatically. |
Yeah, I am doing that currently. I pass the DNF filters between Airflow tasks and convert them into polars filter expressions. When testing earlier I noticed the partitions argument in to_pyarrow_dataset was not working with multiple partitions or multiple columns so it seemed like a reasonable addition. My Airflow tasks do not always read data into polars (only for transformation and removing duplicates), so being able to stick with a single solution is nice. Pass the DNF filters between tasks and return a filtered dataset. Then scan that with polars or duckdb or turn it into a table etc. |
- Tests partition filters based on AND and OR conditions using a single and multiple partition columns
Can you check my latest changes? This is my attempt to tackle #1479 and adding support for overwriting multiple partitions (without touching the Rust code). This allows me to update a dataset with AND and OR filter logic by using lists of lists of DNF tuples, or a list of DNF tuples. The test I added shows a few scenarios. @MrPowers - this is what I was trying to accomplish during our chat on Slack the other day. Basically, I need to be able to overwrite partitions to with deduplicated data. Many of my sources are partitioned by some category and some temporal field, so for example Or some tasks download data based on updated_at timestamps but the data is partitioned by created_date. In this case I might need to overwrite all of the unique created_date partitions in the in-memory data which results in OR filters like |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is making the logic in reading and writing really complicated. Could you pull out the DNF filter parsing into some separate functions? If we can normalize them to the same general type, then we can pass that down into Rust and have it handle that.
python/deltalake/writer.py
Outdated
table._table.create_write_transaction( | ||
filtered_add_actions, | ||
mode, | ||
partition_by or [], | ||
schema, | ||
partition_filter, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we creating a write transaction per filter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seemed like the create_write_transaction() was not able to handle a list of lists of tuples (OR conditions), so I had to treat each condition separately and loop through them.
[id_values0-account_values0-created_date_values0-updated_at_values0-value_values0-partition_by0-partition_filters0] TypeError: argument 'partitions_filters': 'list' object cannot be converted to 'PyTuple' [909, 1]
Looping through each filter results in this:
partition_filters=[
[("created_date", "=", "2023-08-25")],
[("created_date", "=", "2023-09-07")],
[("created_date", "=", "2023-09-21")],
],
ic| partition_filter: [('created_date', '=', '2023-08-25')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-08-25/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
size=3715,
partition_values={'created_date': '2023-08-25'},
modification_time=1697321774036,
data_change=True,
stats='{"numRecords": 1, "minValues": {"id": 1, "account": '
'"account_a", "updated_at": "2023-10-14T22:16:13.921783", '
'"value": 44.5}, "maxValues": {"id": 1, "account": '
'"account_a", "updated_at": "2023-10-14T22:16:13.921783", '
'"value": 44.5}, "nullCount": {"id": 0, "account": 0, '
'"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'
ic| partition_filter: [('created_date', '=', '2023-09-07')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-09-07/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
size=3715,
partition_values={'created_date': '2023-09-07'},
modification_time=1697321774034,
data_change=True,
stats='{"numRecords": 1, "minValues": {"id": 3, "account": '
'"account_b", "updated_at": "2023-10-14T22:16:13.921786", '
'"value": 68.0}, "maxValues": {"id": 3, "account": '
'"account_b", "updated_at": "2023-10-14T22:16:13.921786", '
'"value": 68.0}, "nullCount": {"id": 0, "account": 0, '
'"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'
ic| partition_filter: [('created_date', '=', '2023-09-21')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-09-21/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
size=3715,
partition_values={'created_date': '2023-09-21'},
modification_time=1697321774034,
data_change=True,
stats='{"numRecords": 1, "minValues": {"id": 4, "account": '
'"account_a", "updated_at": "2023-10-14T22:16:13.921786", '
'"value": 11.5}, "maxValues": {"id": 4, "account": '
'"account_a", "updated_at": "2023-10-14T22:16:13.921786", '
'"value": 11.5}, "nullCount": {"id": 0, "account": 0, '
'"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'
If we use a single list of tuples (AND condition) like the example below, there is still only one call to table.update_incremental()
.
ic| partition_filter: [('created_date', '>', '2023-08-01'), ('created_date', '<', '2023-12-31')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-08-25/account=account_a/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
size=3383,
partition_values={'account': 'account_a',
'created_date': '2023-08-25'},
modification_time=1697322195775,
data_change=True,
stats='{"numRecords": 1, "minValues": {"id": 1, "updated_at": '
'"2023-10-14T22:23:15.400092", "value": 0.1}, "maxValues": '
'{"id": 1, "updated_at": "2023-10-14T22:23:15.400092", '
'"value": 0.1}, "nullCount": {"id": 0, "updated_at": 0, '
'"value": 0}}'),
AddAction(path='created_date=2023-09-05/account=account_b/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
size=3383,
partition_values={'account': 'account_b',
'created_date': '2023-09-05'},
modification_time=1697322195778,
data_change=True,
stats='{"numRecords": 1, "minValues": {"id": 2, "updated_at": '
'"2023-10-14T22:23:15.400093", "value": 0.2}, "maxValues": '
'{"id": 2, "updated_at": "2023-10-14T22:23:15.400093", '
'"value": 0.2}, "nullCount": {"id": 0, "updated_at": 0, '
'"value": 0}}'),
AddAction(path='created_date=2023-10-02/account=account_b/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
size=3383,
partition_values={'account': 'account_b',
'created_date': '2023-10-02'},
modification_time=1697322195780,
data_change=True,
stats='{"numRecords": 1, "minValues": {"id": 5, "updated_at": '
'"2023-10-14T22:23:15.400093", "value": 0.5}, "maxValues": '
'{"id": 5, "updated_at": "2023-10-14T22:23:15.400093", '
'"value": 0.5}, "nullCount": {"id": 0, "updated_at": 0, '
'"value": 0}}'),
AddAction(path='created_date=2023-09-07/account=account_a/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
size=3383,
partition_values={'account': 'account_a',
'created_date': '2023-09-07'},
modification_time=1697322195780,
data_change=True,
stats='{"numRecords": 1, "minValues": {"id": 3, "updated_at": '
'"2023-10-14T22:23:15.400093", "value": 0.3}, "maxValues": '
'{"id": 3, "updated_at": "2023-10-14T22:23:15.400093", '
'"value": 0.3}, "nullCount": {"id": 0, "updated_at": 0, '
'"value": 0}}'),
AddAction(path='created_date=2023-09-21/account=account_c/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
size=3383,
partition_values={'account': 'account_c',
'created_date': '2023-09-21'},
modification_time=1697322195781,
data_change=True,
stats='{"numRecords": 1, "minValues": {"id": 4, "updated_at": '
'"2023-10-14T22:23:15.400093", "value": 0.4}, "maxValues": '
'{"id": 4, "updated_at": "2023-10-14T22:23:15.400093", '
'"value": 0.4}, "nullCount": {"id": 0, "updated_at": 0, '
'"value": 0}}')]
ic| 'update_incremental'
- validate_filters ensures partitions and filters are in DNF format (list of tuples, list of lists of tuples) and checks for empty lists - stringify_partition_values ensures values are converted from dates, ints, etc to string for partition columns
- Use pyarrow.parquet filters_to_expression instead of the custom implementation - Move __stringify_partition_values to _util to be able to test more easily - Move partition validation to validate_filters function - Move fragment building to separate method
Agree. I was keeping the logic within the existing functions to not cause too many changes.
I still need add the validate_filters() to the writer.py file as well. I did not touch that yet. |
- validated_filters is guaranteed to be a list of list of tuples
- Shows that the output will still be a list of lists of tuples
The validate_filters() function I added ensures that the partition_filters is a list of list of tuples now, so I made updates to accomodate for that. I was not able to avoid looping through each OR condition. I think that is all I can do on the Python side for now and I am not familar with Rust. I will take a look at the
If that change is made, then further simplification can be done with the Python code. |
Description
This provides support for passing lists of DNF tuples to the to_pyarrow_dataset() method. I didn't want to touch the
dataset_partitions()
Rust code, so this can probably be further simplified and improved. My use case was to be able to create a pyarrow dataset from the DeltaTable and then use polars.scan_pyarrow_dataset().Related Issue(s)
Documentation
Here are some examples (which are part of the pytests as well):
Lists of lists of tuples in DNF format will create OR expressions:
A single list of tuples will create an AND expression: