@@ -36,7 +36,9 @@ use crate::joins::utils::{
3636 OnceAsync , OnceFut ,
3737} ;
3838use crate :: joins:: SharedBitmapBuilder ;
39- use crate :: metrics:: { Count , ExecutionPlanMetricsSet , MetricsSet } ;
39+ use crate :: metrics:: {
40+ Count , ExecutionPlanMetricsSet , MetricBuilder , MetricType , MetricsSet , RatioMetrics ,
41+ } ;
4042use crate :: projection:: {
4143 try_embed_projection, try_pushdown_through_join, EmbeddedProjection , JoinData ,
4244 ProjectionExec ,
@@ -496,7 +498,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
496498 ) ;
497499 }
498500
499- let join_metrics = BuildProbeJoinMetrics :: new ( partition , & self . metrics ) ;
501+ let metrics = NestedLoopJoinMetrics :: new ( & self . metrics , partition ) ;
500502
501503 // Initialization reservation for load of inner table
502504 let load_reservation =
@@ -508,7 +510,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
508510
509511 Ok ( collect_left_input (
510512 stream,
511- join_metrics. clone ( ) ,
513+ metrics . join_metrics . clone ( ) ,
512514 load_reservation,
513515 need_produce_result_in_final ( self . join_type ) ,
514516 self . right ( ) . output_partitioning ( ) . partition_count ( ) ,
@@ -535,7 +537,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
535537 probe_side_data,
536538 build_side_data,
537539 column_indices_after_projection,
538- join_metrics ,
540+ metrics ,
539541 batch_size,
540542 ) ) )
541543 }
@@ -749,7 +751,7 @@ pub(crate) struct NestedLoopJoinStream {
749751 /// the join filter (e.g., `JOIN ON (b+c)>0`).
750752 pub ( crate ) column_indices : Vec < ColumnIndex > ,
751753 /// Join execution metrics
752- pub ( crate ) join_metrics : BuildProbeJoinMetrics ,
754+ pub ( crate ) metrics : NestedLoopJoinMetrics ,
753755
754756 /// `batch_size` from configuration
755757 batch_size : usize ,
@@ -794,6 +796,24 @@ pub(crate) struct NestedLoopJoinStream {
794796 current_right_batch_matched : Option < BooleanArray > ,
795797}
796798
799+ pub ( crate ) struct NestedLoopJoinMetrics { // Metrics bundle owned by `NestedLoopJoinStream`; built once per partition via `new`
800+ /// Join execution metrics (build/join timers, input row and batch counters, output batch counter, baseline output metrics)
801+ pub ( crate ) join_metrics : BuildProbeJoinMetrics ,
802+ /// Selectivity of the join: output_rows / (left_rows * right_rows). The part is incremented by the row count of each completed output batch; the total is incremented by left_rows * right_rows each time the left side is exhausted for the current right batch.
803+ pub ( crate ) selectivity : RatioMetrics ,
804+ }
805+
806+ impl NestedLoopJoinMetrics {
807+ pub fn new ( metrics : & ExecutionPlanMetricsSet , partition : usize ) -> Self { // Creates both metric groups for `partition`, registering them in the shared `metrics` set
808+ Self {
809+ join_metrics : BuildProbeJoinMetrics :: new ( partition, metrics) , // NOTE: argument order here is (partition, metrics), the reverse of this constructor's own parameters
810+ selectivity : MetricBuilder :: new ( metrics)
811+ . with_type ( MetricType :: SUMMARY ) // tagged with the SUMMARY metric type
812+ . ratio_metrics ( "selectivity" , partition) , // ratio metric named "selectivity" for this partition
813+ }
814+ }
815+ }
816+
797817impl Stream for NestedLoopJoinStream {
798818 type Item = Result < RecordBatch > ;
799819
@@ -844,7 +864,7 @@ impl Stream for NestedLoopJoinStream {
844864 // -side batches), related metrics except build time will be
845865 // updated.
846866 // stop on drop
847- let build_metric = self . join_metrics . build_time . clone ( ) ;
867+ let build_metric = self . metrics . join_metrics . build_time . clone ( ) ;
848868 let _build_timer = build_metric. timer ( ) ;
849869
850870 match self . handle_buffering_left ( cx) {
@@ -878,7 +898,7 @@ impl Stream for NestedLoopJoinStream {
878898 NLJState :: FetchingRight => {
879899 debug ! ( "[NLJState] Entering: {:?}" , self . state) ;
880900 // stop on drop
881- let join_metric = self . join_metrics . join_time . clone ( ) ;
901+ let join_metric = self . metrics . join_metrics . join_time . clone ( ) ;
882902 let _join_timer = join_metric. timer ( ) ;
883903
884904 match self . handle_fetching_right ( cx) {
@@ -905,13 +925,13 @@ impl Stream for NestedLoopJoinStream {
905925 debug ! ( "[NLJState] Entering: {:?}" , self . state) ;
906926
907927 // stop on drop
908- let join_metric = self . join_metrics . join_time . clone ( ) ;
928+ let join_metric = self . metrics . join_metrics . join_time . clone ( ) ;
909929 let _join_timer = join_metric. timer ( ) ;
910930
911931 match self . handle_probe_right ( ) {
912932 ControlFlow :: Continue ( ( ) ) => continue ,
913933 ControlFlow :: Break ( poll) => {
914- return self . join_metrics . baseline . record_poll ( poll)
934+ return self . metrics . join_metrics . baseline . record_poll ( poll)
915935 }
916936 }
917937 }
@@ -926,13 +946,13 @@ impl Stream for NestedLoopJoinStream {
926946 debug ! ( "[NLJState] Entering: {:?}" , self . state) ;
927947
928948 // stop on drop
929- let join_metric = self . join_metrics . join_time . clone ( ) ;
949+ let join_metric = self . metrics . join_metrics . join_time . clone ( ) ;
930950 let _join_timer = join_metric. timer ( ) ;
931951
932952 match self . handle_emit_right_unmatched ( ) {
933953 ControlFlow :: Continue ( ( ) ) => continue ,
934954 ControlFlow :: Break ( poll) => {
935- return self . join_metrics . baseline . record_poll ( poll)
955+ return self . metrics . join_metrics . baseline . record_poll ( poll)
936956 }
937957 }
938958 }
@@ -956,13 +976,13 @@ impl Stream for NestedLoopJoinStream {
956976 debug ! ( "[NLJState] Entering: {:?}" , self . state) ;
957977
958978 // stop on drop
959- let join_metric = self . join_metrics . join_time . clone ( ) ;
979+ let join_metric = self . metrics . join_metrics . join_time . clone ( ) ;
960980 let _join_timer = join_metric. timer ( ) ;
961981
962982 match self . handle_emit_left_unmatched ( ) {
963983 ControlFlow :: Continue ( ( ) ) => continue ,
964984 ControlFlow :: Break ( poll) => {
965- return self . join_metrics . baseline . record_poll ( poll)
985+ return self . metrics . join_metrics . baseline . record_poll ( poll)
966986 }
967987 }
968988 }
@@ -972,13 +992,13 @@ impl Stream for NestedLoopJoinStream {
972992 debug ! ( "[NLJState] Entering: {:?}" , self . state) ;
973993
974994 // stop on drop
975- let join_metric = self . join_metrics . join_time . clone ( ) ;
995+ let join_metric = self . metrics . join_metrics . join_time . clone ( ) ;
976996 let _join_timer = join_metric. timer ( ) ;
977997 // counting it in the join timer because there might be some
978998 // final result batches to output in this state
979999
9801000 let poll = self . handle_done ( ) ;
981- return self . join_metrics . baseline . record_poll ( poll) ;
1001+ return self . metrics . join_metrics . baseline . record_poll ( poll) ;
9821002 }
9831003 }
9841004 }
@@ -1000,7 +1020,7 @@ impl NestedLoopJoinStream {
10001020 right_data : SendableRecordBatchStream ,
10011021 left_data : OnceFut < JoinLeftData > ,
10021022 column_indices : Vec < ColumnIndex > ,
1003- join_metrics : BuildProbeJoinMetrics ,
1023+ metrics : NestedLoopJoinMetrics ,
10041024 batch_size : usize ,
10051025 ) -> Self {
10061026 Self {
@@ -1010,7 +1030,7 @@ impl NestedLoopJoinStream {
10101030 right_data,
10111031 column_indices,
10121032 left_data,
1013- join_metrics ,
1033+ metrics ,
10141034 buffered_left_data : None ,
10151035 output_buffer : Box :: new ( BatchCoalescer :: new ( schema, batch_size) ) ,
10161036 batch_size,
@@ -1057,8 +1077,8 @@ impl NestedLoopJoinStream {
10571077 Some ( Ok ( right_batch) ) => {
10581078 // Update metrics
10591079 let right_batch_size = right_batch. num_rows ( ) ;
1060- self . join_metrics . input_rows . add ( right_batch_size) ;
1061- self . join_metrics . input_batches . add ( 1 ) ;
1080+ self . metrics . join_metrics . input_rows . add ( right_batch_size) ;
1081+ self . metrics . join_metrics . input_batches . add ( 1 ) ;
10621082
10631083 // Skip the empty batch
10641084 if right_batch_size == 0 {
@@ -1108,6 +1128,17 @@ impl NestedLoopJoinStream {
11081128 Ok ( false ) => {
11091129 // Left exhausted, transition to FetchingRight
11101130 self . left_probe_idx = 0 ;
1131+
1132+ // Selectivity Metric: Update total possibilities for the batch (left_rows * right_rows)
1133+ // If memory-limited execution is implemented, this logic must be updated accordingly.
1134+ if let ( Ok ( left_data) , Some ( right_batch) ) =
1135+ ( self . get_left_data ( ) , self . current_right_batch . as_ref ( ) )
1136+ {
1137+ let left_rows = left_data. batch ( ) . num_rows ( ) ;
1138+ let right_rows = right_batch. num_rows ( ) ;
1139+ self . metrics . selectivity . add_total ( left_rows * right_rows) ;
1140+ }
1141+
11111142 if self . should_track_unmatched_right {
11121143 debug_assert ! (
11131144 self . current_right_batch_matched. is_some( ) ,
@@ -1138,7 +1169,6 @@ impl NestedLoopJoinStream {
11381169 && self . current_right_batch. is_some( ) ,
11391170 "This state is yielding output for unmatched rows in the current right batch, so both the right batch and the bitmap must be present"
11401171 ) ;
1141-
11421172 // Construct the result batch for unmatched right rows using a utility function
11431173 match self . process_right_unmatched ( ) {
11441174 Ok ( Some ( batch) ) => {
@@ -1205,7 +1235,7 @@ impl NestedLoopJoinStream {
12051235 // should be with the expected schema for this operator
12061236 if !self . handled_empty_output {
12071237 let zero_count = Count :: new ( ) ;
1208- if * self . join_metrics . baseline . output_rows ( ) == zero_count {
1238+ if * self . metrics . join_metrics . baseline . output_rows ( ) == zero_count {
12091239 let empty_batch = RecordBatch :: new_empty ( Arc :: clone ( & self . output_schema ) ) ;
12101240 self . handled_empty_output = true ;
12111241 return Poll :: Ready ( Some ( Ok ( empty_batch) ) ) ;
@@ -1455,7 +1485,11 @@ impl NestedLoopJoinStream {
14551485 if let Some ( batch) = self . output_buffer . next_completed_batch ( ) {
14561486 // HACK: this is not part of `BaselineMetrics` yet, so update it
14571487 // manually
1458- self . join_metrics . output_batches . add ( 1 ) ;
1488+ self . metrics . join_metrics . output_batches . add ( 1 ) ;
1489+
1490+ // Update output rows for selectivity metric
1491+ let output_rows = batch. num_rows ( ) ;
1492+ self . metrics . selectivity . add_part ( output_rows) ;
14591493
14601494 return Some ( Poll :: Ready ( Some ( Ok ( batch) ) ) ) ;
14611495 }
0 commit comments