Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hook for sharing join state in distributed execution #12523

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

thinkharderdev
Copy link
Contributor

Which issue does this PR close?

Closes #12454

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Sep 18, 2024
@thinkharderdev
Copy link
Contributor Author

Set as draft for now as I still need to integrate into NestedLoopJoinStream but the basic approach is all here

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.once(|| {
let reservation =
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

let probe_threads = shared_state
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If shared state is available we use the num_task_partitions instead of output partitioning to determine the number of local probe threads

return Poll::Ready(Ok(StatefulStreamResult::Continue));
}

if let Some(shared_state) = build_side.left_data.shared_state.as_ref() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When all local probe threads are complete and there is shared state we need to probe that before competing

@@ -2224,6 +2354,179 @@ mod tests {
assert_batches_sorted_eq!(expected, &batches);
}

struct Coordinator {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usage example here. In a real-world scenario the Coordinator would likely be an external service which we communicate with through an rpc call

use datafusion_expr::Operator;
use datafusion_physical_expr_common::datum::compare_op_for_nested;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use parking_lot::Mutex;

/// `SharedJoinState` provides an extension point allowing
/// `HashJoinStream` to share the `visited_indices_bitmap` of the build side of a join
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it left side or right side indices?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left

@comphead
Copy link
Contributor

@korowa FYI

/// across probe tasks without shared memory.
///
/// This can be used to, for example, implement a left outer join efficiently as a broadcast join
/// if the left side is small
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it left side small or right? My feeling was the left(driving) table is huge and right is small

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's the opposite. The left (build) side is small and can be efficiently broadcast. Then the right (probe) side can be partitioned across multiple nodes with the build side broadcast to all of them.

}
}

fn merge_bitmap(m1: &mut BooleanBufferBuilder, m2: BooleanBuffer) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bitmap here is boolean bitmask of what was visited or matched by join/filter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. What will be shared and merged is the JoinLeftData::visited_indices_bitmap

@comphead
Copy link
Contributor

Maybe its easier to build some diagram in draw.io or something?
I got the point about shared state but I'm not sure how it will be travelling from caller side and to caller side

@thinkharderdev
Copy link
Contributor Author

Maybe its easier to build some diagram in draw.io or something? I got the point about shared state but I'm not sure how it will be travelling from caller side and to caller side

Yeah, good idea I'll work something up

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Proposal: Hook to better support CollectLeft joins in distributed execution
2 participants