diff --git a/datafusion/core/src/physical_plan/metrics/baseline.rs b/datafusion/core/src/physical_plan/metrics/baseline.rs
index 9c4557fb139c..fbbb689aeef4 100644
--- a/datafusion/core/src/physical_plan/metrics/baseline.rs
+++ b/datafusion/core/src/physical_plan/metrics/baseline.rs
@@ -21,7 +21,7 @@ use std::task::Poll;

 use arrow::record_batch::RecordBatch;

-use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, Timestamp};
+use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
 use crate::error::Result;

 /// Helper for creating and tracking common "baseline" metrics for
@@ -43,7 +43,7 @@ use crate::error::Result;
 /// // when operator is finished:
 /// baseline_metrics.done();
 /// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BaselineMetrics {
     /// end_time is set when `ExecutionMetrics::done()` is called
     end_time: Timestamp,
@@ -51,15 +51,6 @@ pub struct BaselineMetrics {
     /// amount of time the operator was actively trying to use the CPU
     elapsed_compute: Time,

-    /// count of spills during the execution of the operator
-    spill_count: Count,
-
-    /// total spilled bytes during the execution of the operator
-    spilled_bytes: Count,
-
-    /// current memory usage for the operator
-    mem_used: Gauge,
-
     /// output rows: the total output rows
     output_rows: Count,
 }
@@ -73,39 +64,28 @@ impl BaselineMetrics {
         Self {
             end_time: MetricBuilder::new(metrics).end_timestamp(partition),
             elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
-            spill_count: MetricBuilder::new(metrics).spill_count(partition),
-            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
-            mem_used: MetricBuilder::new(metrics).mem_used(partition),
             output_rows: MetricBuilder::new(metrics).output_rows(partition),
         }
     }

+    /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` time
+    /// but ignores all other metrics
+    ///
+    /// This is useful when an operator offloads some of its intermediate work to
+    /// separate tasks, whose compute time would otherwise not be recorded by
+    /// [`Self::record_poll`]
+    pub fn intermediate(&self) -> BaselineMetrics {
+        Self {
+            end_time: Default::default(),
+            elapsed_compute: self.elapsed_compute.clone(),
+            output_rows: Default::default(),
+        }
+    }
+
     /// return the metric for cpu time spend in this operator
     pub fn elapsed_compute(&self) -> &Time {
         &self.elapsed_compute
     }

-    /// return the metric for the total number of spills triggered during execution
-    pub fn spill_count(&self) -> &Count {
-        &self.spill_count
-    }
-
-    /// return the metric for the total spilled bytes during execution
-    pub fn spilled_bytes(&self) -> &Count {
-        &self.spilled_bytes
-    }
-
-    /// return the metric for current memory usage
-    pub fn mem_used(&self) -> &Gauge {
-        &self.mem_used
-    }
-
-    /// Record a spill of `spilled_bytes` size.
-    pub fn record_spill(&self, spilled_bytes: usize) {
-        self.spill_count.add(1);
-        self.spilled_bytes.add(spilled_bytes);
-    }
-
     /// return the metric for the total number of output rows produced
     pub fn output_rows(&self) -> &Count {
         &self.output_rows
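The new `intermediate()` handle is the heart of this change: it lets offloaded work charge time to the parent's `elapsed_compute` without double-counting rows or timestamps. A minimal sketch of the intended usage, assuming the module paths of this PR's tree (the partition index and row count are made up for illustration):

```rust
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

fn example() {
    let metrics = ExecutionPlanMetricsSet::new();
    let final_metrics = BaselineMetrics::new(&metrics, /* partition */ 0);

    // Handle for offloaded work: shares only the `elapsed_compute` timer
    let intermediate = final_metrics.intermediate();
    {
        let _timer = intermediate.elapsed_compute().timer();
        // ... CPU-bound intermediate work; time is recorded when the guard drops
    }

    // Only the final handle records output rows and the end timestamp
    final_metrics.record_output(100);
    final_metrics.done();

    // Both handles accumulated into the same underlying `Time`
    let nanos = metrics.clone_inner().elapsed_compute().unwrap_or(0);
    println!("total elapsed_compute: {nanos} ns");
}
```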
diff --git a/datafusion/core/src/physical_plan/metrics/composite.rs b/datafusion/core/src/physical_plan/metrics/composite.rs
deleted file mode 100644
index 3c257805d2c5..000000000000
--- a/datafusion/core/src/physical_plan/metrics/composite.rs
+++ /dev/null
@@ -1,205 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Metrics common for complex operators with multiple steps.
-
-use crate::execution::memory_pool::MemoryPool;
-use crate::physical_plan::metrics::tracker::MemTrackingMetrics;
-use crate::physical_plan::metrics::{
-    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time,
-    Timestamp,
-};
-use crate::physical_plan::Metric;
-use chrono::{TimeZone, Utc};
-use std::sync::Arc;
-use std::time::Duration;
-
-#[derive(Debug, Clone)]
-/// Collects all metrics during a complex operation, which is composed of multiple steps and
-/// each stage reports its statistics separately.
-/// Give sort as an example, when the dataset is more significant than available memory, it will report
-/// multiple in-mem sort metrics and final merge-sort metrics from `SortPreservingMergeStream`.
-/// Therefore, We need a separation of metrics for which are final metrics (for output_rows accumulation),
-/// and which are intermediate metrics that we only account for elapsed_compute time.
-pub struct CompositeMetricsSet {
-    mid: ExecutionPlanMetricsSet,
-    final_: ExecutionPlanMetricsSet,
-}
-
-impl Default for CompositeMetricsSet {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl CompositeMetricsSet {
-    /// Create a new aggregated set
-    pub fn new() -> Self {
-        Self {
-            mid: ExecutionPlanMetricsSet::new(),
-            final_: ExecutionPlanMetricsSet::new(),
-        }
-    }
-
-    /// create a new intermediate baseline
-    pub fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
-        BaselineMetrics::new(&self.mid, partition)
-    }
-
-    /// create a new final baseline
-    pub fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
-        BaselineMetrics::new(&self.final_, partition)
-    }
-
-    /// create a new intermediate memory tracking metrics
-    pub fn new_intermediate_tracking(
-        &self,
-        partition: usize,
-        pool: &Arc<dyn MemoryPool>,
-    ) -> MemTrackingMetrics {
-        MemTrackingMetrics::new(&self.mid, pool, partition)
-    }
-
-    /// create a new final memory tracking metrics
-    pub fn new_final_tracking(
-        &self,
-        partition: usize,
-        pool: &Arc<dyn MemoryPool>,
-    ) -> MemTrackingMetrics {
-        MemTrackingMetrics::new(&self.final_, pool, partition)
-    }
-
-    fn merge_compute_time(&self, dest: &Time) {
-        let time1 = self
-            .mid
-            .clone_inner()
-            .elapsed_compute()
-            .map_or(0u64, |v| v as u64);
-        let time2 = self
-            .final_
-            .clone_inner()
-            .elapsed_compute()
-            .map_or(0u64, |v| v as u64);
-        dest.add_duration(Duration::from_nanos(time1));
-        dest.add_duration(Duration::from_nanos(time2));
-    }
-
-    fn merge_spill_count(&self, dest: &Count) {
-        let count1 = self.mid.clone_inner().spill_count().map_or(0, |v| v);
-        let count2 = self.final_.clone_inner().spill_count().map_or(0, |v| v);
-        dest.add(count1);
-        dest.add(count2);
-    }
-
-    fn merge_spilled_bytes(&self, dest: &Count) {
-        let count1 = self.mid.clone_inner().spilled_bytes().map_or(0, |v| v);
-        let count2 = self.final_.clone_inner().spill_count().map_or(0, |v| v);
-        dest.add(count1);
-        dest.add(count2);
-    }
-
-    fn merge_output_count(&self, dest: &Count) {
-        let count = self.final_.clone_inner().output_rows().map_or(0, |v| v);
-        dest.add(count);
-    }
-
-    fn merge_start_time(&self, dest: &Timestamp) {
-        let start1 = self
-            .mid
-            .clone_inner()
-            .sum(|metric| matches!(metric.value(), MetricValue::StartTimestamp(_)))
-            .map(|v| v.as_usize());
-        let start2 = self
-            .final_
-            .clone_inner()
-            .sum(|metric| matches!(metric.value(), MetricValue::StartTimestamp(_)))
-            .map(|v| v.as_usize());
-        match (start1, start2) {
-            (Some(start1), Some(start2)) => {
-                dest.set(Utc.timestamp_nanos(start1.min(start2) as i64))
-            }
-            (Some(start1), None) => dest.set(Utc.timestamp_nanos(start1 as i64)),
-            (None, Some(start2)) => dest.set(Utc.timestamp_nanos(start2 as i64)),
-            (None, None) => {}
-        }
-    }
-
-    fn merge_end_time(&self, dest: &Timestamp) {
-        let start1 = self
-            .mid
-            .clone_inner()
-            .sum(|metric| matches!(metric.value(), MetricValue::EndTimestamp(_)))
-            .map(|v| v.as_usize());
-        let start2 = self
-            .final_
-            .clone_inner()
-            .sum(|metric| matches!(metric.value(), MetricValue::EndTimestamp(_)))
-            .map(|v| v.as_usize());
-        match (start1, start2) {
-            (Some(start1), Some(start2)) => {
-                dest.set(Utc.timestamp_nanos(start1.max(start2) as i64))
-            }
-            (Some(start1), None) => dest.set(Utc.timestamp_nanos(start1 as i64)),
-            (None, Some(start2)) => dest.set(Utc.timestamp_nanos(start2 as i64)),
-            (None, None) => {}
-        }
-    }
-
-    /// Aggregate all metrics into a one
-    pub fn aggregate_all(&self) -> MetricsSet {
-        let mut metrics = MetricsSet::new();
-        let elapsed_time = Time::new();
-        let spill_count = Count::new();
-        let spilled_bytes = Count::new();
-        let output_count = Count::new();
-        let start_time = Timestamp::new();
-        let end_time = Timestamp::new();
-
-        metrics.push(Arc::new(Metric::new(
-            MetricValue::ElapsedCompute(elapsed_time.clone()),
-            None,
-        )));
-        metrics.push(Arc::new(Metric::new(
-            MetricValue::SpillCount(spill_count.clone()),
-            None,
-        )));
-        metrics.push(Arc::new(Metric::new(
-            MetricValue::SpilledBytes(spilled_bytes.clone()),
-            None,
-        )));
-        metrics.push(Arc::new(Metric::new(
-            MetricValue::OutputRows(output_count.clone()),
-            None,
-        )));
-        metrics.push(Arc::new(Metric::new(
-            MetricValue::StartTimestamp(start_time.clone()),
-            None,
-        )));
-        metrics.push(Arc::new(Metric::new(
-            MetricValue::EndTimestamp(end_time.clone()),
-            None,
-        )));
-
-        self.merge_compute_time(&elapsed_time);
-        self.merge_spill_count(&spill_count);
-        self.merge_spilled_bytes(&spilled_bytes);
-        self.merge_output_count(&output_count);
-        self.merge_start_time(&start_time);
-        self.merge_end_time(&end_time);
-        metrics
-    }
-}
diff --git a/datafusion/core/src/physical_plan/metrics/mod.rs b/datafusion/core/src/physical_plan/metrics/mod.rs
index 95b579dbcf3c..652c0af5c2e4 100644
--- a/datafusion/core/src/physical_plan/metrics/mod.rs
+++ b/datafusion/core/src/physical_plan/metrics/mod.rs
@@ -19,8 +19,6 @@

 mod baseline;
 mod builder;
-mod composite;
-mod tracker;
 mod value;

 use parking_lot::Mutex;
@@ -35,8 +33,6 @@ use hashbrown::HashMap;
 // public exports
 pub use baseline::{BaselineMetrics, RecordOutput};
 pub use builder::MetricBuilder;
-pub use composite::CompositeMetricsSet;
-pub use tracker::MemTrackingMetrics;
 pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};

 /// Something that tracks a value of interest (metric) of a DataFusion
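With `CompositeMetricsSet` gone there is no mid/final pair to merge: every handle registers into one `ExecutionPlanMetricsSet`, and the summing that `aggregate_all` did by hand falls out of the existing `MetricsSet` accessors. A sketch of reading the flattened set, using only accessors that appear in this diff (`clone_inner`, `elapsed_compute`, `spill_count`, `spilled_bytes`, `output_rows`):

```rust
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

fn report(metrics: &ExecutionPlanMetricsSet) {
    // `clone_inner` snapshots the set; each accessor sums the value across
    // all partitions/handles that registered into it
    let set = metrics.clone_inner();
    println!(
        "compute: {:?} ns, spills: {:?}, spilled: {:?} B, rows: {:?}",
        set.elapsed_compute(),
        set.spill_count(),
        set.spilled_bytes(),
        set.output_rows(),
    );
}
```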
f0980e6394b0..000000000000
--- a/datafusion/core/src/physical_plan/metrics/tracker.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Metrics with memory usage tracking capability
-
-use crate::physical_plan::metrics::{
-    BaselineMetrics, Count, ExecutionPlanMetricsSet, Time,
-};
-use std::sync::Arc;
-use std::task::Poll;
-
-use crate::error::Result;
-use crate::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
-use arrow::record_batch::RecordBatch;
-
-/// Wraps a [`BaselineMetrics`] and records memory usage on a [`MemoryReservation`]
-#[derive(Debug)]
-pub struct MemTrackingMetrics {
-    reservation: MemoryReservation,
-    metrics: BaselineMetrics,
-}
-
-/// Delegates most of the metrics functionalities to the inner BaselineMetrics,
-/// intercept memory metrics functionalities and do memory manager bookkeeping.
-impl MemTrackingMetrics {
-    /// Create memory tracking metrics with reference to memory manager
-    pub fn new(
-        metrics: &ExecutionPlanMetricsSet,
-        pool: &Arc<dyn MemoryPool>,
-        partition: usize,
-    ) -> Self {
-        let reservation = MemoryConsumer::new(format!("MemTrackingMetrics[{partition}]"))
-            .register(pool);
-
-        Self {
-            reservation,
-            metrics: BaselineMetrics::new(metrics, partition),
-        }
-    }
-
-    /// return the metric for cpu time spend in this operator
-    pub fn elapsed_compute(&self) -> &Time {
-        self.metrics.elapsed_compute()
-    }
-
-    /// return the size for current memory usage
-    pub fn mem_used(&self) -> usize {
-        self.metrics.mem_used().value()
-    }
-
-    /// setup initial memory usage and register it with memory manager
-    pub fn init_mem_used(&mut self, size: usize) {
-        self.metrics.mem_used().set(size);
-        self.reservation.resize(size)
-    }
-
-    /// return the metric for the total number of output rows produced
-    pub fn output_rows(&self) -> &Count {
-        self.metrics.output_rows()
-    }
-
-    /// Records the fact that this operator's execution is complete
-    /// (recording the `end_time` metric).
-    ///
-    /// Note care should be taken to call `done()` manually if
-    /// `MemTrackingMetrics` is not `drop`ped immediately upon operator
-    /// completion, as async streams may not be dropped immediately
-    /// depending on the consumer.
-    pub fn done(&self) {
-        self.metrics.done()
-    }
-
-    /// Record that some number of rows have been produced as output
-    ///
-    /// See the [`super::RecordOutput`] for conveniently recording record
-    /// batch output for other thing
-    pub fn record_output(&self, num_rows: usize) {
-        self.metrics.record_output(num_rows)
-    }
-
-    /// Process a poll result of a stream producing output for an
-    /// operator, recording the output rows and stream done time and
-    /// returning the same poll result
-    pub fn record_poll(
-        &self,
-        poll: Poll<Option<Result<RecordBatch>>>,
-    ) -> Poll<Option<Result<RecordBatch>>> {
-        self.metrics.record_poll(poll)
-    }
-}
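With `MemTrackingMetrics` deleted, operators talk to the memory pool directly through a `MemoryReservation`, which already knows its own size; the separate `mem_used` gauge it kept in sync was redundant. A minimal sketch of that bookkeeping, assuming a `GreedyMemoryPool` (the consumer name and sizes are illustrative):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn example() -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

    // Register a named consumer; the reservation tracks this operator's usage
    let mut reservation = MemoryConsumer::new("ExampleOp[0]").register(&pool);

    reservation.try_grow(64 * 1024)?; // fallible: errors when the pool is exhausted
    reservation.resize(32 * 1024); // infallible: the pool is debited/credited

    let freed = reservation.free(); // returns the number of bytes released
    assert_eq!(freed, 32 * 1024);
    Ok(())
}
```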
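After the signature change below, callers of `streaming_merge` hand over a plain `BaselineMetrics` instead of a pool-backed tracker. Since the function is `pub(crate)`, the following sketch is a crate-internal call site (it mirrors the updated test in `sort_preserving_merge.rs` at the end of this diff; the function name is made up):

```rust
// Inside datafusion_core, with this file's imports in scope:
// streaming_merge, BaselineMetrics, ExecutionPlanMetricsSet,
// SendableRecordBatchStream, PhysicalSortExpr, SchemaRef, Result
fn merge_sorted(
    streams: Vec<SendableRecordBatchStream>,
    schema: SchemaRef,
    expressions: &[PhysicalSortExpr],
    batch_size: usize,
) -> Result<SendableRecordBatchStream> {
    let metrics = ExecutionPlanMetricsSet::new();
    streaming_merge(
        streams,
        schema,
        expressions,
        BaselineMetrics::new(&metrics, 0), // no memory pool handle required
        batch_size,
    )
}
```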
diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs
index e6533deaba13..e6a569c711a7 100644
--- a/datafusion/core/src/physical_plan/sorts/merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/merge.rs
@@ -16,7 +16,7 @@
 // under the License.

 use crate::common::Result;
-use crate::physical_plan::metrics::MemTrackingMetrics;
+use crate::physical_plan::metrics::BaselineMetrics;
 use crate::physical_plan::sorts::builder::BatchBuilder;
 use crate::physical_plan::sorts::cursor::Cursor;
 use crate::physical_plan::sorts::stream::{
@@ -55,7 +55,7 @@ pub(crate) fn streaming_merge(
     streams: Vec<SendableRecordBatchStream>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
-    tracking_metrics: MemTrackingMetrics,
+    metrics: BaselineMetrics,
     batch_size: usize,
 ) -> Result<SendableRecordBatchStream> {
     // Special case single column comparisons with optimized cursor implementations
@@ -63,11 +63,11 @@ pub(crate) fn streaming_merge(
         let sort = expressions[0].clone();
         let data_type = sort.expr.data_type(schema.as_ref())?;
         downcast_primitive! {
-            data_type => (primitive_merge_helper, sort, streams, schema, tracking_metrics, batch_size),
-            DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, tracking_metrics, batch_size)
-            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, tracking_metrics, batch_size)
-            DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, tracking_metrics, batch_size)
-            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, tracking_metrics, batch_size)
+            data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size),
+            DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size)
+            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size)
+            DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size)
+            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size)
             _ => {}
         }
     }
@@ -76,7 +76,7 @@ pub(crate) fn streaming_merge(
     Ok(Box::pin(SortPreservingMergeStream::new(
         Box::new(streams),
         schema,
-        tracking_metrics,
+        metrics,
         batch_size,
     )))
 }
@@ -92,7 +92,7 @@ struct SortPreservingMergeStream<C: Cursor> {
     streams: CursorStream<C>,

     /// used to record execution metrics
-    tracking_metrics: MemTrackingMetrics,
+    metrics: BaselineMetrics,

     /// If the stream has encountered an error
     aborted: bool,
@@ -146,7 +146,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
     fn new(
         streams: CursorStream<C>,
         schema: SchemaRef,
-        tracking_metrics: MemTrackingMetrics,
+        metrics: BaselineMetrics,
         batch_size: usize,
     ) -> Self {
         let stream_count = streams.partitions();
@@ -154,7 +154,7 @@
         Self {
             in_progress: BatchBuilder::new(schema, stream_count, batch_size),
             streams,
-            tracking_metrics,
+            metrics,
             aborted: false,
             cursors: (0..stream_count).map(|_| None).collect(),
             loser_tree: vec![],
@@ -209,7 +209,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
         // NB timer records time taken on drop, so there are no
         // calls to `timer.done()` below.
-        let elapsed_compute = self.tracking_metrics.elapsed_compute().clone();
+        let elapsed_compute = self.metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();

         loop {
@@ -347,7 +347,7 @@ impl<C: Cursor> Stream for SortPreservingMergeStream<C> {
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         let poll = self.poll_next_inner(cx);
-        self.tracking_metrics.record_poll(poll)
+        self.metrics.record_poll(poll)
     }
 }
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 544abb24740d..35dac19b27c6 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -28,7 +28,7 @@ use crate::execution::runtime_env::RuntimeEnv;
 use crate::physical_plan::common::{batch_byte_size, spawn_buffered, IPCWriter};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use crate::physical_plan::sorts::merge::streaming_merge;
 use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
@@ -36,7 +36,6 @@ use crate::physical_plan::{
     DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
     Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::prelude::SessionConfig;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{concat_batches, lexsort_to_indices, take};
 use arrow::datatypes::SchemaRef;
@@ -56,6 +55,27 @@ use tempfile::NamedTempFile;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::task;

+struct ExternalSorterMetrics {
+    /// baseline metrics common to all operators
+    baseline: BaselineMetrics,
+
+    /// count of spills during the execution of the operator
+    spill_count: Count,
+
+    /// total spilled bytes during the execution of the operator
+    spilled_bytes: Count,
+}
+
+impl ExternalSorterMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            spill_count: MetricBuilder::new(metrics).spill_count(partition),
+            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
+        }
+    }
+}
+
 /// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available).
 ///
 /// The basic architecture of the algorithm:
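`ExternalSorterMetrics` illustrates the pattern that replaces the spill fields formerly baked into `BaselineMetrics`: an operator that needs more than the baseline bundles its extra counters in a private struct, all registered against the same `ExecutionPlanMetricsSet` so `metrics()` still sees everything. A generic sketch of the pattern (the `batches_merged` counter is hypothetical):

```rust
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder,
};

struct MyOperatorMetrics {
    /// the common baseline bundle (elapsed_compute, output_rows, end_time)
    baseline: BaselineMetrics,
    /// hypothetical operator-specific counter, for illustration only
    batches_merged: Count,
}

impl MyOperatorMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            batches_merged: MetricBuilder::new(metrics)
                .counter("batches_merged", partition),
        }
    }
}
```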
@@ -72,13 +92,12 @@ struct ExternalSorter {
     spills: Vec<NamedTempFile>,
     /// Sort expressions
     expr: Arc<[PhysicalSortExpr]>,
-    session_config: Arc<SessionConfig>,
-    runtime: Arc<RuntimeEnv>,
-    metrics_set: CompositeMetricsSet,
-    metrics: BaselineMetrics,
+    metrics: ExternalSorterMetrics,
     fetch: Option<usize>,
     reservation: MemoryReservation,
     partition_id: usize,
+    runtime: Arc<RuntimeEnv>,
+    batch_size: usize,
 }

 impl ExternalSorter {
@@ -86,13 +105,12 @@ impl ExternalSorter {
         partition_id: usize,
         schema: SchemaRef,
         expr: Vec<PhysicalSortExpr>,
-        metrics_set: CompositeMetricsSet,
-        session_config: Arc<SessionConfig>,
-        runtime: Arc<RuntimeEnv>,
+        batch_size: usize,
         fetch: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+        runtime: Arc<RuntimeEnv>,
     ) -> Self {
-        let metrics = metrics_set.new_intermediate_baseline(partition_id);
-
+        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
         let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
             .with_can_spill(true)
             .register(&runtime.memory_pool);
@@ -103,13 +121,12 @@ impl ExternalSorter {
             in_mem_batches_sorted: true,
             spills: vec![],
             expr: expr.into(),
-            session_config,
-            runtime,
-            metrics_set,
             metrics,
             fetch,
             reservation,
             partition_id,
+            runtime,
+            batch_size,
         }
     }
@@ -141,7 +158,7 @@ impl ExternalSorter {
                 self.reservation.try_grow(size)?
             }
         }
-        self.metrics.mem_used().add(size);
+
         self.in_mem_batches.push(input);
         self.in_mem_batches_sorted = false;
         Ok(())
@@ -154,17 +171,10 @@ impl ExternalSorter {
     /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
     fn sort(&mut self) -> Result<SendableRecordBatchStream> {
         if self.spilled_before() {
-            let intermediate_metrics = self
-                .metrics_set
-                .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
-
-            let merge_metrics = self
-                .metrics_set
-                .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
-
             let mut streams = vec![];
             if !self.in_mem_batches.is_empty() {
-                let in_mem_stream = self.in_mem_sort_stream(intermediate_metrics)?;
+                let in_mem_stream =
+                    self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
                 streams.push(in_mem_stream);
             }
@@ -177,14 +187,11 @@ impl ExternalSorter {
                 streams,
                 self.schema.clone(),
                 &self.expr,
-                merge_metrics,
-                self.session_config.batch_size(),
+                self.metrics.baseline.clone(),
+                self.batch_size,
             )
         } else if !self.in_mem_batches.is_empty() {
-            let tracking_metrics = self
-                .metrics_set
-                .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
-            let result = self.in_mem_sort_stream(tracking_metrics);
+            let result = self.in_mem_sort_stream(self.metrics.baseline.clone());
             // Report to the memory manager we are no longer using memory
             self.reservation.free();
             result
@@ -194,15 +201,15 @@ impl ExternalSorter {
     }

     fn used(&self) -> usize {
-        self.metrics.mem_used().value()
+        self.reservation.size()
     }

     fn spilled_bytes(&self) -> usize {
-        self.metrics.spilled_bytes().value()
+        self.metrics.spilled_bytes.value()
     }

     fn spill_count(&self) -> usize {
-        self.metrics.spill_count().value()
+        self.metrics.spill_count.value()
     }

     async fn spill(&mut self) -> Result<usize> {
@@ -218,9 +225,9 @@ impl ExternalSorter {
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
         spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
-        self.reservation.free();
-        let used = self.metrics.mem_used().set(0);
-        self.metrics.record_spill(used);
+        let used = self.reservation.free();
+        self.metrics.spill_count.add(1);
+        self.metrics.spilled_bytes.add(used);
         self.spills.push(spillfile);
         Ok(used)
     }
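The rewritten `spill` leans on the fact that `MemoryReservation::free` returns the number of bytes it released, so the reservation doubles as the source of truth that the old `mem_used` gauge merely duplicated. The idiom in isolation, as a sketch under the same assumptions as the code above (the helper name is made up):

```rust
use datafusion::execution::memory_pool::MemoryReservation;
use datafusion::physical_plan::metrics::Count;

/// Record one spill: release the operator's memory back to the pool and
/// credit the released byte count to the spill metrics.
fn account_for_spill(
    reservation: &mut MemoryReservation,
    spill_count: &Count,
    spilled_bytes: &Count,
) -> usize {
    let used = reservation.free(); // bytes handed back to the pool
    spill_count.add(1);
    spilled_bytes.add(used);
    used
}
```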
@@ -231,12 +238,8 @@ impl ExternalSorter {
             return Ok(());
         }

-        let tracking_metrics = self
-            .metrics_set
-            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
-
         self.in_mem_batches = self
-            .in_mem_sort_stream(tracking_metrics)?
+            .in_mem_sort_stream(self.metrics.baseline.intermediate())?
             .try_collect()
             .await?;
@@ -246,7 +249,6 @@ impl ExternalSorter {
             .map(|x| x.get_array_memory_size())
             .sum();

-        self.metrics.mem_used().set(size);
         self.reservation.resize(size);
         self.in_mem_batches_sorted = true;
         Ok(())
@@ -255,13 +257,12 @@ impl ExternalSorter {
     /// Consumes in_mem_batches returning a sorted stream
     fn in_mem_sort_stream(
         &mut self,
-        metrics: MemTrackingMetrics,
+        metrics: BaselineMetrics,
     ) -> Result<SendableRecordBatchStream> {
         assert_ne!(self.in_mem_batches.len(), 0);

         if self.in_mem_batches.len() == 1 {
             let batch = self.in_mem_batches.remove(0);
-            let stream =
-                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?;
+            let stream = self.sort_batch_stream(batch, metrics)?;
             self.in_mem_batches.clear();
             return Ok(stream);
         }
@@ -273,21 +274,14 @@ impl ExternalSorter {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
-            return sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics);
+            return self.sort_batch_stream(batch, metrics);
         }

-        let streams = self
-            .in_mem_batches
-            .drain(..)
+        let streams = std::mem::take(&mut self.in_mem_batches)
+            .into_iter()
             .map(|batch| {
-                let metrics = self.metrics_set.new_intermediate_tracking(
-                    self.partition_id,
-                    &self.runtime.memory_pool,
-                );
-                Ok(spawn_buffered(
-                    sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?,
-                    1,
-                ))
+                let metrics = self.metrics.baseline.intermediate();
+                Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
             })
             .collect::<Result<_>>()?;
@@ -298,9 +292,35 @@ impl ExternalSorter {
             self.schema.clone(),
             &self.expr,
             metrics,
-            self.session_config.batch_size(),
+            self.batch_size,
         )
     }
+
+    fn sort_batch_stream(
+        &self,
+        batch: RecordBatch,
+        metrics: BaselineMetrics,
+    ) -> Result<SendableRecordBatchStream> {
+        let schema = batch.schema();
+
+        let mut reservation =
+            MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
+                .register(&self.runtime.memory_pool);
+
+        // TODO: This should probably be try_grow (#5885)
+        reservation.resize(batch.get_array_memory_size());
+
+        let fetch = self.fetch;
+        let expressions = self.expr.clone();
+        let stream = futures::stream::once(futures::future::lazy(move |_| {
+            let sorted = sort_batch(&batch, &expressions, fetch)?;
+            metrics.record_output(sorted.num_rows());
+            drop(batch);
+            reservation.free();
+            Ok(sorted)
+        }));
+        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+    }
 }

 impl Debug for ExternalSorter {
@@ -313,22 +333,6 @@ impl Debug for ExternalSorter {
     }
 }

-fn sort_batch_stream(
-    batch: RecordBatch,
-    expressions: Arc<[PhysicalSortExpr]>,
-    fetch: Option<usize>,
-    mut tracking_metrics: MemTrackingMetrics,
-) -> Result<SendableRecordBatchStream> {
-    let schema = batch.schema();
-    tracking_metrics.init_mem_used(batch.get_array_memory_size());
-    let stream = futures::stream::once(futures::future::lazy(move |_| {
-        let sorted = sort_batch(&batch, &expressions, fetch)?;
-        tracking_metrics.record_output(sorted.num_rows());
-        Ok(sorted)
-    }));
-    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
-}
-
 fn sort_batch(
     batch: &RecordBatch,
     expressions: &[PhysicalSortExpr],
     fetch: Option<usize>,
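The new `sort_batch_stream` method keeps the existing `stream::once(future::lazy(...))` construction: the closure, and therefore the actual sort, the output-row accounting, and the reservation release, runs only when the consumer first polls the stream. A DataFusion-free illustration of that deferral:

```rust
use futures::StreamExt;

fn expensive_computation() -> u64 {
    // stand-in for `sort_batch(&batch, &expressions, fetch)`
    (0..1_000u64).sum()
}

#[tokio::main]
async fn main() {
    // Nothing runs yet: `future::lazy` defers the closure until first poll
    let stream =
        futures::stream::once(futures::future::lazy(|_| expensive_computation()));

    // The computation happens here, on the consumer's schedule
    let results: Vec<u64> = stream.collect().await;
    println!("{results:?}");
}
```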
@@ -424,7 +428,7 @@ pub struct SortExec {
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
     /// Containing all metrics set created during sort
-    metrics_set: CompositeMetricsSet,
+    metrics_set: ExecutionPlanMetricsSet,
     /// Preserve partitions of input plan. If false, the input partitions
     /// will be sorted and merged into a single output partition.
     preserve_partitioning: bool,
@@ -449,7 +453,7 @@ impl SortExec {
         Self {
             expr,
             input,
-            metrics_set: CompositeMetricsSet::new(),
+            metrics_set: ExecutionPlanMetricsSet::new(),
             preserve_partitioning: false,
             fetch: None,
         }
@@ -586,26 +590,35 @@ impl ExecutionPlan for SortExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());

-        let input = self.input.execute(partition, context.clone())?;
+        let mut input = self.input.execute(partition, context.clone())?;

         trace!("End SortExec's input.execute for partition: {}", partition);

+        let mut sorter = ExternalSorter::new(
+            partition,
+            input.schema(),
+            self.expr.clone(),
+            context.session_config().batch_size(),
+            self.fetch,
+            &self.metrics_set,
+            context.runtime_env(),
+        );
+
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
-            futures::stream::once(do_sort(
-                input,
-                partition,
-                self.expr.clone(),
-                self.metrics_set.clone(),
-                context,
-                self.fetch(),
-            ))
+            futures::stream::once(async move {
+                while let Some(batch) = input.next().await {
+                    let batch = batch?;
+                    sorter.insert_batch(batch).await?;
+                }
+                sorter.sort()
+            })
             .try_flatten(),
         )))
     }

     fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metrics_set.aggregate_all())
+        Some(self.metrics_set.clone_inner())
     }

     fn fmt_as(
@@ -631,44 +644,6 @@ impl ExecutionPlan for SortExec {
     }
 }

-async fn do_sort(
-    mut input: SendableRecordBatchStream,
-    partition_id: usize,
-    expr: Vec<PhysicalSortExpr>,
-    metrics_set: CompositeMetricsSet,
-    context: Arc<TaskContext>,
-    fetch: Option<usize>,
-) -> Result<SendableRecordBatchStream> {
-    trace!(
-        "Start do_sort for partition {} of context session_id {} and task_id {:?}",
-        partition_id,
-        context.session_id(),
-        context.task_id()
-    );
-    let schema = input.schema();
-    let mut sorter = ExternalSorter::new(
-        partition_id,
-        schema.clone(),
-        expr,
-        metrics_set,
-        Arc::new(context.session_config().clone()),
-        context.runtime_env(),
-        fetch,
-    );
-    while let Some(batch) = input.next().await {
-        let batch = batch?;
-        sorter.insert_batch(batch).await?;
-    }
-    let result = sorter.sort();
-    trace!(
-        "End do_sort for partition {} of context session_id {} and task_id {:?}",
-        partition_id,
-        context.session_id(),
-        context.task_id()
-    );
-    result
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 88916c93f0f4..e346eccbeecb 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -27,7 +27,7 @@ use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::physical_plan::common::spawn_buffered;
 use crate::physical_plan::metrics::{
-    ExecutionPlanMetricsSet, MemTrackingMetrics, MetricsSet,
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::sorts::streaming_merge;
 use crate::physical_plan::{
@@ -158,9 +158,6 @@ impl ExecutionPlan for SortPreservingMergeExec {
             )));
         }

-        let tracking_metrics =
-            MemTrackingMetrics::new(&self.metrics, context.memory_pool(), partition);
-
         let input_partitions = self.input.output_partitioning().partition_count();
         trace!(
             "Number of input partitions of SortPreservingMergeExec::execute: {}",
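With `do_sort` inlined, `execute` returns immediately with an adapter around `stream::once(async move { ... }).try_flatten()`: the async block drains the input into the sorter and resolves to a `Result` holding the sorted stream, and `try_flatten` splices that inner stream (or its single error) into the output. A minimal, DataFusion-free model of the shape:

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() {
    let flattened = stream::once(async {
        // stand-in for: drain input, sorter.insert_batch(...), then
        // sorter.sort() -> Result<impl Stream<Item = Result<RecordBatch>>>
        Ok::<_, std::io::Error>(stream::iter(vec![
            Ok::<_, std::io::Error>(1),
            Ok(2),
            Ok(3),
        ]))
    })
    .try_flatten();

    let items: Vec<_> = flattened.collect().await;
    println!("{items:?}"); // [Ok(1), Ok(2), Ok(3)]
}
```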
@@ -193,7 +190,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
                     receivers,
                     schema,
                     &self.expr,
-                    tracking_metrics,
+                    BaselineMetrics::new(&self.metrics, partition),
                     context.session_config().batch_size(),
                 )?;
@@ -813,14 +810,12 @@ mod tests {
         }

         let metrics = ExecutionPlanMetricsSet::new();
-        let tracking_metrics =
-            MemTrackingMetrics::new(&metrics, task_ctx.memory_pool(), 0);

         let merge_stream = streaming_merge(
             streams,
             batches.schema(),
             sort.as_slice(),
-            tracking_metrics,
+            BaselineMetrics::new(&metrics, 0),
             task_ctx.session_config().batch_size(),
         )
         .unwrap();
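As a closing check, the flattened metrics remain visible to consumers exactly where they were before: a hedged end-to-end sketch of executing a plan containing a `SortExec` and reading the per-operator set that `metrics()` now returns via `clone_inner` (plan and context construction elided):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{collect, ExecutionPlan};

async fn inspect(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) -> Result<()> {
    let _batches = collect(plan.clone(), ctx).await?;
    if let Some(metrics) = plan.metrics() {
        // Spill metrics now come from ExternalSorterMetrics, registered in the
        // same set as the baseline metrics
        println!(
            "rows: {:?}, spills: {:?}, spilled bytes: {:?}",
            metrics.output_rows(),
            metrics.spill_count(),
            metrics.spilled_bytes(),
        );
    }
    Ok(())
}
```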