add window expression stream, delegated window aggregation to aggregate functions, and implement row_number #375

Merged: 2 commits, May 26, 2021


@jimexist jimexist commented May 21, 2021

  • Basic aggregation based window function
  • Streaming results from window functions
  • Basic Structure for row number

This partly fixes #298

@jimexist jimexist force-pushed the add-row-number branch 2 times, most recently from 90daf11 to 4ad6e4d Compare May 22, 2021 09:50

codecov-commenter commented May 22, 2021

Codecov Report

Merging #375 (ad3d9c1) into master (3593d1f) will increase coverage by 0.49%.
The diff coverage is 77.02%.

@@            Coverage Diff             @@
##           master     #375      +/-   ##
==========================================
+ Coverage   74.85%   75.34%   +0.49%     
==========================================
  Files         146      147       +1     
  Lines       24565    24782     +217     
==========================================
+ Hits        18387    18673     +286     
+ Misses       6178     6109      -69     
| Impacted Files | Coverage Δ |
|---|---|
| datafusion/src/physical_plan/expressions/mod.rs | 71.42% <ø> (ø) |
| datafusion/src/physical_plan/hash_aggregate.rs | 85.21% <ø> (ø) |
| datafusion/src/physical_plan/sort.rs | 91.26% <ø> (ø) |
| datafusion/src/physical_plan/window_functions.rs | 85.71% <57.89%> (-3.01%) ⬇️ |
| datafusion/src/physical_plan/mod.rs | 78.70% <61.90%> (-4.06%) ⬇️ |
| datafusion/src/physical_plan/windows.rs | 73.78% <76.57%> (+73.78%) ⬆️ |
| ...fusion/src/physical_plan/expressions/row_number.rs | 81.25% <81.25%> (ø) |
| datafusion/src/execution/context.rs | 92.08% <100.00%> (+0.03%) ⬆️ |
| datafusion/src/physical_plan/planner.rs | 80.45% <100.00%> (+3.94%) ⬆️ |
| datafusion/tests/sql.rs | 99.89% <100.00%> (+<0.01%) ⬆️ |
| ... and 11 more | |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3593d1f...ad3d9c1.

@jimexist jimexist marked this pull request as ready for review May 22, 2021 10:44
@jimexist jimexist changed the title from "add streaming of window expr, row_number" to "add window expression stream, delegated window aggregation, and a basic structure for row_number" May 22, 2021

alamb commented May 23, 2021

Thanks @jimexist -- I plan to review this carefully tomorrow

pub trait WindowAccumulator: Send + Sync + Debug {
    /// scans the accumulator's state from a vector of scalars, similar to Accumulator it also
    /// optionally generates values.
    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>>;
Contributor

It would help me if you could explain where the "window" for the window function appears in this trait. I assume you already have a design in mind, so I figured I would ask here

I am thinking about a query like the following

select sum(value) OVER (ROWS 5 PRECEDING) FROM ....

I think in this case, you end up with 9 aggregate values from 9 different windows, in the following manner:

                                                     
                  1 2 3 4 5 6 7 8 9       input      
                                                     
    window 1      ─                                  
    window 2      ───                                
    window 3      ─────                              
    window 4      ───────                            
    window 5      ─────────                          
    window 6        ─────────                        
    window 7          ─────────                      
    window 8            ─────────                    
    window 9              ─────────                  

I would have expected the WindowAccumulator to have functions like

/// Add a new row to the current window
fn new_row_in_window(&mut self, value: ScalarValue);

/// Remove a row from the current window
fn remove_row_from_window(&mut self, value: ScalarValue);

/// The current value of this function for the given window
fn current_value(&self) -> ScalarValue;

Or possibly something like

fn evaluate(window: &[ArrayRef]) -> ScalarValue;
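
A minimal sketch of the add-row / remove-row idea above, assuming a hypothetical `SlidingSum` type over plain `i64` values instead of `ScalarValue`. None of these names come from DataFusion. The frame holds at most five rows, matching the diagram.

```rust
use std::collections::VecDeque;

/// Hypothetical sliding-window SUM; not DataFusion's `WindowAccumulator`.
struct SlidingSum {
    /// Rows currently inside the frame, oldest first.
    window: VecDeque<i64>,
    /// Maximum number of rows in the frame (preceding rows plus the current row).
    capacity: usize,
    /// Running sum of the rows in `window`.
    sum: i64,
}

impl SlidingSum {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "a frame must hold at least the current row");
        Self { window: VecDeque::with_capacity(capacity), capacity, sum: 0 }
    }

    /// Add a new row to the frame, evicting the oldest row if the frame is full.
    fn push_row(&mut self, value: i64) {
        if self.window.len() == self.capacity {
            if let Some(oldest) = self.window.pop_front() {
                self.sum -= oldest;
            }
        }
        self.window.push_back(value);
        self.sum += value;
    }

    /// The aggregate value for the current frame.
    fn current_value(&self) -> i64 {
        self.sum
    }
}

/// Produce one aggregate value per input row.
fn sliding_sums(input: &[i64], capacity: usize) -> Vec<i64> {
    let mut acc = SlidingSum::new(capacity);
    input
        .iter()
        .map(|&v| {
            acc.push_row(v);
            acc.current_value()
        })
        .collect()
}
```

Calling `sliding_sums(&[1, 2, 3, 4, 5, 6, 7, 8, 9], 5)` yields one sum per window in the diagram; each step costs one push and at most one pop rather than a rescan of the frame.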

Member Author

@jimexist jimexist May 24, 2021

@alamb good question!

The word "window" in this trait name only indicates that window functions will use it. It could have a better name, but...

As for the design, there are two complications:

  1. Multiple window functions, each with a different window frame, can be scanning batches at the same time, so I'd want each to create its own window accumulator that handles push_back and removal from the front on its own; this trait does not put that into the API, it just scans.
  2. Specifically for window functions that peek ahead: because batches arrive in an async stream, it is not feasible to peek, so my plan is to let them optionally perform a one-time shift when finishing up (e.g. lead just produces the same value in place, with a one-time shift at the end).

Due to 1 and 2, the best state container for a window accumulator is probably VecDeque. The name scan reflects that accumulators iterate over the list and either optionally produce one value at a time (max with order by) or produce exactly one accumulated value when scanning finishes (max with empty over), but not both.
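
The two emission modes described above can be sketched as follows. This is only an illustration under stated assumptions: `ScanAccumulator`, `RunningMax`, `TotalMax`, and `run` are hypothetical names, values are plain `i64` rather than `ScalarValue`, and error handling is omitted.

```rust
/// Hypothetical, simplified stand-in for the PR's `WindowAccumulator`.
trait ScanAccumulator {
    /// Consume one row and optionally emit a value for that row.
    fn scan(&mut self, value: i64) -> Option<i64>;
    /// Called once after the last row; optionally emit one final value.
    fn finish(&mut self) -> Option<i64>;
}

/// Like `max(x) over (order by ...)`: emits a value for every row.
struct RunningMax {
    max: Option<i64>,
}

impl ScanAccumulator for RunningMax {
    fn scan(&mut self, value: i64) -> Option<i64> {
        self.max = Some(self.max.map_or(value, |m| m.max(value)));
        self.max
    }

    fn finish(&mut self) -> Option<i64> {
        None
    }
}

/// Like `max(x) over ()`: emits nothing per row, one value at the end.
struct TotalMax {
    max: Option<i64>,
}

impl ScanAccumulator for TotalMax {
    fn scan(&mut self, value: i64) -> Option<i64> {
        self.max = Some(self.max.map_or(value, |m| m.max(value)));
        None
    }

    fn finish(&mut self) -> Option<i64> {
        self.max
    }
}

/// Drive an accumulator over the input; returns (per-row values, final value).
fn run(acc: &mut dyn ScanAccumulator, input: &[i64]) -> (Vec<i64>, Option<i64>) {
    let per_row: Vec<i64> = input.iter().filter_map(|&v| acc.scan(v)).collect();
    (per_row, acc.finish())
}
```

In this sketch an accumulator fills either the per-row vector or the final value, never both, which is the "but not both" rule from the comment.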

Contributor

I see -- looking at how these functions are used for nth_value here helped me get a better sense for how these traits work

I think as you go through the implementation, it would help to add some additional details for future implementers of this trait. For example, when is it ok to return values from scan() or scan_batch(), and what is the expected total number of rows produced?

However, having several examples of implemented window functions will help too, I think, so no need to change anything more at this time.


#[cfg(test)]
mod tests {
// use super::*;
Contributor

I think tests would be good to have

Member Author

I am trying to showcase the design here, so if the interface is generally okay with reviewers, I can go ahead and write the tests.

@jimexist jimexist force-pushed the add-row-number branch 2 times, most recently from 8e6c181 to b439b0e Compare May 25, 2021 01:40
@jimexist jimexist changed the title from "add window expression stream, delegated window aggregation, and a basic structure for row_number" to "add window expression stream, delegated window aggregation, and implement row_number" May 25, 2021
@jimexist jimexist changed the title from "add window expression stream, delegated window aggregation, and implement row_number" to "add window expression stream, delegated window aggregation to aggregate functions, and implement row_number" May 25, 2021
@jimexist
Member Author

local test:

> select c1, row_number() over () from test limit 5;
+----+--------------+
| c1 | ROW_NUMBER() |
+----+--------------+
| c  | 1            |
| d  | 2            |
| b  | 3            |
| a  | 4            |
| b  | 5            |
+----+--------------+
5 rows in set. Query took 0 seconds.
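
The numbering in the output above continues across record batches. A minimal sketch of how such a counter could carry over between batches, assuming a hypothetical `RowNumber` type that only needs each batch's row count; this is not the code in the PR's `row_number.rs`.

```rust
/// Hypothetical row-number state; not the PR's implementation.
struct RowNumber {
    /// Number of rows seen in earlier batches.
    seen: u64,
}

impl RowNumber {
    fn new() -> Self {
        Self { seen: 0 }
    }

    /// Emit 1-based row numbers for a batch of `num_rows` rows
    /// and advance the counter for the next batch.
    fn scan_batch(&mut self, num_rows: usize) -> Vec<u64> {
        let start = self.seen + 1;
        self.seen += num_rows as u64;
        (start..=self.seen).collect()
    }
}
```

Because the result for a batch depends only on how many rows came before it, this kind of function can emit output batch by batch without buffering the whole input.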

@jimexist jimexist force-pushed the add-row-number branch 3 times, most recently from 1b82f25 to 563192a Compare May 25, 2021 12:43
@jimexist jimexist requested review from alamb and Dandandan May 25, 2021 15:56
Contributor

@alamb alamb left a comment

I personally think this is a great start. The structure is clear and I think the follow on PRs will make it even better. Thank you so much @jimexist

I am still a bit confused about what the requirements are while implementing a WindowAccumulator but I think they will become clearer as we add more window aggregate support.

I think we should merge this one in. cc @Dandandan @andygrove @jorgecarleitao

min(c3) over () \
from aggregate_test_100 limit 5";
let actual = execute(&mut ctx, sql).await;
let expected = vec![
Contributor

👍

for i in 0..(schema.fields().len() - window_expr.len()) {
let col = concat(
&original_batches
Contributor

I guess this is only needed for the empty over clause, right? In other cases we probably only want to access the rows.
Also, for functions like RowNumber I think we could stream/emit batches instead of "caching" them, as we don't need to scan all the rows to compute the result. Either is fine: in this PR or as an issue / follow-up work!

Member Author

Yes, I am aware of the potential changes to how rows are scanned, but I plan to leave this as is because down the road (when adding partition by and order by) things could change a lot.

Member Author

I'd like this to be fleshed out gradually.

count(c3) over (), \
max(c3) over (), \
min(c3) over () \
from aggregate_test_100 limit 5";
Contributor

I think an explicit order by is required (or sorting of results) to make this deterministic.

Contributor

Right now the ordering is "just" an implementation detail of the batches getting merged / concatenated into one batch that keeps the same ordering.

Member Author

will add sort by c1

&mut self,
num_rows: usize,
values: &[ArrayRef],
) -> Result<Option<ArrayRef>> {
Contributor

I don't completely understand when this scan_batch is used instead of the scan_batch of the implementations. Or can it be removed by now?

Member Author

It's a catch-all implementation. With more window functions implemented it will be clearer, and by then we can feel free to remove it.
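
One way to read "catch-all" here is a default trait method that implementations may override. The sketch below illustrates that pattern only; `BatchAccumulator` and `RunningSum` are hypothetical, values are plain `i64` rather than `ArrayRef`, and the actual default `scan_batch` in the PR may behave differently.

```rust
/// Hypothetical trait illustrating a catch-all default method.
trait BatchAccumulator {
    /// Consume one row and optionally emit a value for it.
    fn scan(&mut self, value: i64) -> Option<i64>;

    /// Catch-all batch path: feed rows to `scan` one at a time and collect
    /// whatever is emitted. Returns `None` when no row produced a value.
    fn scan_batch(&mut self, values: &[i64]) -> Option<Vec<i64>> {
        let mut out = Vec::new();
        for &v in values {
            if let Some(x) = self.scan(v) {
                out.push(x);
            }
        }
        if out.is_empty() {
            None
        } else {
            Some(out)
        }
    }
}

/// Running sum that relies entirely on the default `scan_batch`.
struct RunningSum {
    sum: i64,
}

impl BatchAccumulator for RunningSum {
    fn scan(&mut self, value: i64) -> Option<i64> {
        self.sum += value;
        Some(self.sum)
    }
}
```

An implementation with a cheaper vectorized path would override `scan_batch`; once every implementation does so, the default becomes removable, which is the situation the comment anticipates.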

Contributor

@Dandandan Dandandan left a comment

I think this is a good start / continuation - thanks @jimexist for this amazing contribution!

I think this is good to go - also feel free to address some of the comments within this PR if you want.

commit 7fb3640
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 16:38:25 2021 +0800

    row number done

commit 1723926
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 16:05:50 2021 +0800

    add row number

commit bf5b8a5
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 15:04:49 2021 +0800

    save

commit d2ce852
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:53:05 2021 +0800

    add streams

commit 0a861a7
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 22:28:34 2021 +0800

    save stream

commit a9121af
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 22:01:51 2021 +0800

    update unit test

commit 2af2a27
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:25:12 2021 +0800

    fix unit test

commit bb57c76
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:23:34 2021 +0800

    use upper case

commit 5d96e52
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:16:16 2021 +0800

    fix unit test

commit 1ecae8f
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 12:27:26 2021 +0800

    fix unit test

commit bc2271d
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 10:04:29 2021 +0800

    fix error

commit 880b94f
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 08:24:00 2021 +0800

    fix unit test

commit 4e792e1
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 08:05:17 2021 +0800

    fix test

commit c36c04a
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 00:07:54 2021 +0800

    add more tests

commit f5e64de
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:41:36 2021 +0800

    update

commit a1eae86
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:36:15 2021 +0800

    enrich unit test

commit 0d2a214
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:25:43 2021 +0800

    adding filter by todo

commit 8b486d5
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:17:22 2021 +0800

    adding more built-in functions

commit abf08cd
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:36:27 2021 +0800

    Update datafusion/src/physical_plan/window_functions.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 0cbca53
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:34:57 2021 +0800

    Update datafusion/src/physical_plan/window_functions.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 831c069
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:34:04 2021 +0800

    Update datafusion/src/logical_plan/builder.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit f70c739
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:33:04 2021 +0800

    Update datafusion/src/logical_plan/builder.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 3ee87aa
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:55:08 2021 +0800

    fix unit test

commit 5c4d92d
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:48:26 2021 +0800

    fix clippy

commit a0b7526
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:46:38 2021 +0800

    fix unused imports

commit 1d3b076
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 13 18:51:14 2021 +0800

    add window expr
@alamb alamb merged commit 4b1e9e6 into apache:master May 26, 2021
@houqp houqp added datafusion Changes in the datafusion crate enhancement New feature or request labels Jul 29, 2021