Writing with large arrow types in MERGE #1753

Closed
ion-elgreco opened this issue Oct 22, 2023 · 0 comments · Fixed by #1720
Labels
enhancement New feature or request

Comments


ion-elgreco commented Oct 22, 2023

Description

It seems it's not possible to write with large arrow types in .merge() yet. Write support already exists via write_deltalake(large_dtypes=True); we should add the same option to merge.

Use Case
Converting Polars dataframes to Arrow and merging them directly, instead of first casting down to normal Arrow types, which may not fit the data if the arrays are too large.

DeltaError: Generic DeltaTable error: Execution error: Fail to build join indices in NestedLoopJoinExec, error:Arrow error: Invalid argument error: Invalid comparison operation: LargeUtf8 == Utf8
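
For context, a minimal sketch of the failure and the current workaround, casting the large types down before merging. The table path and column names here are hypothetical, and the API calls assume the Python deltalake package:

```python
import polars as pl
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

# Polars backs Utf8 columns with Arrow's large_string, so to_arrow()
# produces large types.
df = pl.DataFrame({"id": ["a", "b"], "value": [1, 2]})
source = df.to_arrow()  # schema: id: large_string, value: int64

# Writing works, because write_deltalake can keep the large types:
write_deltalake("path/to/table", source, mode="append", large_dtypes=True)

# Merging the same data raises the LargeUtf8 == Utf8 error above.
# Workaround: cast the large types down to their normal counterparts first.
source = source.cast(
    pa.schema([pa.field("id", pa.string()), pa.field("value", pa.int64())])
)

dt = DeltaTable("path/to/table")
(
    dt.merge(
        source=source,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
```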
ion-elgreco added the enhancement (New feature or request) label on Oct 22, 2023
Blajda added a commit that referenced this issue Nov 19, 2023
# Description
This refactors the merge operation to use DataFusion's DataFrame and
LogicalPlan APIs.

The NLJ (nested loop join) is eliminated and the query planner can pick the
optimal join operator. This also enables the operation to use multiple threads
and should result in a significant speed-up.
Merge is still limited to a single thread in some areas. When collecting
benchmarks, I encountered multiple OOM issues with DataFusion's hash join
implementation; there are multiple upstream tickets open regarding this. For
now, I've limited the number of partitions to just 1 to prevent this.

Predicates passed as SQL are also easier to use now. Previously, manual
casting was required to ensure data types were aligned; now the logical plan
performs type coercion when optimizing the plan.
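
To illustrate the difference (hypothetical column types; assuming the Python deltalake merge API):

```python
# Before: a source "id" column (e.g. int32) had to be cast manually so the
# types matched the target's int64 column:
#   predicate="target.id = CAST(source.id AS BIGINT)"
# After this refactor, the logical plan coerces the types automatically:
dt.merge(
    source=source,
    predicate="target.id = source.id",
    source_alias="source",
    target_alias="target",
).when_matched_update_all().execute()
```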

# Related Issues
- enhances #850
- closes #1790 
- closes #1753
ion-elgreco pushed commits to ion-elgreco/delta-rs that referenced this issue Nov 20, 2023
ion-elgreco added a commit that referenced this issue Nov 24, 2023
…iter/merge (#1820)

# Description
This ports some functionality that @stinodego and I had worked on in
Polars, where we converted a pyarrow schema to a compatible Delta schema. It
converts the following:

- uint -> int
- timestamp(any timeunit) -> timestamp(us) 

I adjusted the functionality to do schema conversion from large to normal
types when necessary, which is still needed in MERGE as a workaround for
#1753 (see the sketch after the list below).

Additional things I've added:

- Schema conversion for every input in write_deltalake/merge
- Add Pandas dataframe conversion
- Add Pandas dataframe as input in merge
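
A rough sketch of the kind of conversion described, assuming pyarrow; this is illustrative, not the exact implementation from #1820:

```python
import pyarrow as pa

# Illustrative mapping only; the real conversion in #1820 may differ.
_UINT_TO_INT = {
    pa.uint8(): pa.int8(),
    pa.uint16(): pa.int16(),
    pa.uint32(): pa.int32(),
    pa.uint64(): pa.int64(),
}

def _convert_field(field: pa.Field) -> pa.Field:
    dtype = field.type
    if dtype in _UINT_TO_INT:
        dtype = _UINT_TO_INT[dtype]                      # uint -> int
    elif pa.types.is_timestamp(dtype):
        dtype = pa.timestamp("us", tz=dtype.tz)          # any time unit -> us
    elif pa.types.is_large_string(dtype):
        dtype = pa.string()                              # large_string -> string
    elif pa.types.is_large_binary(dtype):
        dtype = pa.binary()                              # large_binary -> binary
    elif pa.types.is_large_list(dtype):
        dtype = pa.list_(_convert_field(dtype.value_field))  # recurse into lists
    return field.with_type(dtype)

def convert_schema(schema: pa.Schema) -> pa.Schema:
    return pa.schema([_convert_field(f) for f in schema])
```

A source table can then be cast with table.cast(convert_schema(table.schema)) before it is written or merged.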


# Related Issue(s)
- closes #686
- closes #1467

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
ion-elgreco added a commit to ion-elgreco/delta-rs that referenced this issue Nov 25, 2023
…iter/merge (delta-io#1820)

This ports some functionality that @stinodego and I had worked on in
Polars. Where we converted a pyarrow schema to a compatible delta
schema. It converts the following:

- uint -> int
- timestamp(any timeunit) -> timestamp(us)

I adjusted the functionality to do schema conversion from large to
normal when necessary, which is still needed in MERGE as workaround
delta-io#1753.

Additional things I've added:

- Schema conversion for every input in write_deltalake/merge
- Add Pandas dataframe conversion
- Add Pandas dataframe as input in merge

- closes delta-io#686
- closes delta-io#1467

---------

Co-authored-by: Will Jones <willjones127@gmail.com>