
Commit dae99ac

feat: selectivity metrics (for Explain Analyze) in Hash Join (apache#18488)
## Which issue does this PR close?

- Closes apache#18409

## What changes are included in this PR?

Added a distinct-element counter to the core hash join probe loop. It relies on the assumption that probe indices are returned in increasing order; I could not find a place where this assumption is broken, but if that is not the case, please point it out. I am also not 100% sure my implementation of `avg_fanout` is correct, so let me know if it needs changes.

## Are these changes tested?

No failures in `sqllogictests` or the tests in `datafusion/core/tests/sql/`. Should I add a test case for this?
1 parent 2a6f3aa commit dae99ac
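
For intuition, the sketch below reproduces the per-batch arithmetic that the two new ratio metrics accumulate, using plain slices instead of Arrow arrays: `probe_hit_rate` adds distinct matched probe indices as the part and total probe rows as the total, while `avg_fanout` adds total matched pairs as the part and distinct matched probe indices as the total. The standalone function names and the sample data here are illustrative assumptions, not code from this commit.

```rust
/// Counts distinct values in a sorted slice (mirrors the commit's
/// `count_distinct_sorted_indices`, but over a plain slice for illustration).
fn count_distinct_sorted(indices: &[u32]) -> usize {
    let mut iter = indices.iter();
    let Some(&first) = iter.next() else { return 0; };
    let mut count = 1usize;
    let mut last = first;
    for &value in iter {
        if value != last {
            last = value;
            count += 1;
        }
    }
    count
}

fn main() {
    // Hypothetical probe batch with 6 rows; the hash lookup returned these
    // matched (build, probe) index pairs, probe indices in increasing order.
    let probe_rows = 6usize;
    let left_indices: Vec<u64> = vec![0, 1, 5, 5, 7]; // build side
    let right_indices: Vec<u32> = vec![0, 0, 2, 3, 3]; // probe side, sorted

    let distinct_probe = count_distinct_sorted(&right_indices); // 3

    // probe_hit_rate: distinct matched probe rows (part) / probe rows (total)
    let probe_hit_rate = distinct_probe as f64 / probe_rows as f64; // 0.5
    // avg_fanout: matched pairs (part) / distinct matched probe rows (total)
    let avg_fanout = left_indices.len() as f64 / distinct_probe as f64; // ~1.67

    println!("probe_hit_rate = {probe_hit_rate}, avg_fanout = {avg_fanout}");
}
```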

File tree

3 files changed: +84 −2 lines changed

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 21 additions & 0 deletions
@@ -1156,3 +1156,24 @@ async fn nested_loop_join_selectivity() {
         );
     }
 }
+
+#[tokio::test]
+async fn explain_analyze_hash_join() {
+    let sql = "EXPLAIN ANALYZE \
+        SELECT * \
+        FROM generate_series(10) as t1(a) \
+        JOIN generate_series(20) as t2(b) \
+        ON t1.a=t2.b";
+
+    for (level, needle, should_contain) in [
+        (ExplainAnalyzeLevel::Summary, "probe_hit_rate", true),
+        (ExplainAnalyzeLevel::Summary, "avg_fanout", true),
+    ] {
+        let plan = collect_plan(sql, level).await;
+        assert_eq!(
+            plan.contains(needle),
+            should_contain,
+            "plan for level {level:?} unexpected content: {plan}"
+        );
+    }
+}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 46 additions & 1 deletion
@@ -42,7 +42,7 @@ use crate::{
     RecordBatchStream, SendableRecordBatchStream,
 };
 
-use arrow::array::{ArrayRef, UInt32Array, UInt64Array};
+use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{
@@ -297,6 +297,35 @@ pub(super) fn lookup_join_hashmap(
     Ok((build_indices, probe_indices, next_offset))
 }
 
+/// Counts the number of distinct elements in the input array.
+///
+/// The input array must be sorted (e.g., `[0, 1, 1, 2, 2, ...]`) and contain no null values.
+#[inline]
+fn count_distinct_sorted_indices(indices: &UInt32Array) -> usize {
+    if indices.is_empty() {
+        return 0;
+    }
+
+    debug_assert!(indices.null_count() == 0);
+
+    let values_buf = indices.values();
+    let values = values_buf.as_ref();
+    let mut iter = values.iter();
+    let Some(&first) = iter.next() else {
+        return 0;
+    };
+
+    let mut count = 1usize;
+    let mut last = first;
+    for &value in iter {
+        if value != last {
+            last = value;
+            count += 1;
+        }
+    }
+    count
+}
+
 impl HashJoinStream {
     #[allow(clippy::too_many_arguments)]
     pub(super) fn new(
@@ -480,6 +509,10 @@ impl HashJoinStream {
         let state = self.state.try_as_process_probe_batch_mut()?;
         let build_side = self.build_side.try_as_ready_mut()?;
 
+        self.join_metrics
+            .probe_hit_rate
+            .add_total(state.batch.num_rows());
+
         let timer = self.join_metrics.join_time.timer();
 
         // if the left side is empty, we can skip the (potentially expensive) join operation
@@ -509,6 +542,18 @@ impl HashJoinStream {
            state.offset,
        )?;
 
+        let distinct_right_indices_count = count_distinct_sorted_indices(&right_indices);
+
+        self.join_metrics
+            .probe_hit_rate
+            .add_part(distinct_right_indices_count);
+
+        self.join_metrics.avg_fanout.add_part(left_indices.len());
+
+        self.join_metrics
+            .avg_fanout
+            .add_total(distinct_right_indices_count);
+
         // apply join filter if exists
         let (left_indices, right_indices) = if let Some(filter) = &self.filter {
             apply_join_filter_to_indices(

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 17 additions & 1 deletion
@@ -27,7 +27,9 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::joins::SharedBitmapBuilder;
-use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
+use crate::metrics::{
+    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricType,
+};
 use crate::projection::{ProjectionExec, ProjectionExpr};
 use crate::{
     ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics,
@@ -1328,6 +1330,10 @@ pub(crate) struct BuildProbeJoinMetrics {
     pub(crate) input_batches: metrics::Count,
     /// Number of rows consumed by probe-side this operator
     pub(crate) input_rows: metrics::Count,
+    /// Fraction of probe rows that found more than one match
+    pub(crate) probe_hit_rate: metrics::RatioMetrics,
+    /// Average number of build matches per matched probe row
+    pub(crate) avg_fanout: metrics::RatioMetrics,
 }
 
 // This Drop implementation updates the elapsed compute part of the metrics.
@@ -1371,6 +1377,14 @@ impl BuildProbeJoinMetrics {
 
         let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
 
+        let probe_hit_rate = MetricBuilder::new(metrics)
+            .with_type(MetricType::SUMMARY)
+            .ratio_metrics("probe_hit_rate", partition);
+
+        let avg_fanout = MetricBuilder::new(metrics)
+            .with_type(MetricType::SUMMARY)
+            .ratio_metrics("avg_fanout", partition);
+
         Self {
             build_time,
             build_input_batches,
@@ -1380,6 +1394,8 @@ impl BuildProbeJoinMetrics {
             input_batches,
             input_rows,
             baseline,
+            probe_hit_rate,
+            avg_fanout,
         }
     }
 }
