perf: fast path to generate group idxs for vanilla int_range in group_by_dynamic #19932

Open
wants to merge 6 commits into
base: main
Changes from 4 commits
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
@@ -1138,6 +1138,16 @@ impl LazyFrame {
if let Expr::Column(name) = index_column {
options.index_column = name;
} else {
fn is_int_range(expr: &Expr) -> bool {
match expr {
Expr::Alias(input, _) => is_int_range(input),
Expr::Function { function, .. } => {
matches!(function, FunctionExpr::Range(f) if f.to_string() == "int_range")
Contributor Author

It's a bit awkward that I have to check against the output of .to_string() here.

Member

You don't have to:

    matches!(
        function,
        FunctionExpr::Range(RangeFunction::IntRange { .. })
    );
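
To make the difference between the two checks concrete, here is a minimal, self-contained sketch. The enums below are simplified stand-ins written for this example (their variants, fields, and display names are assumptions, not the real polars definitions); only the two matches! patterns mirror the diff and the suggestion above.

```rust
use std::fmt;

// Hypothetical stand-in for polars' RangeFunction; variants and fields are assumed.
#[allow(dead_code)]
enum RangeFunction {
    IntRange { step: i64 },
    IntRanges,
}

impl fmt::Display for RangeFunction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            RangeFunction::IntRange { .. } => "int_range",
            RangeFunction::IntRanges => "int_ranges",
        })
    }
}

// Hypothetical stand-in for polars' FunctionExpr.
#[allow(dead_code)]
enum FunctionExpr {
    Range(RangeFunction),
    Other,
}

// String-based check, as in the diff: allocates a String and depends on the display name.
fn is_int_range_by_name(function: &FunctionExpr) -> bool {
    matches!(function, FunctionExpr::Range(f) if f.to_string() == "int_range")
}

// Pattern-based check, as suggested in the review: no allocation, checked by the compiler.
fn is_int_range_by_variant(function: &FunctionExpr) -> bool {
    matches!(function, FunctionExpr::Range(RangeFunction::IntRange { .. }))
}
```

Both functions agree on these stand-in types, but only the second one fails to compile if the variant is renamed or removed.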

Member

I think we should do this check during the IR conversion, in resolve_groupby here:

} else if let Some(options) = _options.dynamic.as_ref() {

Contributor Author

Sure! I am just unsure how I should check in resolve_group_by whether the index of the group_by_dynamic is an int_range. Can I get the expression for this index (conveniently) from the expr_arena?

Member

Ah, right. I see that we first do a with_columns here. We should store the index column on line 1155 and do the with_column rewrite during IR conversion.

I understand it's a bit more than you anticipated. I can do the pre-work for that later if you like?

Contributor Author

I suppose this involves passing the index_column Expr from the lazy_frame.group_by_dynamic method to the dsl_to_ir functions? However, the best way to do this is not clear to me.

It would therefore be very helpful if you could do the pre-work for this! :)

},
_ => false,
}
}
options.int_range = is_int_range(&index_column);
let output_field = index_column
.to_field(&self.collect_schema().unwrap(), Context::Default)
.unwrap();
122 changes: 112 additions & 10 deletions crates/polars-time/src/group_by/dynamic.rs
@@ -34,6 +34,7 @@ pub struct DynamicGroupOptions {
pub include_boundaries: bool,
pub closed_window: ClosedWindow,
pub start_by: StartBy,
pub int_range: bool, // Whether the index column is a range column
Contributor Author

Perhaps this should be private (as we do not want users to set this "flag").

Member

No, we don't.

We can add a new DynamicGroupOptionsIR where we add private flags set by the IR.
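
One possible shape for such a type, as a rough sketch only: the name DynamicGroupOptionsIR comes from the comment above, but the field layout, the ir module, the constructor, and the accessor are all assumptions made for illustration, and DynamicGroupOptions is reduced to a single field.

```rust
// User-facing options: fields stay public (only one field of the real struct is shown).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DynamicGroupOptions {
    pub include_boundaries: bool,
}

// Hypothetical module standing in for the IR side of the planner.
mod ir {
    use super::DynamicGroupOptions;

    // Hypothetical IR-side wrapper; layout assumed for this sketch.
    #[derive(Clone, Debug)]
    pub struct DynamicGroupOptionsIR {
        pub options: DynamicGroupOptions,
        // Private: code outside this module cannot write the flag directly.
        int_range: bool,
    }

    impl DynamicGroupOptionsIR {
        // Meant to be called during IR conversion, after inspecting the index expression.
        pub(crate) fn new(options: DynamicGroupOptions, index_is_int_range: bool) -> Self {
            Self {
                options,
                int_range: index_is_int_range,
            }
        }

        // Read-only access for the executor.
        pub fn int_range(&self) -> bool {
            self.int_range
        }
    }
}
```

With this split, the public DynamicGroupOptions would not need an int_range field at all, so the Default impl and the test fixtures touched by this diff would stay unchanged.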

}

impl Default for DynamicGroupOptions {
@@ -47,6 +48,7 @@ impl Default for DynamicGroupOptions {
include_boundaries: false,
closed_window: ClosedWindow::Left,
start_by: Default::default(),
int_range: false,
}
}
}
@@ -311,16 +313,109 @@ impl Wrap<&DataFrame> {
let groups = if by.is_empty() {
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (groups, lower, upper) = group_by_windows(
w,
ts,
options.closed_window,
tu,
tz,
include_lower_bound,
include_upper_bound,
options.start_by,
);

let vanilla_start_step = (ts[0] == 0) && (ts[1] - ts[0] == 1);
Member

Can we move this to a separate function so that implementation is separate from dispatch?

Contributor Author

Moved the logic to a separate function in the group_by.rs file.
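
As an illustration of keeping dispatch separate from implementation, here is a reduced sketch. All function names and signatures below are hypothetical, IdxSize is a local stand-in alias, and only left-closed windows with no offset are handled, so this is far simpler than the code in the diff. The length guard in is_vanilla_int_range is an addition of this sketch; the check in the diff indexes ts[1] directly.

```rust
// Stand-in for polars' IdxSize alias (assumed to be u32 here).
type IdxSize = u32;
type Groups = Vec<[IdxSize; 2]>;

// Fast path (hypothetical): valid only when the index column is exactly 0, 1, 2, ...
fn group_by_int_range(len: IdxSize, every: IdxSize, period: IdxSize) -> Groups {
    (0..len)
        .step_by(every as usize)
        .map(|start| [start, period.min(len - start)])
        .collect()
}

// Generic path (hypothetical placeholder for group_by_windows): searches the sorted values.
fn group_by_generic(ts: &[i64], every: i64, period: i64) -> Groups {
    let mut groups = Groups::new();
    if ts.is_empty() {
        return groups;
    }
    let last = ts[ts.len() - 1];
    let mut lower = ts[0];
    while lower <= last {
        let start = ts.partition_point(|&t| t < lower);
        let end = ts.partition_point(|&t| t < lower + period);
        groups.push([start as IdxSize, (end - start) as IdxSize]);
        lower += every;
    }
    groups
}

// Cheap sanity check on the first two values, guarded against short inputs.
fn is_vanilla_int_range(ts: &[i64]) -> bool {
    ts.len() >= 2 && ts[0] == 0 && ts[1] - ts[0] == 1
}

// Dispatch only: decides which implementation runs, contains no grouping logic.
fn dynamic_groups(ts: &[i64], int_range: bool, every: i64, period: i64) -> Groups {
    assert!(every > 0 && period >= 0, "every must be positive");
    if int_range && is_vanilla_int_range(ts) {
        group_by_int_range(ts.len() as IdxSize, every as IdxSize, period as IdxSize)
    } else {
        group_by_generic(ts, every, period)
    }
}
```

For ts = 0..5 with every = 2 and period = 3, both paths yield the [start, length] pairs [0, 3], [2, 3], [4, 1], which makes it straightforward to test the fast path against the generic one.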

let (groups, lower, upper) = match (options.int_range, vanilla_start_step) {
(true, true) => {
let len: IdxSize = self.0.height() as IdxSize;
// assert_eq!(ts[len as usize - 1], len as i64 - 1);
ritchie46 marked this conversation as resolved.
let step: IdxSize = options.every.nanoseconds() as IdxSize;
let orig_window_size: IdxSize = options.period.nanoseconds() as IdxSize;
let mut window_size = orig_window_size;
let mut offset: IdxSize = options.offset.nanoseconds() as IdxSize;
if options.start_by == StartBy::DataPoint {
offset = 0;
}

if options.closed_window == ClosedWindow::Right {
offset += 1;
} else if options.closed_window == ClosedWindow::Both {
window_size += 1;
} else if options.closed_window == ClosedWindow::None {
offset += 1;
window_size -= 1;
}

let mut groups: Vec<[IdxSize; 2]> = match window_size {
0 => Vec::new(),
_ => generate_start_indices(offset, len, step)
.into_iter()
.map(|start| {
let end = std::cmp::min(window_size, len - start);
[start, end]
})
.collect(),
};

if (options.start_by == StartBy::WindowBound) && (window_size > 0) {
while offset >= step {
offset -= step;
groups.insert(0, [offset, window_size]);
}
}

let mut lower = match (include_lower_bound, window_size > 0) {
(true, true) => groups.iter().map(|&i| i[0] as i64).collect::<Vec<i64>>(),
_ => Vec::<i64>::new(),
};

let mut upper = match (include_upper_bound, window_size > 0) {
(true, true) => groups
.iter()
.map(|&i| (i[0] + orig_window_size) as i64)
.collect::<Vec<i64>>(),
_ => Vec::<i64>::new(),
};

if include_lower_bound
&& (options.closed_window == ClosedWindow::Right)
| (options.closed_window == ClosedWindow::None)
{
lower = lower.iter().map(|&i| i - 1).collect::<Vec<i64>>();
}
if include_upper_bound
&& (options.closed_window == ClosedWindow::Right)
| (options.closed_window == ClosedWindow::None)
{
upper = upper.iter().map(|&i| i - 1).collect::<Vec<i64>>();
}
if options.start_by == StartBy::WindowBound
&& (offset > 0)
&& (window_size >= offset)
&& (step < window_size + offset)
{
groups.insert(0, [0, offset + window_size - step]);
if include_lower_bound {
lower.insert(0, offset as i64 - step as i64);
if (options.closed_window == ClosedWindow::Right)
| (options.closed_window == ClosedWindow::None)
{
lower[0] -= 1;
}
}
if include_upper_bound {
upper.insert(0, groups[0][1] as i64);
if (options.closed_window == ClosedWindow::Right)
| (options.closed_window == ClosedWindow::Both)
{
upper[0] -= 1;
}
}
}

(groups, lower, upper)
},
_ => group_by_windows(
w,
ts,
options.closed_window,
tu,
tz,
include_lower_bound,
include_upper_bound,
options.start_by,
),
};
update_bounds(lower, upper);
PolarsResult::Ok(GroupsProxy::Slice {
groups,
@@ -616,6 +711,11 @@ impl Wrap<&DataFrame> {
}
}

fn generate_start_indices(offset: IdxSize, len: IdxSize, stride: IdxSize) -> Vec<IdxSize> {
let nb_idxs: IdxSize = (len - offset).div_ceil(stride);
(0..nb_idxs).map(|i| offset + i * stride).collect()
}
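
A small standalone version of this helper can be used to check its arithmetic by hand. IdxSize is a local stand-in alias (assumed u32), and the saturating_sub is an addition of this sketch rather than part of the diff: with an unsigned index type, len - offset would underflow whenever offset exceeds len. A stride of 0 still panics in div_ceil, as in the original.

```rust
// Stand-in for polars' IdxSize alias (assumed to be u32 here).
type IdxSize = u32;

// Start indices offset, offset + stride, ... that are strictly below len.
fn generate_start_indices(offset: IdxSize, len: IdxSize, stride: IdxSize) -> Vec<IdxSize> {
    // saturating_sub (added in this sketch) yields 0 groups when offset >= len
    // instead of underflowing.
    let nb_idxs: IdxSize = len.saturating_sub(offset).div_ceil(stride);
    (0..nb_idxs).map(|i| offset + i * stride).collect()
}
```

With len = 10 and stride = 3, an offset of 0 gives ceil(10 / 3) = 4 starts (0, 3, 6, 9), an offset of 1 gives ceil(9 / 3) = 3 starts (1, 4, 7), and an offset of 12 gives no starts.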

fn update_subgroups_slice(sub_groups: &[[IdxSize; 2]], base_g: [IdxSize; 2]) -> Vec<[IdxSize; 2]> {
sub_groups
.iter()
@@ -841,6 +941,7 @@ mod test {
include_boundaries: true,
closed_window: ClosedWindow::Both,
start_by: Default::default(),
int_range: Default::default(),
},
)
.unwrap();
@@ -961,6 +1062,7 @@ mod test {
include_boundaries: true,
closed_window: ClosedWindow::Both,
start_by: Default::default(),
int_range: Default::default(),
},
)
.unwrap();