diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index a8c209a492ba..9a6832283486 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -41,7 +41,8 @@ use crate::joins::utils::{ JoinOnRef, }; use crate::metrics::{ - Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics, + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, + SpillMetrics, }; use crate::projection::{ join_allows_pushdown, join_table_borders, new_join_children, @@ -609,8 +610,8 @@ struct SortMergeJoinMetrics { input_rows: Count, /// Number of batches produced by this operator output_batches: Count, - /// Number of rows produced by this operator - output_rows: Count, + /// Execution metrics + baseline_metrics: BaselineMetrics, /// Peak memory used for buffered data. /// Calculated as sum of peak memory values across partitions peak_mem_used: metrics::Gauge, @@ -627,16 +628,17 @@ impl SortMergeJoinMetrics { let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); let output_batches = MetricBuilder::new(metrics).counter("output_batches", partition); - let output_rows = MetricBuilder::new(metrics).output_rows(partition); let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition); let spill_metrics = SpillMetrics::new(metrics, partition); + let baseline_metrics = BaselineMetrics::new(metrics, partition); + Self { join_time, input_batches, input_rows, output_batches, - output_rows, + baseline_metrics, peak_mem_used, spill_metrics, } @@ -2032,7 +2034,9 @@ impl SortMergeJoinStream { let record_batch = concat_batches(&self.schema, &self.staging_output_record_batches.batches)?; self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(record_batch.num_rows()); + self.join_metrics + .baseline_metrics + .record_output(record_batch.num_rows()); // If join filter exists, `self.output_size` is not accurate as we don't know the exact // number of rows in the output record batch. If streamed row joined with buffered rows, // once join filter is applied, the number of output rows may be more than 1. @@ -2059,6 +2063,7 @@ impl SortMergeJoinStream { { self.staging_output_record_batches.batches.clear(); } + Ok(record_batch) }