Skip to content

Commit e77b951

Browse files
committed
feat: selectivity metrics (for Explain Analyze) in Hash Join
1 parent 65bd13d commit e77b951

File tree

3 files changed

+84
-2
lines changed

3 files changed

+84
-2
lines changed

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,3 +1156,24 @@ async fn nested_loop_join_selectivity() {
11561156
);
11571157
}
11581158
}
1159+
1160+
#[tokio::test]
1161+
async fn explain_analyze_hash_join() {
1162+
let sql = "EXPLAIN ANALYZE \
1163+
SELECT * \
1164+
FROM generate_series(10) as t1(a) \
1165+
JOIN generate_series(20) as t2(b) \
1166+
ON t1.a=t2.b";
1167+
1168+
for (level, needle, should_contain) in [
1169+
(ExplainAnalyzeLevel::Summary, "probe_hit_rate", true),
1170+
(ExplainAnalyzeLevel::Summary, "avg_fanout", true),
1171+
] {
1172+
let plan = collect_plan(sql, level).await;
1173+
assert_eq!(
1174+
plan.contains(needle),
1175+
should_contain,
1176+
"plan for level {level:?} unexpected content: {plan}"
1177+
);
1178+
}
1179+
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::{
4242
RecordBatchStream, SendableRecordBatchStream,
4343
};
4444

45-
use arrow::array::{ArrayRef, UInt32Array, UInt64Array};
45+
use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array};
4646
use arrow::datatypes::{Schema, SchemaRef};
4747
use arrow::record_batch::RecordBatch;
4848
use datafusion_common::{
@@ -296,6 +296,35 @@ pub(super) fn lookup_join_hashmap(
296296
Ok((build_indices, probe_indices, next_offset))
297297
}
298298

299+
/// Counts the number of distinct elements in the input array.
300+
///
301+
/// The input array must be sorted (e.g., `[0, 1, 1, 2, 2, ...]`) and contain no null values.
302+
#[inline]
303+
fn count_distinct_sorted_indices(indices: &UInt32Array) -> usize {
304+
if indices.is_empty() {
305+
return 0;
306+
}
307+
308+
debug_assert!(indices.null_count() == 0);
309+
310+
let values_buf = indices.values();
311+
let values = values_buf.as_ref();
312+
let mut iter = values.iter();
313+
let Some(&first) = iter.next() else {
314+
return 0;
315+
};
316+
317+
let mut count = 1usize;
318+
let mut last = first;
319+
for &value in iter {
320+
if value != last {
321+
last = value;
322+
count += 1;
323+
}
324+
}
325+
count
326+
}
327+
299328
impl HashJoinStream {
300329
#[allow(clippy::too_many_arguments)]
301330
pub(super) fn new(
@@ -483,6 +512,10 @@ impl HashJoinStream {
483512
let state = self.state.try_as_process_probe_batch_mut()?;
484513
let build_side = self.build_side.try_as_ready_mut()?;
485514

515+
self.join_metrics
516+
.probe_hit_rate
517+
.add_total(state.batch.num_rows());
518+
486519
let timer = self.join_metrics.join_time.timer();
487520

488521
// if the left side is empty, we can skip the (potentially expensive) join operation
@@ -512,6 +545,18 @@ impl HashJoinStream {
512545
state.offset,
513546
)?;
514547

548+
let distinct_right_indices_count = count_distinct_sorted_indices(&right_indices);
549+
550+
self.join_metrics
551+
.probe_hit_rate
552+
.add_part(distinct_right_indices_count);
553+
554+
self.join_metrics.avg_fanout.add_part(left_indices.len());
555+
556+
self.join_metrics
557+
.avg_fanout
558+
.add_total(distinct_right_indices_count);
559+
515560
// apply join filter if exists
516561
let (left_indices, right_indices) = if let Some(filter) = &self.filter {
517562
apply_join_filter_to_indices(

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use std::sync::Arc;
2727
use std::task::{Context, Poll};
2828

2929
use crate::joins::SharedBitmapBuilder;
30-
use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
30+
use crate::metrics::{
31+
self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricType,
32+
};
3133
use crate::projection::{ProjectionExec, ProjectionExpr};
3234
use crate::{
3335
ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics,
@@ -1327,6 +1329,10 @@ pub(crate) struct BuildProbeJoinMetrics {
13271329
pub(crate) input_batches: metrics::Count,
13281330
/// Number of rows consumed by the probe side of this operator
13291331
pub(crate) input_rows: metrics::Count,
1332+
/// Fraction of probe rows that found at least one match
1333+
pub(crate) probe_hit_rate: metrics::RatioMetrics,
1334+
/// Average number of build matches per matched probe row
1335+
pub(crate) avg_fanout: metrics::RatioMetrics,
13301336
}
13311337

13321338
// This Drop implementation updates the elapsed compute part of the metrics.
@@ -1370,6 +1376,14 @@ impl BuildProbeJoinMetrics {
13701376

13711377
let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
13721378

1379+
let probe_hit_rate = MetricBuilder::new(metrics)
1380+
.with_type(MetricType::SUMMARY)
1381+
.ratio_metrics("probe_hit_rate", partition);
1382+
1383+
let avg_fanout = MetricBuilder::new(metrics)
1384+
.with_type(MetricType::SUMMARY)
1385+
.ratio_metrics("avg_fanout", partition);
1386+
13731387
Self {
13741388
build_time,
13751389
build_input_batches,
@@ -1379,6 +1393,8 @@ impl BuildProbeJoinMetrics {
13791393
input_batches,
13801394
input_rows,
13811395
baseline,
1396+
probe_hit_rate,
1397+
avg_fanout,
13821398
}
13831399
}
13841400
}

0 commit comments

Comments
 (0)