-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat: Add selectivity metric to NestedLoopJoinExec for EXPLAIN ANALYZE #18481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
f0eec80
Implement selectivity metric for nested_loop_join
petern48 b1a32d5
Add test in explain_analyze.rs
petern48 0650e79
Fix test name
petern48 c2264b2
Reorganize: pull out metrics into a new struct NestedLoopJoinMetrics
petern48 5dd0630
Combine add_total logic into one place
petern48 f3d6a7e
Clean up tests and fix clippy
petern48 08f8bc7
Add back print statements that were already there
petern48 388aeb1
Update datafusion/physical-plan/src/joins/nested_loop_join.rs
petern48 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,9 @@ use crate::joins::utils::{ | |
| OnceAsync, OnceFut, | ||
| }; | ||
| use crate::joins::SharedBitmapBuilder; | ||
| use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricsSet}; | ||
| use crate::metrics::{ | ||
| Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricsSet, RatioMetrics, | ||
| }; | ||
| use crate::projection::{ | ||
| try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, | ||
| ProjectionExec, | ||
|
|
@@ -496,7 +498,7 @@ impl ExecutionPlan for NestedLoopJoinExec { | |
| ); | ||
| } | ||
|
|
||
| let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); | ||
| let metrics = NestedLoopJoinMetrics::new(&self.metrics, partition); | ||
|
|
||
| // Initialization reservation for load of inner table | ||
| let load_reservation = | ||
|
|
@@ -508,7 +510,7 @@ impl ExecutionPlan for NestedLoopJoinExec { | |
|
|
||
| Ok(collect_left_input( | ||
| stream, | ||
| join_metrics.clone(), | ||
| metrics.join_metrics.clone(), | ||
| load_reservation, | ||
| need_produce_result_in_final(self.join_type), | ||
| self.right().output_partitioning().partition_count(), | ||
|
|
@@ -535,7 +537,7 @@ impl ExecutionPlan for NestedLoopJoinExec { | |
| probe_side_data, | ||
| build_side_data, | ||
| column_indices_after_projection, | ||
| join_metrics, | ||
| metrics, | ||
| batch_size, | ||
| ))) | ||
| } | ||
|
|
@@ -749,7 +751,7 @@ pub(crate) struct NestedLoopJoinStream { | |
| /// the join filter (e.g., `JOIN ON (b+c)>0`). | ||
| pub(crate) column_indices: Vec<ColumnIndex>, | ||
| /// Join execution metrics | ||
| pub(crate) join_metrics: BuildProbeJoinMetrics, | ||
| pub(crate) metrics: NestedLoopJoinMetrics, | ||
|
|
||
| /// `batch_size` from configuration | ||
| batch_size: usize, | ||
|
|
@@ -794,6 +796,24 @@ pub(crate) struct NestedLoopJoinStream { | |
| current_right_batch_matched: Option<BooleanArray>, | ||
| } | ||
|
|
||
| pub(crate) struct NestedLoopJoinMetrics { | ||
| /// Join execution metrics | ||
| pub(crate) join_metrics: BuildProbeJoinMetrics, | ||
| /// Selectivity of the join: output_rows / (left_rows * right_rows) | ||
| pub(crate) selectivity: RatioMetrics, | ||
| } | ||
|
|
||
| impl NestedLoopJoinMetrics { | ||
| pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { | ||
| Self { | ||
| join_metrics: BuildProbeJoinMetrics::new(partition, metrics), | ||
| selectivity: MetricBuilder::new(metrics) | ||
| .with_type(MetricType::SUMMARY) | ||
| .ratio_metrics("selectivity", partition), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Stream for NestedLoopJoinStream { | ||
| type Item = Result<RecordBatch>; | ||
|
|
||
|
|
@@ -844,7 +864,7 @@ impl Stream for NestedLoopJoinStream { | |
| // -side batches), related metrics except build time will be | ||
| // updated. | ||
| // stop on drop | ||
| let build_metric = self.join_metrics.build_time.clone(); | ||
| let build_metric = self.metrics.join_metrics.build_time.clone(); | ||
| let _build_timer = build_metric.timer(); | ||
|
|
||
| match self.handle_buffering_left(cx) { | ||
|
|
@@ -878,7 +898,7 @@ impl Stream for NestedLoopJoinStream { | |
| NLJState::FetchingRight => { | ||
| debug!("[NLJState] Entering: {:?}", self.state); | ||
| // stop on drop | ||
| let join_metric = self.join_metrics.join_time.clone(); | ||
| let join_metric = self.metrics.join_metrics.join_time.clone(); | ||
| let _join_timer = join_metric.timer(); | ||
|
|
||
| match self.handle_fetching_right(cx) { | ||
|
|
@@ -905,13 +925,13 @@ impl Stream for NestedLoopJoinStream { | |
| debug!("[NLJState] Entering: {:?}", self.state); | ||
|
|
||
| // stop on drop | ||
| let join_metric = self.join_metrics.join_time.clone(); | ||
| let join_metric = self.metrics.join_metrics.join_time.clone(); | ||
| let _join_timer = join_metric.timer(); | ||
|
|
||
| match self.handle_probe_right() { | ||
| ControlFlow::Continue(()) => continue, | ||
| ControlFlow::Break(poll) => { | ||
| return self.join_metrics.baseline.record_poll(poll) | ||
| return self.metrics.join_metrics.baseline.record_poll(poll) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -926,13 +946,13 @@ impl Stream for NestedLoopJoinStream { | |
| debug!("[NLJState] Entering: {:?}", self.state); | ||
|
|
||
| // stop on drop | ||
| let join_metric = self.join_metrics.join_time.clone(); | ||
| let join_metric = self.metrics.join_metrics.join_time.clone(); | ||
| let _join_timer = join_metric.timer(); | ||
|
|
||
| match self.handle_emit_right_unmatched() { | ||
| ControlFlow::Continue(()) => continue, | ||
| ControlFlow::Break(poll) => { | ||
| return self.join_metrics.baseline.record_poll(poll) | ||
| return self.metrics.join_metrics.baseline.record_poll(poll) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -956,13 +976,13 @@ impl Stream for NestedLoopJoinStream { | |
| debug!("[NLJState] Entering: {:?}", self.state); | ||
|
|
||
| // stop on drop | ||
| let join_metric = self.join_metrics.join_time.clone(); | ||
| let join_metric = self.metrics.join_metrics.join_time.clone(); | ||
| let _join_timer = join_metric.timer(); | ||
|
|
||
| match self.handle_emit_left_unmatched() { | ||
| ControlFlow::Continue(()) => continue, | ||
| ControlFlow::Break(poll) => { | ||
| return self.join_metrics.baseline.record_poll(poll) | ||
| return self.metrics.join_metrics.baseline.record_poll(poll) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -972,13 +992,13 @@ impl Stream for NestedLoopJoinStream { | |
| debug!("[NLJState] Entering: {:?}", self.state); | ||
|
|
||
| // stop on drop | ||
| let join_metric = self.join_metrics.join_time.clone(); | ||
| let join_metric = self.metrics.join_metrics.join_time.clone(); | ||
| let _join_timer = join_metric.timer(); | ||
| // counting it in the join timer because there might be some | ||
| // final result batches to output in this state | ||
|
|
||
| let poll = self.handle_done(); | ||
| return self.join_metrics.baseline.record_poll(poll); | ||
| return self.metrics.join_metrics.baseline.record_poll(poll); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -1000,7 +1020,7 @@ impl NestedLoopJoinStream { | |
| right_data: SendableRecordBatchStream, | ||
| left_data: OnceFut<JoinLeftData>, | ||
| column_indices: Vec<ColumnIndex>, | ||
| join_metrics: BuildProbeJoinMetrics, | ||
| metrics: NestedLoopJoinMetrics, | ||
| batch_size: usize, | ||
| ) -> Self { | ||
| Self { | ||
|
|
@@ -1010,7 +1030,7 @@ impl NestedLoopJoinStream { | |
| right_data, | ||
| column_indices, | ||
| left_data, | ||
| join_metrics, | ||
| metrics, | ||
| buffered_left_data: None, | ||
| output_buffer: Box::new(BatchCoalescer::new(schema, batch_size)), | ||
| batch_size, | ||
|
|
@@ -1057,8 +1077,8 @@ impl NestedLoopJoinStream { | |
| Some(Ok(right_batch)) => { | ||
| // Update metrics | ||
| let right_batch_size = right_batch.num_rows(); | ||
| self.join_metrics.input_rows.add(right_batch_size); | ||
| self.join_metrics.input_batches.add(1); | ||
| self.metrics.join_metrics.input_rows.add(right_batch_size); | ||
| self.metrics.join_metrics.input_batches.add(1); | ||
|
|
||
| // Skip the empty batch | ||
| if right_batch_size == 0 { | ||
|
|
@@ -1108,6 +1128,17 @@ impl NestedLoopJoinStream { | |
| Ok(false) => { | ||
| // Left exhausted, transition to FetchingRight | ||
| self.left_probe_idx = 0; | ||
|
|
||
| // Selectivity Metric: Update total possibilities for the batch (left_rows * right_rows) | ||
| // If memory-limited execution is implemented, this logic must be updated accordingly. | ||
| if let (Ok(left_data), Some(right_batch)) = | ||
| (self.get_left_data(), self.current_right_batch.as_ref()) | ||
| { | ||
| let left_rows = left_data.batch().num_rows(); | ||
| let right_rows = right_batch.num_rows(); | ||
| self.metrics.selectivity.add_total(left_rows * right_rows); | ||
| } | ||
|
|
||
| if self.should_track_unmatched_right { | ||
| debug_assert!( | ||
| self.current_right_batch_matched.is_some(), | ||
|
|
@@ -1138,7 +1169,6 @@ impl NestedLoopJoinStream { | |
| && self.current_right_batch.is_some(), | ||
| "This state is yielding output for unmatched rows in the current right batch, so both the right batch and the bitmap must be present" | ||
| ); | ||
|
|
||
| // Construct the result batch for unmatched right rows using a utility function | ||
| match self.process_right_unmatched() { | ||
| Ok(Some(batch)) => { | ||
|
|
@@ -1205,7 +1235,7 @@ impl NestedLoopJoinStream { | |
| // should be with the expected schema for this operator | ||
| if !self.handled_empty_output { | ||
| let zero_count = Count::new(); | ||
| if *self.join_metrics.baseline.output_rows() == zero_count { | ||
| if *self.metrics.join_metrics.baseline.output_rows() == zero_count { | ||
| let empty_batch = RecordBatch::new_empty(Arc::clone(&self.output_schema)); | ||
| self.handled_empty_output = true; | ||
| return Poll::Ready(Some(Ok(empty_batch))); | ||
|
|
@@ -1455,7 +1485,11 @@ impl NestedLoopJoinStream { | |
| if let Some(batch) = self.output_buffer.next_completed_batch() { | ||
| // HACK: this is not part of `BaselineMetrics` yet, so update it | ||
| // manually | ||
| self.join_metrics.output_batches.add(1); | ||
| self.metrics.join_metrics.output_batches.add(1); | ||
|
|
||
| // Update output rows for selectivity metric | ||
| let output_rows = batch.num_rows(); | ||
| self.metrics.selectivity.add_part(output_rows); | ||
|
Comment on lines
+1488
to
+1492
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Update |
||
|
|
||
| return Some(Poll::Ready(Some(Ok(batch)))); | ||
| } | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.