diff --git a/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs b/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs
index 061f54b6e8c..fca01c0f9e6 100644
--- a/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs
@@ -21,6 +21,35 @@ pub(super) fn profile_name(
 
 type IdAndExpression = (u32, Arc<dyn PhysicalExpr>);
 
+#[cfg(feature = "dynamic_group_by")]
+fn rolling_evaluate(
+    df: &DataFrame,
+    state: &ExecutionState,
+    rolling: PlHashMap<&RollingGroupOptions, Vec<IdAndExpression>>,
+) -> PolarsResult<Vec<Vec<(u32, Series)>>> {
+    POOL.install(|| {
+        rolling
+            .par_iter()
+            .map(|(options, partition)| {
+                // clear the cache for every partitioned group
+                let state = state.split();
+                let (_time_key, _keys, groups) = df.group_by_rolling(vec![], options)?;
+                // Set the groups so all expressions in the partition can use them.
+                // Create a separate scope, so the lock is dropped; otherwise we deadlock when the
+                // rolling expressions try to get read access.
+                {
+                    let mut groups_map = state.group_tuples.write().unwrap();
+                    groups_map.insert(options.index_column.to_string(), groups);
+                }
+                partition
+                    .par_iter()
+                    .map(|(idx, expr)| expr.evaluate(df, &state).map(|s| (*idx, s)))
+                    .collect::<PolarsResult<Vec<_>>>()
+            })
+            .collect()
+    })
+}
+
 fn execute_projection_cached_window_fns(
     df: &DataFrame,
     exprs: &[Arc<dyn PhysicalExpr>],
@@ -83,25 +112,11 @@ fn execute_projection_cached_window_fns(
     // Per partition we run in parallel. We compute the groups before and store them once per partition.
     // The rolling expression knows how to fetch the groups.
     #[cfg(feature = "dynamic_group_by")]
-    for (options, partition) in rolling {
-        // clear the cache for every partitioned group
-        let state = state.split();
-        let (_time_key, _keys, groups) = df.group_by_rolling(vec![], options)?;
-        // Set the groups so all expressions in partition can use it.
-        // Create a separate scope, so the lock is dropped, otherwise we deadlock when the
-        // rolling expression try to get read access.
-        {
-            let mut groups_map = state.group_tuples.write().unwrap();
-            groups_map.insert(options.index_column.to_string(), groups);
+    {
+        let partitions = rolling_evaluate(df, state, rolling)?;
+        for part in partitions {
+            selected_columns.extend_from_slice(&part)
         }
-
-        let results = POOL.install(|| {
-            partition
-                .par_iter()
-                .map(|(idx, expr)| expr.evaluate(df, &state).map(|s| (*idx, s)))
-                .collect::<PolarsResult<Vec<_>>>()
-        })?;
-        selected_columns.extend_from_slice(&results);
     }
 
     for partition in windows {
diff --git a/crates/polars-time/src/windows/group_by.rs b/crates/polars-time/src/windows/group_by.rs
index 851b9e5a69e..9f984c54685 100644
--- a/crates/polars-time/src/windows/group_by.rs
+++ b/crates/polars-time/src/windows/group_by.rs
@@ -504,6 +504,56 @@ fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usi
     std::mem::swap(thread_offsets, &mut new);
 }
 
+#[allow(clippy::too_many_arguments)]
+fn group_by_values_iter_lookbehind_collected(
+    period: Duration,
+    offset: Duration,
+    time: &[i64],
+    closed_window: ClosedWindow,
+    tu: TimeUnit,
+    tz: Option<TimeZone>,
+    start_offset: usize,
+    upper_bound: Option<usize>,
+) -> PolarsResult<Vec<[IdxSize; 2]>> {
+    let iter = group_by_values_iter_lookbehind(
+        period,
+        offset,
+        time,
+        closed_window,
+        tu,
+        tz,
+        start_offset,
+        upper_bound,
+    )?;
+    iter.map(|result| result.map(|(offset, len)| [offset, len]))
+        .collect::<PolarsResult<Vec<_>>>()
+}
+
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn group_by_values_iter_lookahead_collected(
+    period: Duration,
+    offset: Duration,
+    time: &[i64],
+    closed_window: ClosedWindow,
+    tu: TimeUnit,
+    tz: Option<TimeZone>,
+    start_offset: usize,
+    upper_bound: Option<usize>,
+) -> PolarsResult<Vec<[IdxSize; 2]>> {
+    let iter = group_by_values_iter_lookahead(
+        period,
+        offset,
+        time,
+        closed_window,
+        tu,
+        tz,
+        start_offset,
+        upper_bound,
+    );
+    iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
+        .collect::<PolarsResult<Vec<_>>>()
+}
+
 /// Different from `group_by_windows`, where define window buckets and search which values fit that
 /// pre-defined bucket, this function defines every window based on the:
 ///    - timestamp (lower bound)
@@ -521,6 +571,9 @@ pub fn group_by_values(
     // there are duplicates in the splits, so we opt for a single partition
     prune_splits_on_duplicates(time, &mut thread_offsets);
 
+    // If we start from within parallel work, we will do this single-threaded.
+    let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
+
     // we have a (partial) lookbehind window
     if offset.negative {
         // lookbehind
@@ -528,13 +581,27 @@ pub fn group_by_values(
             // t is right at the end of the window
             // ------t---
             // [------]
+            if !run_parallel {
+                let vecs = group_by_values_iter_lookbehind_collected(
+                    period,
+                    offset,
+                    time,
+                    closed_window,
+                    tu,
+                    tz,
+                    0,
+                    None,
+                )?;
+                return Ok(GroupsSlice::from(vecs));
+            }
+
             POOL.install(|| {
                 let vals = thread_offsets
                     .par_iter()
                     .copied()
                     .map(|(base_offset, len)| {
                         let upper_bound = base_offset + len;
-                        let iter = group_by_values_iter_lookbehind(
+                        group_by_values_iter_lookbehind_collected(
                             period,
                             offset,
                             time,
@@ -543,9 +610,7 @@ pub fn group_by_values(
                             tz,
                             base_offset,
                             Some(upper_bound),
-                        )?;
-                        iter.map(|result| result.map(|(offset, len)| [offset, len]))
-                            .collect::<PolarsResult<Vec<_>>>()
+                        )
                     })
                     .collect::<PolarsResult<Vec<_>>>()?;
                 Ok(flatten_par(&vals))
@@ -588,6 +653,21 @@ pub fn group_by_values(
         // window is completely ahead of t and t itself is not a member
         // --t-----------
         //        [---]
+
+        if !run_parallel {
+            let vecs = group_by_values_iter_lookahead_collected(
+                period,
+                offset,
+                time,
+                closed_window,
+                tu,
+                tz,
+                0,
+                None,
+            )?;
+            return Ok(GroupsSlice::from(vecs));
+        }
+
         POOL.install(|| {
             let vals = thread_offsets
                 .par_iter()
                 .copied()
                 .map(|(base_offset, len)| {
                     let lower_bound = base_offset;
                     let upper_bound = base_offset + len;
-                    let iter = group_by_values_iter_lookahead(
+                    group_by_values_iter_lookahead_collected(
                         period,
                         offset,
                         time,
@@ -604,14 +684,26 @@ pub fn group_by_values(
                         tz,
                         lower_bound,
                         Some(upper_bound),
-                    );
-                    iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
-                        .collect::<PolarsResult<Vec<_>>>()
+                    )
                 })
                 .collect::<PolarsResult<Vec<_>>>()?;
             Ok(flatten_par(&vals))
         })
     } else {
+        if !run_parallel {
+            let vecs = group_by_values_iter_lookahead_collected(
+                period,
+                offset,
+                time,
+                closed_window,
+                tu,
+                tz,
+                0,
+                None,
+            )?;
+            return Ok(GroupsSlice::from(vecs));
+        }
+
         // Offset is 0 and window is closed on the left:
         // it must be that the window starts at t and t is a member
         // --t-----------
@@ -623,7 +715,7 @@ pub fn group_by_values(
                 .map(|(base_offset, len)| {
                     let lower_bound = base_offset;
                     let upper_bound = base_offset + len;
-                    let iter = group_by_values_iter_lookahead(
+                    group_by_values_iter_lookahead_collected(
                         period,
                         offset,
                         time,
@@ -632,9 +724,7 @@ pub fn group_by_values(
                         tz,
                         lower_bound,
                         Some(upper_bound),
-                    );
-                    iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
-                        .collect::<PolarsResult<Vec<_>>>()
+                    )
                 })
                 .collect::<PolarsResult<Vec<_>>>()?;
             Ok(flatten_par(&vals))
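
A note on the locking pattern in `rolling_evaluate` above: the write guard on `state.group_tuples` is confined to its own block so it is released before the partition's expressions are evaluated, since evaluation takes read locks on the same map. Below is a minimal, self-contained sketch of that pattern; `State`, `evaluate`, and the `"ts"` key are invented stand-ins for illustration, not polars' actual types.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Toy stand-in for `ExecutionState`: a shared map that the group-by step
// fills with group indices and that expression evaluation later reads.
struct State {
    group_tuples: RwLock<HashMap<String, Vec<u32>>>,
}

// Stand-in for expression evaluation: takes a *read* lock on the map.
fn evaluate(state: &State, key: &str) -> u32 {
    let map = state.group_tuples.read().unwrap();
    map[key].iter().sum()
}

fn main() {
    let state = State {
        group_tuples: RwLock::new(HashMap::new()),
    };

    // The pattern from `rolling_evaluate`: confine the write guard to its
    // own scope so it is dropped before `evaluate` takes a read lock.
    // Holding the guard across the evaluation can deadlock, because readers
    // block until the writer releases the lock.
    {
        let mut map = state.group_tuples.write().unwrap();
        map.insert("ts".to_string(), vec![1, 2, 3]);
    } // write guard dropped here

    assert_eq!(evaluate(&state, "ts"), 6);
}
```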
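
The second pattern is the `run_parallel` guard in `group_by_values`: rayon's `ThreadPool::current_thread_has_pending_tasks` returns `None` when called from outside the pool and `Some(true)` for a worker that still has queued tasks, and the patch uses that to fall back to the sequential `*_collected` paths rather than re-enter the pool. A sketch of that heuristic, with a toy `slice_sums` workload standing in for the window functions (the helper name and data are invented; only the rayon call is real):

```rust
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};

// Toy stand-in for `group_by_values`: per-slice sums, computed in parallel
// only when it is safe to (re-)enter the pool.
fn slice_sums(pool: &ThreadPool, slices: &[Vec<i64>]) -> Vec<i64> {
    // Mirrors the patch: `None` means we are outside the pool, so parallel
    // execution is fine; `Some(true)` means this worker thread still has
    // queued tasks, and re-entering the pool could deadlock.
    let run_parallel = !pool.current_thread_has_pending_tasks().unwrap_or(false);

    if !run_parallel {
        // Sequential fallback, like the new `*_collected` early returns.
        return slices.iter().map(|s| s.iter().sum()).collect();
    }

    pool.install(|| slices.par_iter().map(|s| s.iter().sum()).collect())
}

fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let data: Vec<Vec<i64>> = (0..16).map(|i| (0..=i).collect()).collect();

    // Called from outside the pool: the query returns `None`, so the
    // parallel path is taken.
    let sums = slice_sums(&pool, &data);
    assert_eq!(sums[3], 6); // 0 + 1 + 2 + 3

    // Called from inside the pool, the situation the patch defends against:
    // workers that still have queued tasks take the sequential path instead
    // of re-entering the pool.
    let nested: Vec<Vec<i64>> = pool.install(|| {
        (0..4)
            .into_par_iter()
            .map(|_| slice_sums(&pool, &data))
            .collect()
    });
    assert_eq!(nested[0], sums);
}
```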