Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recursive CTEs: Stage 3 - add execution support #8840

Merged

Conversation

matthewgapp
Copy link
Contributor

@matthewgapp matthewgapp commented Jan 12, 2024

Adds execution support to recursive CTEs based on a worktable physical plan that mirrors the previous iteration's results back into the recursive query's execution plan.

Part of #462

Todos

  • Fill out docs and cleanup.
  • Add test that contians GROUPBY within the recursive CTE
  • Add test that contains WindowFunction within the recursive CTE
  • Add test with EXPLAIN explain plans of these queries to show how they work
  • Add tests that produce more than one batch on each iteration (these tests make batches much smaller than the default 8K rows). Maybe by setting target_batch_size to 3 or something.

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Jan 12, 2024
time,
1 as "val"
FROM
(SELECT DISTINCT "time" FROM "beg_account_balance")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails. But the following changes cause the test to pass

  • removing time or 1 as "val" from the sub_cte projection
  • removing the reference to beg_account_balance in the FROM clause (effectively removing the sub-query from the sub_cte)
  • removing the FULL JOIN "sub_cte" on 1=1

The test fails with index out of bounds: the len is 0 but the index is 0 but other incarnations of the error after playing around with the repro were

Internal error: PhysicalExpr Column references column 'time' at index 0 (zero-based) but input schema only has 0 columns: [].
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[SQL] WITH RECURSIVE "recursive_cte" AS 
External error: query failed: DataFusion error: ProjectionPushdown
caused by
Internal error: PhysicalExpr Column references column 'time' at index 0 (zero-based) but input schema only has 0 columns: [].
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[SQL] WITH RECURSIVE "recursive_cte" AS (
External error: query failed: DataFusion error: ProjectionPushdown
caused by
Internal error: PhysicalExpr Column references column 'LEAD(beg_account_balance.time,Int64(1)) ORDER BY [beg_account_balance.time ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW' at index 1 (zero-based) but input schema only has 1 columns: ["LEAD(beg_account_balance.time,Int64(1)) ORDER BY [beg_account_balance.time ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"].
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[SQL] WITH RECURSIVE "recursive_cte" AS (

Note that the columns referenced in the error messages are different from the repro because the repro at the time that I collected those errors was different.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same PR but based off of 393e48f98872c696a90fce033fa584533d2326fa on Nov 18th works but based on latest main, it fails.

@alamb or @jonahgao, does this ring any bell for you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like an empty schema is being used somewhere (len is zero). Maybe there is a bug in the output schema calculations or something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After taking a quick pass at PRs that touched projection_pushdown.rs, I'm still stuck. Here's the log of my investigation. Guess I'll have to buckle up and start debugging 🙃

Tried to revert https://github.com/apache/arrow-datafusion/pull/8662/files but couldn’t get code to compile so gave up which is a shame since that PR looks the most relevant
Reverted #8573 but didn’t fix
Reverted #8454 but didn’t fix
Reverted #8485 but didn’t fix
Didn’t try to revert https://github.com/apache/arrow-datafusion/pull/8327/files because it doesn’t look relevant

Copy link
Contributor Author

@matthewgapp matthewgapp Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed min repro in this commit ea4cb6e. But still breaking in our downstream product so I'll add the fuller test case and work to fix against that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't seem to reproduce outside of our project just yet. Getting the following panic

The application panicked (crashed).
Message:  primitive array
Location: /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-49.0.0/src/cast.rs:751

  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                                ⋮ 15 frames hidden ⋮                              
  16: core::panicking::panic_display::hbefe501d317ed1d7
      at /rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/panicking.rs:196
  17: core::panicking::panic_str::h9863c42b2d46e0f3
      at /rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/panicking.rs:171
  18: core::option::expect_failed::h4e9eb510dd8145dd
      at /rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/option.rs:1980
  19: core::option::Option<T>::expect::hec9b720ec03fc4f9
      at /rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/option.rs:894
  20: arrow_array::cast::AsArray::as_primitive::h493f40e8665dba69
      at /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-49.0.0/src/cast.rs:751
       749/// Downcast this to a [`PrimitiveArray`] panicking if not possible
       750fn as_primitive<T: ArrowPrimitiveType>(&self) -> &PrimitiveArray<T> {
       751 >         self.as_primitive_opt().expect("primitive array")
       752}
       75321: <datafusion_physical_plan::aggregates::group_values::primitive::GroupValuesPrimitive<T> as datafusion_physical_plan::aggregates::group_values::GroupValues>::intern::h6edcb0cb053f37df
      at /Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/group_values/primitive.rs:116
       114groups.clear();
       115116 >         for v in cols[0].as_primitive::<T>() {
       117let group_id = match v {
       118None => *self.null_group.get_or_insert_with(|| {
  22: datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch::h941a0f4d623d2cc4
      at /Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs:548
       546// calculate the group indices for each input row
       547let starting_num_groups = self.group_values.len();
       548 >             self.group_values
       549.intern(group_values, &mut self.current_group_indices)?;
       550let group_indices = &self.current_group_indices;
  23: <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next::h778e63c29afa7d81
      at /Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs:438
       436437// Do the grouping
       438 >                             extract_ok!(self.group_aggregate_batch(batch));
       439440// If we can begin emitting rows, do so,
  24: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next::h13c49d63ee92c868
      at /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.29/src/stream.rs:120
       118119fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
       120 >         self.get_mut().as_mut().poll_next(cx)
       121}
       12225: futures_util::stream::stream::StreamExt::poll_next_unpin::h5f167c57e87b8a15
      at /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:1638
      1636Self: Unpin,
      1637{
      1638 >         Pin::new(self).poll_next(cx)
      1639}
      164026: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll::h558ec691ab4e7eeb
      at /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/next.rs:32
        3031fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        32 >         self.stream.poll_next_unpin(cx)
        33}
        34}
  27: datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}}::h0d5b828859fa1f18
      at /Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/repartition/mod.rs:702
       700// fetch the next batch
       701let timer = metrics.fetch_time.timer();
       702 >             let result = stream.next().await;
       703 │             timer.done();
       70428: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}::h24aa3949553e8510
      at /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/core.rs:328
       326327let _guard = TaskIdGuard::enter(self.task_id);
       328 >                 future.poll(&mut cx)
       329})
       330};
  29: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut::hed0e8a2b03eb44a5
      at /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/loom/std/unsafe_cell.rs:16
        14 │     #[inline(always)]
        15pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
        16 >         f(self.0.get())
        17}
        18}
  30: tokio::runtime::task::core::Core<T,S>::poll::h6e6c537ecabf42b1
      at /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/core.rs:317
       315pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
       316let res = {
       317 >             self.stage.stage.with_mut(|ptr| {
       318// Safety: The caller ensures mutual exclusion to the field.
       319let future = match unsafe { &mut *ptr } {
  31: tokio::runtime::task::harness::poll_future::{{closure}}::hdaa68138fe576c41
      at /Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/harness.rs:485
       483}
       484let guard = Guard { core };
       485 >         let res = guard.core.poll(cx);
       486 │         mem::forget(guard);
       487 │         res

Copy link
Contributor Author

@matthewgapp matthewgapp Jan 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue seems to arise from (or at lease be correlated to) the MemoryExec. I think this because the difference in MemExec partition sizes (more on that below) and because when I remove try_swapping_with_memory https://github.com/matthewgapp/arrow-datafusion/blob/ea4cb6e9c6b8a0bc892ef76578fd7853ddc692ab/datafusion/core/src/physical_optimizer/projection_pushdown.rs#L110, our project no longer blows up.

The difference between the plans that fail (in our project) and succeed (outside of our project) is that the offending Memory exec has a partition size of 2 ( MemoryExec: partitions=1, partition_sizes=[2] ) and the successful plan has a partition size of 1 ( MemoryExec: partitions=1, partition_sizes=[1] ).

I'm not sure how to force datafusion to take on a larger partition size so that I can reproduce outside of our project 🤔

Offending physical plan from logs:

Input physical plan:
AggregateExec: mode=FinalPartitioned, gby=[emd@0 as emd, beg@1 as beg, prices_row_num@2 as prices_row_num, prices_row_num_advancement@3 as prices_row_num_advancement], aggr=[]
  AggregateExec: mode=Partial, gby=[emd@0 as emd, beg@1 as beg, prices_row_num@2 as prices_row_num, prices_row_num_advancement@3 as prices_row_num_advancement], aggr=[]
    ProjectionExec: expr=[emd@0 as emd, beg@1 as beg, prices_row_num@2 as prices_row_num, prices_row_num_advancement@3 as prices_row_num_advancement]
      RecursiveQueryExec: is_distinct=false
        ProjectionExec: expr=[40 as emd, 0 as beg, prices_row_num@5 as prices_row_num, prices_row_num_advancement@6 as prices_row_num_advancement]
          NestedLoopJoinExec: join_type=Left
            NestedLoopJoinExec: join_type=Left
              HashJoinExec: mode=Partitioned, join_type=Inner, on=[(prices_row_num@3, prices_row_num@0)]
                MemoryExec: partitions=1, partition_sizes=[2]
                AggregateExec: mode=FinalPartitioned, gby=[prices_row_num@0 as prices_row_num], aggr=[]
                  AggregateExec: mode=Partial, gby=[prices_row_num@0 as prices_row_num], aggr=[]
                    ProjectionExec: expr=[prices_row_num@0 as prices_row_num]
                      ProjectionExec: expr=[MIN(prices_with_row_num_2.prices_row_num)@0 as prices_row_num]
                        AggregateExec: mode=Final, gby=[], aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                          AggregateExec: mode=Partial, gby=[], aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                            MemoryExec: partitions=1, partition_sizes=[2]
              ProjectionExec: expr=[prices_row_num@0 as prices_row_num]
                ProjectionExec: expr=[MIN(prices_with_row_num_2.prices_row_num)@0 as prices_row_num]
                  AggregateExec: mode=Final, gby=[], aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                    AggregateExec: mode=Partial, gby=[], aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                      MemoryExec: partitions=1, partition_sizes=[2]
            ProjectionExec: expr=[prices_row_num_advancement@0 as prices_row_num_advancement]
              ProjectionExec: expr=[MIN(prices_with_row_num_2.prices_row_num)@0 as prices_row_num_advancement]
                AggregateExec: mode=Final, gby=[], aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                  AggregateExec: mode=Partial, gby=[], aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                    NestedLoopJoinExec: join_type=Inner, filter=prices_row_num@0 > prices_row_num@1
                      MemoryExec: partitions=1, partition_sizes=[2]
                      ProjectionExec: expr=[prices_row_num@0 as prices_row_num]
                        ProjectionExec: expr=[MIN(prices_with_row_num_2.prices_row_num)@0 as prices_row_num]
                          AggregateExec: mode=Final, gby=[], aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                            AggregateExec: mode=Partial, gby=[], aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                              MemoryExec: partitions=1, partition_sizes=[2]
        ProjectionExec: expr=[emd@0 + 40 as emd, emd@0 as beg, prices_row_num@8 as prices_row_num, prices_row_num_advancement@9 as prices_row_num_advancement]
          ProjectionExec: expr=[emd@0 as emd, beg@1 as beg, prices_row_num@2 as prices_row_num, prices_row_num_advancement@3 as prices_row_num_advancement, Index@4 as Index, product@5 as product, price@6 as price, prices_row_num@7 as prices_row_num, prices_row_num@9 as prices_row_num, prices_row_num_advancement@10 as prices_row_num_advancement]
            HashJoinExec: mode=Partitioned, join_type=Left, on=[(coalesce(prices_with_row_num_2.prices_row_num,recursive_cte.prices_row_num_advancement)@8, prices_row_num@0)]
              ProjectionExec: expr=[emd@0 as emd, beg@1 as beg, prices_row_num@2 as prices_row_num, prices_row_num_advancement@3 as prices_row_num_advancement, Index@4 as Index, product@5 as product, price@6 as price, prices_row_num@7 as prices_row_num, coalesce(prices_row_num@7, prices_row_num_advancement@3) as coalesce(prices_with_row_num_2.prices_row_num,recursive_cte.prices_row_num_advancement)]
                HashJoinExec: mode=Partitioned, join_type=Left, on=[(prices_row_num_advancement@3, prices_row_num@3)]
                  FilterExec: prices_row_num_advancement@3 IS NOT NULL
                    WorkTableExec: name=recursive_cte
                  MemoryExec: partitions=1, partition_sizes=[2]
              ProjectionExec: expr=[prices_row_num@0 as prices_row_num, LEAD(prices_with_row_num_2.prices_row_num,Int64(1)) ORDER BY [prices_with_row_num_2.prices_row_num ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as prices_row_num_advancement]
                BoundedWindowAggExec: wdw=[LEAD(prices_with_row_num_2.prices_row_num,Int64(1)) ORDER BY [prices_with_row_num_2.prices_row_num ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "LEAD(prices_with_row_num_2.prices_row_num,Int64(1)) ORDER BY [prices_with_row_num_2.prices_row_num ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
                  AggregateExec: mode=FinalPartitioned, gby=[prices_row_num@0 as prices_row_num], aggr=[]
                    AggregateExec: mode=Partial, gby=[prices_row_num@0 as prices_row_num], aggr=[]
                      ProjectionExec: expr=[prices_row_num@3 as prices_row_num]
                        MemoryExec: partitions=1, partition_sizes=[2]

Copy link
Contributor Author

@matthewgapp matthewgapp Jan 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to increase the partition size to 2 within the memory exec by setting the batch size to 50 on the session config, but I'm still unable to reproduce in my test crate outside of our larger project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to reproduce and I think I'm getting to the bottom of this issue, which indeed appears to be a separate issue. It arises when creating an upstream table with ROW_NUMBER() window function. Some sort of subtle data type mismatch causes downstream execution to blow up. We can workaround this by wrapping the upstream ROW_NUMBER() column in a CAST to BIGINT.

})
.collect()
) -> Option<Vec<usize>> {
if source.is_none() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to fixing panic which caused a unit test in cte.slt to fail. More info here #8840 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid that this bug might not have been fixed.
Returning None seems to select all the columns of the csv table balance.

DataFusion CLI v34.0.0
❯ CREATE EXTERNAL TABLE balance STORED as CSV WITH HEADER ROW LOCATION '../testing/data/csv/r_cte_balance.csv';
0 rows in set. Query took 0.026 seconds.

❯ set datafusion.optimizer.max_passes=0;
0 rows in set. Query took 0.002 seconds.

❯ select time from balance;
ProjectionPushdown
caused by
Internal error: PhysicalOptimizer rule 'ProjectionPushdown' failed, due to generate a different schema, 

original schema: Schema { fields: [
    Field { name: "time", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, 

new schema: Schema { fields: 
    Field { name: "time", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
    Field { name: "name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
    Field { name: "account_balance", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }.

This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed issue #9004, and I think PR #9005 should be able to fix the ProjectionPushdown bug.

There might be another issue here, which is why no projection was pushed down to the CSV table, but I think it will not affect the functionality.

@matthewgapp matthewgapp force-pushed the matt/feat/recursive-ctes/execution-support branch 2 times, most recently from a993efb to f67763a Compare January 15, 2024 22:04
@matthewgapp matthewgapp marked this pull request as ready for review January 15, 2024 22:19
testing Outdated Show resolved Hide resolved
@matthewgapp matthewgapp force-pushed the matt/feat/recursive-ctes/execution-support branch from a7fd8bc to 8c8a1d4 Compare January 16, 2024 18:05
@matthewgapp matthewgapp requested a review from alamb January 16, 2024 18:29
@matthewgapp matthewgapp force-pushed the matt/feat/recursive-ctes/execution-support branch from 06fcbea to 68c9fe7 Compare January 18, 2024 19:56
@matthewgapp
Copy link
Contributor Author

The tests are failing because it's failing to find the commit that contains the new test files. I pushed the new commit to my branch here so it should start passing again when they're re-run apache/arrow-testing#93

@matthewgapp matthewgapp force-pushed the matt/feat/recursive-ctes/execution-support branch from 68c9fe7 to b11ca8b Compare January 21, 2024 08:49
@github-actions github-actions bot removed logical-expr Logical plan and expressions optimizer Optimizer rules labels Jan 21, 2024
@alamb
Copy link
Contributor

alamb commented Jan 22, 2024

FYI @jonahgao

I'll try and find time to review this over the next day or two. Thanks @matthewgapp

@matthewgapp
Copy link
Contributor Author

FYI @jonahgao

I'll try and find time to review this over the next day or two. Thanks @matthewgapp

Thank you, looking forward to getting this over the finish line!


/// Take the previously written batches from the work table.
/// This will be called by the [`WorkTableExec`] when it is executed.
fn take(&self) -> Vec<RecordBatch> {
Copy link
Member

@jonahgao jonahgao Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The work table might be read multiple times if there are multiple TableScans on it in the recursive term.

WITH RECURSIVE my_cte AS (
    SELECT 1 as a
    UNION ALL
    SELECT my_cte.a+2 as a
    FROM my_cte join my_cte c2 using(a)
    WHERE my_cte.a<5
)
SELECT a FROM my_cte;

We might need to clone the batches here, or we may not support it, like PostgreSQL does.

postgres=# WITH RECURSIVE my_cte AS (
    SELECT 1 as a
    UNION ALL
    SELECT my_cte.a+2 as a
    FROM my_cte join my_cte c2 using(a)
    WHERE my_cte.a<5
)
SELECT a FROM my_cte;
ERROR:  recursive reference to query "my_cte" must not appear more than once
LINE 5:     FROM my_cte join my_cte c2 using(a)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not supporting it for now would be fine (rather than paying the clone tax for every query regardless of whether it's referenced multiple times)

Copy link
Member

@jonahgao jonahgao Jan 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we could make the Worktable capable of being read repeatedly. For example, we could use something like Arc<Vec<RecordBatch>>. However, I think it's okay not to support it; we can refine it in subsequent PRs.

Copy link
Contributor Author

@matthewgapp matthewgapp Jan 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that makes sense. Off the top of your head, is there an easy way to traverse the recursive term of the logical plan to count the number of table references (relations) that match the cte name? I'll avoid building that machinery myself if it already exists.

Btw, looks like memory stream clones record batches anyway. https://github.com/matthewgapp/arrow-datafusion/blob/b11ca8bb7e61d78af831b8667fa551d26e4839c7/datafusion/physical-plan/src/memory.rs#L255. Feels like the poll_next implementation should instead drain the batches

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about implementing it in assign_work_table as well?

fn assign_work_table(
    plan: Arc<dyn ExecutionPlan>,
    work_table: Arc<WorkTable>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let mut work_table_refs = 0;
    plan.transform_down_mut(&mut |plan| {
        if let Some(exec) = plan.as_any().downcast_ref::<WorkTableExec>() {
            if work_table_refs > 0 {
                not_impl_err!(
                    "Multiple recursive references to the same CTE are not supported"
                )
            } else {
                work_table_refs += 1;
                Ok(Transformed::Yes(Arc::new(
                    exec.with_work_table(work_table.clone()),
                )))
            }
        } else if plan.as_any().is::<RecursiveQueryExec>() {
            not_impl_err!("Recursive queries cannot be nested")
        } else {
            Ok(Transformed::No(plan))
        }
    })
}

Yes, cloning in MemoryStream seems to be avoidable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implemented here 2eaa48a (#8840), thanks @jonahgao!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll create a separate PR to remove the unnecessary clones from memory stream.

@matthewgapp
Copy link
Contributor Author

Let me know if any other blocking comments or changes @alamb or @jonahgao! TY

@alamb
Copy link
Contributor

alamb commented Jan 25, 2024

I plan to review this tomorrow

@matthewgapp
Copy link
Contributor Author

I plan to review this tomorrow

Thank you!

@jonahgao
Copy link
Member

Except for the changes related to ProjectionPushdown, everything else looks good to me!
I believe that the bug related to ProjectionPushdown can be fixed after merging with main.

Thank you! @matthewgapp

add config flag for recursive ctes

update docs from script

update slt test for doc change

restore testing pin

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

fix sql integration test

wip: add sql test for logical plan

wip: format test assertion

wip: remove uncessary with qualifier method

some docs

more docs

Add comments to `RecursiveQuery`

Update datfusion-cli Cargo.lock

Fix clippy

better errors and comments

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

fix sql integration test

wip: add sql test for logical plan

wip: format test assertion

wip: remove uncessary with qualifier method

some docs

more docs

impl execution support

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

partway through porting over isidentical's work

Continuing implementation with fixes and improvements

Lint fixes

ensure that repartitions are not added immediately after RecursiveExec
in the physical-plan

add trivial sqllogictest

more recursive tests

remove test that asserts recursive cte should fail

additional cte test

wip: remove tokio from physical plan dev deps

format cargo tomls

fix issue where CTE could not be referenced more than 1 time

wip: fixes after rebase but tpcds_physical_q54 keeps overflowing its stack

Impl NamedRelation as CteWorkTable

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

* upd

* assign work table state

* upd

* upd

fix min repro but still broken on larger test case

set config in sql logic tests

clean up cte slt tests

fixes

fix option

add group by test case and more test case files

wip

add window function recursive cte example

simplify stream impl for recrusive query stream

add explain to trivial test case

move setting of recursive ctes to slt file and add test to ensure multiple record batches are produced each iteration

remove tokio dep and remove mut

lint, comments and remove tokio stream

update submodule pin to feat branch that contains csvs

update submodule pin to feat branch that contains csvs
@matthewgapp matthewgapp force-pushed the matt/feat/recursive-ctes/execution-support branch from e65caee to 80069f7 Compare January 26, 2024 17:25
@matthewgapp
Copy link
Contributor Author

Except for the changes related to ProjectionPushdown, everything else looks good to me! I believe that the bug related to ProjectionPushdown can be fixed after merging with main.

Thank you! @matthewgapp

I rebased onto main which has the projection pushdown fix #9005

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @matthewgapp -- I reviewed this PR and it looks really nice (thank you @jonahgao for all the feedback that left it in such good shape)

The only thing I think is needed prior to merge is remove the arrow-testing pin -- the testing data really belong in the https://github.com/apache/arrow-testing repo, as that is shared across arrow implementations. Perhaps you could put it somewhere in the main repo, such as in https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/data ?

Otherwise I think this PR is looking great


# setup
statement ok
CREATE EXTERNAL TABLE balance STORED as CSV WITH HEADER ROW LOCATION '../../testing/data/csv/r_cte_balance.csv'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW if you wanted to avoid having to check in external files, you could use CREATE TABLE AS VALUES

For example: https://github.com/apache/arrow-datafusion/blob/fc752557204f4b52ab4cb38b5caff99b1b73b902/datafusion/sqllogictest/test_files/aggregate.slt#L41-L45

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip. I'll leave for files for now - since some of the files have a non-trivial number of values.

datafusion/physical-plan/src/work_table.rs Outdated Show resolved Hide resolved
@@ -84,7 +82,11 @@ impl TableProvider for CteWorkTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("scan not implemented for CteWorkTable yet")
// TODO: pushdown filters and limits
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend tracking this separate tickets or tasks on the main recursive CTE ticket, otherwise things like this can be easily forgotten

Partitioning::UnknownPartitioning(1)
}

// TODO: control these hints and see whether we can
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this would be pretty tricky -- I think the choices you have below are the simple (and correct) ones, and excellent choices for the first implementation 👍

datafusion/physical-plan/src/recursive_query.rs Outdated Show resolved Hide resolved
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
// TODO: we should use this poll to record some metrics!
if let Some(static_stream) = &mut self.static_stream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very nicely written and easy to follow 👍

@matthewgapp
Copy link
Contributor Author

Thank you @matthewgapp -- I reviewed this PR and it looks really nice (thank you @jonahgao for all the feedback that left it in such good shape)

The only thing I think is needed prior to merge is remove the arrow-testing pin -- the testing data really belong in the https://github.com/apache/arrow-testing repo, as that is shared across arrow implementations. Perhaps you could put it somewhere in the main repo, such as in https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/data ?

Otherwise I think this PR is looking great

@alamb thanks so much for the review. I'll move the test files and remove the pin now.

@matthewgapp
Copy link
Contributor Author

@alamb I removed the pin and updated to reflect your tweaks. Should be good to go :D

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Epic work @matthewgapp

And thank you so much for all your help @jonahgao

@alamb
Copy link
Contributor

alamb commented Jan 27, 2024

I can't wait to get this hardned enough to turn on my default ❤️

@alamb alamb merged commit a6cdd0d into apache:main Jan 27, 2024
22 checks passed
@alamb
Copy link
Contributor

alamb commented Mar 11, 2024

Filed #9554 to track enabling this feature by default

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate sql SQL Planner sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants