perf: elide parallelism restriction on generic rolling expressions (#…
ritchie46 authored Jan 12, 2024
1 parent b2198bb commit de01afa
Showing 2 changed files with 135 additions and 30 deletions.
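What the change does, read from the diff below: the per-partition rolling evaluation in the projection executor moves into a new `rolling_evaluate` helper that parallelizes over the rolling-option partitions as well as over the expressions inside each partition, and `group_by_values` gains a single-threaded fallback that is taken only when the call is already running inside rayon's thread pool. The following is a minimal, stand-alone sketch of that guard idiom, built around the same rayon call the diff uses (`ThreadPool::current_thread_has_pending_tasks`); the pool, data, and `process_chunk` helper are invented for illustration and are not Polars code.

use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};

// Toy per-chunk work standing in for evaluating one rolling partition.
fn process_chunk(chunk: &[i64]) -> i64 {
    chunk.iter().sum()
}

fn process_all(pool: &ThreadPool, data: &[i64]) -> Vec<i64> {
    // If this thread is already a busy pool worker, another parallel fan-out
    // could oversubscribe the pool or deadlock; fall back to a sequential pass.
    let run_parallel = !pool.current_thread_has_pending_tasks().unwrap_or(false);
    if !run_parallel {
        return data.chunks(4).map(process_chunk).collect();
    }
    pool.install(|| data.par_chunks(4).map(process_chunk).collect())
}

fn main() {
    let pool = ThreadPoolBuilder::new().build().expect("thread pool");
    let data: Vec<i64> = (0..32).collect();
    println!("{:?}", process_all(&pool, &data));
}

The same shape appears three times in `group_by_values` below: compute the whole range sequentially through a `*_collected` helper when `run_parallel` is false, otherwise split the work across `thread_offsets` inside `POOL.install`.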
51 changes: 33 additions & 18 deletions crates/polars-lazy/src/physical_plan/executors/projection_utils.rs
@@ -21,6 +21,35 @@ pub(super) fn profile_name(

type IdAndExpression = (u32, Arc<dyn PhysicalExpr>);

#[cfg(feature = "dynamic_group_by")]
fn rolling_evaluate(
df: &DataFrame,
state: &ExecutionState,
rolling: PlHashMap<&RollingGroupOptions, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Series)>>> {
POOL.install(|| {
rolling
.par_iter()
.map(|(options, partition)| {
// clear the cache for every partitioned group
let state = state.split();
let (_time_key, _keys, groups) = df.group_by_rolling(vec![], options)?;
// Set the groups so all expressions in the partition can use them.
// Create a separate scope so the lock is dropped; otherwise we deadlock when the
// rolling expressions try to get read access.
{
let mut groups_map = state.group_tuples.write().unwrap();
groups_map.insert(options.index_column.to_string(), groups);
}
partition
.par_iter()
.map(|(idx, expr)| expr.evaluate(df, &state).map(|s| (*idx, s)))
.collect::<PolarsResult<Vec<_>>>()
})
.collect()
})
}

fn execute_projection_cached_window_fns(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
@@ -83,25 +112,11 @@ fn execute_projection_cached_window_fns(
// Per partition we run in parallel. We compute the groups beforehand and store them once per partition.
// The rolling expression knows how to fetch the groups.
#[cfg(feature = "dynamic_group_by")]
for (options, partition) in rolling {
// clear the cache for every partitioned group
let state = state.split();
let (_time_key, _keys, groups) = df.group_by_rolling(vec![], options)?;
// Set the groups so all expressions in the partition can use them.
// Create a separate scope so the lock is dropped; otherwise we deadlock when the
// rolling expressions try to get read access.
{
let mut groups_map = state.group_tuples.write().unwrap();
groups_map.insert(options.index_column.to_string(), groups);
{
let partitions = rolling_evaluate(df, state, rolling)?;
for part in partitions {
selected_columns.extend_from_slice(&part)
}

let results = POOL.install(|| {
partition
.par_iter()
.map(|(idx, expr)| expr.evaluate(df, &state).map(|s| (*idx, s)))
.collect::<PolarsResult<Vec<_>>>()
})?;
selected_columns.extend_from_slice(&results);
}

for partition in windows {
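One detail `rolling_evaluate` carries over from the old loop is the comment about lock scope: the freshly computed groups are written into the shared `group_tuples` map under a write lock, and that lock must be released before the rolling expressions take read locks on the same map, or evaluation deadlocks. A minimal stand-alone illustration of the idiom follows; the map, key, and values are placeholders, not the Polars `ExecutionState` fields.

use std::collections::HashMap;
use std::sync::RwLock;

fn main() {
    let groups: RwLock<HashMap<String, Vec<u32>>> = RwLock::new(HashMap::new());

    // Confine the write guard to its own block so it is dropped at the closing brace.
    {
        let mut map = groups.write().unwrap();
        map.insert("date".to_string(), vec![0, 1, 2]); // "date" stands in for the index column name
    } // write lock released here; holding it past this point would block or deadlock the read below

    // Later reads (in the real code: each rolling expression in the partition).
    let map = groups.read().unwrap();
    assert_eq!(map["date"], vec![0, 1, 2]);
}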
114 changes: 102 additions & 12 deletions crates/polars-time/src/windows/group_by.rs
@@ -504,6 +504,56 @@ fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usi
std::mem::swap(thread_offsets, &mut new);
}

#[allow(clippy::too_many_arguments)]
fn group_by_values_iter_lookbehind_collected(
period: Duration,
offset: Duration,
time: &[i64],
closed_window: ClosedWindow,
tu: TimeUnit,
tz: Option<Tz>,
start_offset: usize,
upper_bound: Option<usize>,
) -> PolarsResult<Vec<[IdxSize; 2]>> {
let iter = group_by_values_iter_lookbehind(
period,
offset,
time,
closed_window,
tu,
tz,
start_offset,
upper_bound,
)?;
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<Vec<_>>>()
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_lookahead_collected(
period: Duration,
offset: Duration,
time: &[i64],
closed_window: ClosedWindow,
tu: TimeUnit,
tz: Option<Tz>,
start_offset: usize,
upper_bound: Option<usize>,
) -> PolarsResult<Vec<[IdxSize; 2]>> {
let iter = group_by_values_iter_lookahead(
period,
offset,
time,
closed_window,
tu,
tz,
start_offset,
upper_bound,
);
iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
.collect::<PolarsResult<Vec<_>>>()
}

/// Different from `group_by_windows`, which defines window buckets and searches for the values that fit each
/// pre-defined bucket, this function defines every window based on the:
/// - timestamp (lower bound)
@@ -521,20 +571,37 @@ pub fn group_by_values(
// there are duplicates in the splits, so we opt for a single partition
prune_splits_on_duplicates(time, &mut thread_offsets);

// If we start from within parallel work, we will do this single-threaded.
let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);

// we have a (partial) lookbehind window
if offset.negative {
// lookbehind
if offset.duration_ns() == period.duration_ns() {
// t is right at the end of the window
// ------t---
// [------]
if !run_parallel {
let vecs = group_by_values_iter_lookbehind_collected(
period,
offset,
time,
closed_window,
tu,
tz,
0,
None,
)?;
return Ok(GroupsSlice::from(vecs));
}

POOL.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let upper_bound = base_offset + len;
let iter = group_by_values_iter_lookbehind(
group_by_values_iter_lookbehind_collected(
period,
offset,
time,
@@ -543,9 +610,7 @@ pub fn group_by_values(
tz,
base_offset,
Some(upper_bound),
)?;
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<Vec<_>>>()
)
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
@@ -588,14 +653,29 @@ pub fn group_by_values(
// window is completely ahead of t and t itself is not a member
// --t-----------
// [---]

if !run_parallel {
let vecs = group_by_values_iter_lookahead_collected(
period,
offset,
time,
closed_window,
tu,
tz,
0,
None,
)?;
return Ok(GroupsSlice::from(vecs));
}

POOL.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let lower_bound = base_offset;
let upper_bound = base_offset + len;
let iter = group_by_values_iter_lookahead(
group_by_values_iter_lookahead_collected(
period,
offset,
time,
@@ -604,14 +684,26 @@ pub fn group_by_values(
tz,
lower_bound,
Some(upper_bound),
);
iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
.collect::<PolarsResult<Vec<_>>>()
)
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
})
} else {
if !run_parallel {
let vecs = group_by_values_iter_lookahead_collected(
period,
offset,
time,
closed_window,
tu,
tz,
0,
None,
)?;
return Ok(GroupsSlice::from(vecs));
}

// Offset is 0 and window is closed on the left:
// it must be that the window starts at t and t is a member
// --t-----------
@@ -623,7 +715,7 @@ pub fn group_by_values(
.map(|(base_offset, len)| {
let lower_bound = base_offset;
let upper_bound = base_offset + len;
let iter = group_by_values_iter_lookahead(
group_by_values_iter_lookahead_collected(
period,
offset,
time,
Expand All @@ -632,9 +724,7 @@ pub fn group_by_values(
tz,
lower_bound,
Some(upper_bound),
);
iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
.collect::<PolarsResult<Vec<_>>>()
)
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
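The two new `*_collected` helpers above exist so that one code path can serve both the single-threaded fallback (whole range: `start_offset = 0`, `upper_bound = None`) and the per-thread slices used inside `POOL.install`. All they do is drain a fallible iterator of `(offset, len)` window bounds into a flat `Vec<[IdxSize; 2]>`, stopping at the first error. A simplified, self-contained version of that collection step is sketched below; the window iterator and error type are stand-ins, not the Polars lookbehind/lookahead iterators.

// Collect fallible (offset, len) window bounds into a flat vector,
// short-circuiting on the first Err, as the `*_collected` helpers do.
fn collect_windows<I>(iter: I) -> Result<Vec<[u32; 2]>, String>
where
    I: Iterator<Item = Result<(u32, u32), String>>,
{
    iter.map(|result| result.map(|(offset, len)| [offset, len]))
        .collect()
}

fn main() {
    // Toy window iterator: window i starts at row i and spans 3 rows.
    let windows = (0u32..4).map(|i| Ok::<_, String>((i, 3)));
    assert_eq!(
        collect_windows(windows).unwrap(),
        vec![[0, 3], [1, 3], [2, 3], [3, 3]]
    );
}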
