Skip to content

Commit

Permalink
Global join selection (#183)
Browse files Browse the repository at this point in the history
* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* [GITHUB ACTION] Refactor for license and actions (#148)

* Delete datafusion main publication

* Adding licence information, refactoring prunibility issues

* Update SYNNADA-CONTRIBUTIONS.txt

* Update rat_exclude_files.txt

* Enhanced Pipeline Execution: Now Supporting Complex Query Plans for Improved Performance (#132)

* Very initial test passing algorithm

* Working except a minor bug in interval calculations

* After clippy

* Plan

* initial implemantation

* Before prune check ability is added.

Order equivalence implementations will vanish after we send a seperate PR

* minor changes

* Fix bug, ordering equivalence random head

* minor changes

* Add ordering equivalence for sort merge join

* Improvement on tests

* Upstream changes

* Add ordering equivalence for sort merge join

* Fmt issues

* Update comment

* Add ordering equivalence support for hash join

* Make 1 file

* Code enhancements/comment improvements

* Add projection cast handling

* Fix output ordering for sort merge join

* projection bug fix

* Minor changes

* minor changes

* simplify sort_merge_join

* Update equivalence implementation

* Update test_utils.rs

* Update cast implementation

* More idiomatic code

* After merge

* Comments visisted

* Add key swap according to the children orders

* Refactoring

* After merge refactor

* Update sort_enforcement.rs

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>

* Comments are applied

* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* Comment improvements and minor code improvements

* Splitting the order based join selection

* Update rat_exclude_files.txt

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Commented

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>

* Bug fix: Fix lexicographical column search among provided ordering (#156)

* Initial comments on working

* License update (#157)

This extension adds Synnada license information to the existing one.

* Adding comments

* Update sort_hash_join.rs

* After merge silent error

* Change the query in HashJoin

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Update rat_exclude_files.txt

* Clippy solving.

* [GITHUB ACTION] Refactor for license and actions (#148)

* Delete datafusion main publication

* Adding licence information, refactoring prunibility issues

* Update SYNNADA-CONTRIBUTIONS.txt

* Update rat_exclude_files.txt

* Enhanced Pipeline Execution: Now Supporting Complex Query Plans for Improved Performance (#132)

* Very initial test passing algorithm

* Working except a minor bug in interval calculations

* After clippy

* Plan

* initial implemantation

* Before prune check ability is added.

Order equivalence implementations will vanish after we send a seperate PR

* minor changes

* Fix bug, ordering equivalence random head

* minor changes

* Add ordering equivalence for sort merge join

* Improvement on tests

* Upstream changes

* Add ordering equivalence for sort merge join

* Fmt issues

* Update comment

* Add ordering equivalence support for hash join

* Make 1 file

* Code enhancements/comment improvements

* Add projection cast handling

* Fix output ordering for sort merge join

* projection bug fix

* Minor changes

* minor changes

* simplify sort_merge_join

* Update equivalence implementation

* Update test_utils.rs

* Update cast implementation

* More idiomatic code

* After merge

* Comments visisted

* Add key swap according to the children orders

* Refactoring

* After merge refactor

* Update sort_enforcement.rs

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>

* Comments are applied

* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* Comment improvements and minor code improvements

* Splitting the order based join selection

* Update rat_exclude_files.txt

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Commented

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>

* Bug fix: Fix lexicographical column search among provided ordering (#156)

* License update (#157)

This extension adds Synnada license information to the existing one.

* [GITHUB ACTION] Refactor for license and actions (#148)

* Delete datafusion main publication

* Adding licence information, refactoring prunibility issues

* Update SYNNADA-CONTRIBUTIONS.txt

* Update rat_exclude_files.txt

* Enhanced Pipeline Execution: Now Supporting Complex Query Plans for Improved Performance (#132)

* Very initial test passing algorithm

* Working except a minor bug in interval calculations

* After clippy

* Plan

* initial implemantation

* Before prune check ability is added.

Order equivalence implementations will vanish after we send a seperate PR

* minor changes

* Fix bug, ordering equivalence random head

* minor changes

* Add ordering equivalence for sort merge join

* Improvement on tests

* Upstream changes

* Add ordering equivalence for sort merge join

* Fmt issues

* Update comment

* Add ordering equivalence support for hash join

* Make 1 file

* Code enhancements/comment improvements

* Add projection cast handling

* Fix output ordering for sort merge join

* projection bug fix

* Minor changes

* minor changes

* simplify sort_merge_join

* Update equivalence implementation

* Update test_utils.rs

* Update cast implementation

* More idiomatic code

* After merge

* Comments visisted

* Add key swap according to the children orders

* Refactoring

* After merge refactor

* Update sort_enforcement.rs

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>

* Comments are applied

* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* Comment improvements and minor code improvements

* Splitting the order based join selection

* Update rat_exclude_files.txt

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Commented

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>

* Bug fix: Fix lexicographical column search among provided ordering (#156)

* License update (#157)

This extension adds Synnada license information to the existing one.

* Sliding Nested Join Algorithm (#142)

* Sliding Hash Join Algorithm (SWHJ) (#147)

* Fix errors introduced during rebase

* [GITHUB ACTION] Refactor for license and actions (#148)

* Delete datafusion main publication

* Adding licence information, refactoring prunibility issues

* Update SYNNADA-CONTRIBUTIONS.txt

* Update rat_exclude_files.txt

* Enhanced Pipeline Execution: Now Supporting Complex Query Plans for Improved Performance (#132)

* Very initial test passing algorithm

* Working except a minor bug in interval calculations

* After clippy

* Plan

* initial implemantation

* Before prune check ability is added.

Order equivalence implementations will vanish after we send a seperate PR

* minor changes

* Fix bug, ordering equivalence random head

* minor changes

* Add ordering equivalence for sort merge join

* Improvement on tests

* Upstream changes

* Add ordering equivalence for sort merge join

* Fmt issues

* Update comment

* Add ordering equivalence support for hash join

* Make 1 file

* Code enhancements/comment improvements

* Add projection cast handling

* Fix output ordering for sort merge join

* projection bug fix

* Minor changes

* minor changes

* simplify sort_merge_join

* Update equivalence implementation

* Update test_utils.rs

* Update cast implementation

* More idiomatic code

* After merge

* Comments visisted

* Add key swap according to the children orders

* Refactoring

* After merge refactor

* Update sort_enforcement.rs

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>

* Comments are applied

* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* Comment improvements and minor code improvements

* Splitting the order based join selection

* Update rat_exclude_files.txt

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Commented

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>

* Bug fix: Fix lexicographical column search among provided ordering (#156)

* License update (#157)

This extension adds Synnada license information to the existing one.

* Sliding Nested Join Algorithm (#142)

* Sliding Hash Join Algorithm (SWHJ) (#147)

* Fix errors introduced during rebase

* Keep Track of Global Ordering Requirement (#165)

* Prunability of Join Filter Physical Expressions (#161)

* BinaryExpr Equivalence (#116)

* Fix errors introduced during rebase

* Support multiple ordered columns on joins and expression graph (#163)

* After merge

* SlidingHashJoin and SlidingNestedLoopJoin planner integration (#171)

* Before clippy fmt etc.

* lazy loading tables

* mini test

* [GITHUB ACTION] Refactor for license and actions (#148)

* Delete datafusion main publication

* Adding licence information, refactoring prunibility issues

* Update SYNNADA-CONTRIBUTIONS.txt

* Update rat_exclude_files.txt

* Enhanced Pipeline Execution: Now Supporting Complex Query Plans for Improved Performance (#132)

* Very initial test passing algorithm

* Working except a minor bug in interval calculations

* After clippy

* Plan

* initial implemantation

* Before prune check ability is added.

Order equivalence implementations will vanish after we send a seperate PR

* minor changes

* Fix bug, ordering equivalence random head

* minor changes

* Add ordering equivalence for sort merge join

* Improvement on tests

* Upstream changes

* Add ordering equivalence for sort merge join

* Fmt issues

* Update comment

* Add ordering equivalence support for hash join

* Make 1 file

* Code enhancements/comment improvements

* Add projection cast handling

* Fix output ordering for sort merge join

* projection bug fix

* Minor changes

* minor changes

* simplify sort_merge_join

* Update equivalence implementation

* Update test_utils.rs

* Update cast implementation

* More idiomatic code

* After merge

* Comments visisted

* Add key swap according to the children orders

* Refactoring

* After merge refactor

* Update sort_enforcement.rs

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>

* Comments are applied

* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* Comment improvements and minor code improvements

* Splitting the order based join selection

* Update rat_exclude_files.txt

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Commented

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>

* Bug fix: Fix lexicographical column search among provided ordering (#156)

* License update (#157)

This extension adds Synnada license information to the existing one.

* Sliding Nested Join Algorithm (#142)

* Sliding Hash Join Algorithm (SWHJ) (#147)

* Fix errors introduced during rebase

* Keep Track of Global Ordering Requirement (#165)

* Prunability of Join Filter Physical Expressions (#161)

* BinaryExpr Equivalence (#116)

* Fix errors introduced during rebase

* Support multiple ordered columns on joins and expression graph (#163)

* SlidingHashJoin and SlidingNestedLoopJoin planner integration (#171)

* Add license, add contribution hash commits, minor changes

* Before rebase merge

* Add license, add contribution hash commits, minor changes, add scripts to automate hash generation,Delete docs yaml file

* Update utils.rs

* Print deletion

* Update Cargo.lock

* Refactor for review

* Working without slt

* [GITHUB ACTION] Refactor for license and actions (#148)

* Delete datafusion main publication

* Adding licence information, refactoring prunibility issues

* Update SYNNADA-CONTRIBUTIONS.txt

* Update rat_exclude_files.txt

* Enhanced Pipeline Execution: Now Supporting Complex Query Plans for Improved Performance (#132)

* Very initial test passing algorithm

* Working except a minor bug in interval calculations

* After clippy

* Plan

* initial implemantation

* Before prune check ability is added.

Order equivalence implementations will vanish after we send a seperate PR

* minor changes

* Fix bug, ordering equivalence random head

* minor changes

* Add ordering equivalence for sort merge join

* Improvement on tests

* Upstream changes

* Add ordering equivalence for sort merge join

* Fmt issues

* Update comment

* Add ordering equivalence support for hash join

* Make 1 file

* Code enhancements/comment improvements

* Add projection cast handling

* Fix output ordering for sort merge join

* projection bug fix

* Minor changes

* minor changes

* simplify sort_merge_join

* Update equivalence implementation

* Update test_utils.rs

* Update cast implementation

* More idiomatic code

* After merge

* Comments visisted

* Add key swap according to the children orders

* Refactoring

* After merge refactor

* Update sort_enforcement.rs

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>

* Comments are applied

* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* Comment improvements and minor code improvements

* Splitting the order based join selection

* Update rat_exclude_files.txt

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Commented

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>

* Bug fix: Fix lexicographical column search among provided ordering (#156)

* License update (#157)

This extension adds Synnada license information to the existing one.

* Sliding Nested Join Algorithm (#142)

* Sliding Hash Join Algorithm (SWHJ) (#147)

* Fix errors introduced during rebase

* Keep Track of Global Ordering Requirement (#165)

* Prunability of Join Filter Physical Expressions (#161)

* BinaryExpr Equivalence (#116)

* Fix errors introduced during rebase

* Support multiple ordered columns on joins and expression graph (#163)

* SlidingHashJoin and SlidingNestedLoopJoin planner integration (#171)

* Add license, add contribution hash commits, minor changes, add scripts to automate hash generation,Delete docs yaml file

* Change in test folders

* Update join_pipeline_selection.rs

* Update utils.rs

* Before clippy

* Before SLT

* Tests are passing and clippy OK.

* [GITHUB ACTION] Refactor for license and actions (#148)

* Delete datafusion main publication

* Adding licence information, refactoring prunibility issues

* Update SYNNADA-CONTRIBUTIONS.txt

* Update rat_exclude_files.txt

* Enhanced Pipeline Execution: Now Supporting Complex Query Plans for Improved Performance (#132)

* Very initial test passing algorithm

* Working except a minor bug in interval calculations

* After clippy

* Plan

* initial implemantation

* Before prune check ability is added.

Order equivalence implementations will vanish after we send a seperate PR

* minor changes

* Fix bug, ordering equivalence random head

* minor changes

* Add ordering equivalence for sort merge join

* Improvement on tests

* Upstream changes

* Add ordering equivalence for sort merge join

* Fmt issues

* Update comment

* Add ordering equivalence support for hash join

* Make 1 file

* Code enhancements/comment improvements

* Add projection cast handling

* Fix output ordering for sort merge join

* projection bug fix

* Minor changes

* minor changes

* simplify sort_merge_join

* Update equivalence implementation

* Update test_utils.rs

* Update cast implementation

* More idiomatic code

* After merge

* Comments visisted

* Add key swap according to the children orders

* Refactoring

* After merge refactor

* Update sort_enforcement.rs

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>

* Comments are applied

* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* Comment improvements and minor code improvements

* Splitting the order based join selection

* Update rat_exclude_files.txt

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Commented

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>

* Bug fix: Fix lexicographical column search among provided ordering (#156)

* License update (#157)

This extension adds Synnada license information to the existing one.

* Sliding Nested Join Algorithm (#142)

* Sliding Hash Join Algorithm (SWHJ) (#147)

* Fix errors introduced during rebase

* Keep Track of Global Ordering Requirement (#165)

* Prunability of Join Filter Physical Expressions (#161)

* BinaryExpr Equivalence (#116)

* Fix errors introduced during rebase

* Support multiple ordered columns on joins and expression graph (#163)

* SlidingHashJoin and SlidingNestedLoopJoin planner integration (#171)

* Add license, add contribution hash commits, minor changes, add scripts to automate hash generation,Delete docs yaml file

* Resolve errors introduced during rebase

* After merge

* Update rat_exclude_files.txt

* Comments visited

* Synnada Streaming SQL Tests (#190)

* Adds a new method to construct window function for the given input

* For mustafa

* Final

* Update rat_exclude_files.txt

* More commenting

* Fix linter errors, compile errors after rebase, Update commit hashes

* After merge refactors

* Dir

* Additional test for coverage

* Update join_disable_repartition_joins.slt

* Review changes, remove code duplicates

* Update subdirectory hashes

---------

Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>
Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
  • Loading branch information
5 people committed Dec 15, 2023
1 parent 6156c3c commit 8c433af
Show file tree
Hide file tree
Showing 17 changed files with 5,205 additions and 1,203 deletions.
3,386 changes: 2,623 additions & 763 deletions datafusion/core/src/physical_optimizer/join_pipeline_selection.rs

Large diffs are not rendered by default.

380 changes: 119 additions & 261 deletions datafusion/core/src/physical_optimizer/join_selection.rs

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,25 @@ pub fn prunable_filter(left_index: ColumnIndex, right_index: ColumnIndex) -> Joi
);
JoinFilter::new(filter_expr, column_indices, intermediate_schema)
}
pub fn partial_prunable_filter(
left_index: ColumnIndex,
right_index: ColumnIndex,
) -> JoinFilter {
// Filter columns, ensure first batches will have matching rows.
let intermediate_schema = Schema::new(vec![
Field::new("0", DataType::Int32, true),
Field::new("1", DataType::Int32, true),
]);
let column_indices = vec![left_index, right_index];
let filter_expr = Arc::new(BinaryExpr::new(
col("0", &intermediate_schema).unwrap(),
Operator::Gt,
col("1", &intermediate_schema).unwrap(),
));

JoinFilter::new(filter_expr, column_indices, intermediate_schema)
}

pub fn not_prunable_filter(
left_index: ColumnIndex,
right_index: ColumnIndex,
Expand Down
14 changes: 13 additions & 1 deletion datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;

use crate::error::Result;
use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::joins::{HashJoinExec, NestedLoopJoinExec};
use crate::physical_plan::joins::{CrossJoinExec, HashJoinExec, NestedLoopJoinExec};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
Expand Down Expand Up @@ -164,3 +166,13 @@ pub fn is_hash_join(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_nested_loop_join(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<NestedLoopJoinExec>()
}

/// Checks whether the given operator is a [`CrossJoinExec`].
pub fn is_cross_join(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<CrossJoinExec>()
}

/// Checks whether the given operator is an [`AggregateExec`].
pub fn is_aggregate(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<AggregateExec>()
}
6 changes: 3 additions & 3 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ async fn join_change_in_planner() -> Result<()> {
let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let expected = {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
"SlidingHashJoinExec: join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" SortPreservingRepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" SortPreservingRepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
]
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/joins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
pub use cross_join::CrossJoinExec;
pub use hash_join::HashJoinExec;
pub use nested_loop_join::NestedLoopJoinExec;
pub use partitioned_hash_join::PartitionedHashJoinExec;
pub use sliding_hash_join::{swap_sliding_hash_join, SlidingHashJoinExec};
pub use sliding_nested_loop_join::{
swap_sliding_nested_loop_join, SlidingNestedLoopJoinExec,
Expand All @@ -33,6 +34,7 @@ pub mod utils;
mod cross_join;
mod hash_join;
mod nested_loop_join;
mod partitioned_hash_join;
mod sliding_hash_join;
mod sliding_nested_loop_join;
mod sliding_window_join_utils;
Expand Down
100 changes: 28 additions & 72 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,22 @@ impl ExecutionPlan for NestedLoopJoinExec {
distribution_from_join_type(&self.join_type)
}

fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
let (left, right) = (children[0], children[1]);
if left || right {
plan_err!(
"Join Error: The join with cannot be executed with unbounded inputs. {}",
if left && right {
"Currently, we do not support unbounded inputs on both sides."
} else {
"Please consider a different type of join."
}
)
} else {
Ok(false)
}
}

fn equivalence_properties(&self) -> EquivalenceProperties {
join_equivalence_properties(
self.left.equivalence_properties(),
Expand Down Expand Up @@ -744,21 +760,15 @@ mod tests {
use std::sync::Arc;

use super::*;
use crate::{
common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,
test::build_table_i32,
};
use crate::joins::test_utils::partitioned_nested_join_with_filter;
use crate::joins::utils::JoinSide;
use crate::{expressions::Column, memory::MemoryExec, repartition::RepartitionExec, test::build_table_i32};

use arrow::datatypes::{DataType, Field};
use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};

use crate::physical_plan::joins::test_utils::partitioned_nested_join_with_filter;
use crate::joins::utils::JoinSide;
use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::PhysicalExpr;

fn build_table(
Expand Down Expand Up @@ -831,62 +841,13 @@ mod tests {
JoinFilter::new(filter_expression, column_indices, intermediate_schema)
}

async fn multi_partitioned_join_collect(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
join_type: &JoinType,
join_filter: Option<JoinFilter>,
context: Arc<TaskContext>,
) -> Result<(Vec<String>, Vec<RecordBatch>)> {
let partition_count = 4;
let mut output_partition = 1;
let distribution = distribution_from_join_type(join_type);
// left
let left = if matches!(distribution[0], Distribution::SinglePartition) {
left
} else {
output_partition = partition_count;
Arc::new(RepartitionExec::try_new(
left,
Partitioning::RoundRobinBatch(partition_count),
)?)
} as Arc<dyn ExecutionPlan>;

let right = if matches!(distribution[1], Distribution::SinglePartition) {
right
} else {
output_partition = partition_count;
Arc::new(RepartitionExec::try_new(
right,
Partitioning::RoundRobinBatch(partition_count),
)?)
} as Arc<dyn ExecutionPlan>;

// Use the required distribution for nested loop join to test partition data
let nested_loop_join =
NestedLoopJoinExec::try_new(left, right, join_filter, join_type)?;
let columns = columns(&nested_loop_join.schema());
let mut batches = vec![];
for i in 0..output_partition {
let stream = nested_loop_join.execute(i, context.clone())?;
let more_batches = common::collect(stream).await?;
batches.extend(
more_batches
.into_iter()
.filter(|b| b.num_rows() > 0)
.collect::<Vec<_>>(),
);
}
Ok((columns, batches))
}

#[tokio::test]
async fn join_inner_with_filter() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::Inner,
Expand Down Expand Up @@ -915,7 +876,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::Left,
Expand Down Expand Up @@ -946,7 +907,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::Right,
Expand Down Expand Up @@ -977,7 +938,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::Full,
Expand Down Expand Up @@ -1010,7 +971,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::LeftSemi,
Expand Down Expand Up @@ -1039,7 +1000,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::LeftAnti,
Expand Down Expand Up @@ -1069,7 +1030,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::RightSemi,
Expand Down Expand Up @@ -1098,7 +1059,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::RightAnti,
Expand Down Expand Up @@ -1152,7 +1113,7 @@ mod tests {
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

let err = multi_partitioned_join_collect(
let err = partitioned_nested_join_with_filter(
left.clone(),
right.clone(),
&join_type,
Expand All @@ -1171,9 +1132,4 @@ mod tests {

Ok(())
}

/// Returns the column names on the schema
fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
}
}
Loading

0 comments on commit 8c433af

Please sign in to comment.