@@ -42,7 +42,7 @@ use crate::{
4242 RecordBatchStream , SendableRecordBatchStream ,
4343} ;
4444
45- use arrow:: array:: { ArrayRef , UInt32Array , UInt64Array } ;
45+ use arrow:: array:: { Array , ArrayRef , UInt32Array , UInt64Array } ;
4646use arrow:: datatypes:: { Schema , SchemaRef } ;
4747use arrow:: record_batch:: RecordBatch ;
4848use datafusion_common:: {
@@ -296,6 +296,35 @@ pub(super) fn lookup_join_hashmap(
296296 Ok ( ( build_indices, probe_indices, next_offset) )
297297}
298298
299+ /// Counts the number of distinct elements in the input array.
300+ ///
301+ /// The input array must be sorted (e.g., `[0, 1, 1, 2, 2, ...]`) and contain no null values.
302+ #[ inline]
303+ fn count_distinct_sorted_indices ( indices : & UInt32Array ) -> usize {
304+ if indices. is_empty ( ) {
305+ return 0 ;
306+ }
307+
308+ debug_assert ! ( indices. null_count( ) == 0 ) ;
309+
310+ let values_buf = indices. values ( ) ;
311+ let values = values_buf. as_ref ( ) ;
312+ let mut iter = values. iter ( ) ;
313+ let Some ( & first) = iter. next ( ) else {
314+ return 0 ;
315+ } ;
316+
317+ let mut count = 1usize ;
318+ let mut last = first;
319+ for & value in iter {
320+ if value != last {
321+ last = value;
322+ count += 1 ;
323+ }
324+ }
325+ count
326+ }
327+
299328impl HashJoinStream {
300329 #[ allow( clippy:: too_many_arguments) ]
301330 pub ( super ) fn new (
@@ -517,21 +546,7 @@ impl HashJoinStream {
517546 state. offset ,
518547 ) ?;
519548
520- let mut last_seen: Option < u32 > = None ;
521- let distinct_right_indices_count = right_indices
522- . iter ( )
523- . filter ( |ele| match ele {
524- Some ( ele_val) => {
525- if last_seen. is_none ( ) || last_seen. unwrap ( ) != * ele_val {
526- last_seen = Some ( * ele_val) ;
527- true
528- } else {
529- false
530- }
531- }
532- None => false ,
533- } )
534- . count ( ) ;
549+ let distinct_right_indices_count = count_distinct_sorted_indices ( & right_indices) ;
535550
536551 self . join_metrics
537552 . probe_hit_rate
0 commit comments