Skip to content

Commit

Permalink
blaze: Revert "Cleanup ExternalSorter metrics (apache#5885) (apache#6364)"
Browse files Browse the repository at this point in the history

This reverts commit cf81117.
  • Loading branch information
zhangli20 committed Jul 5, 2023
1 parent 4bd7b18 commit fc65a6e
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 131 deletions.
52 changes: 36 additions & 16 deletions datafusion/core/src/physical_plan/metrics/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::task::Poll;

use arrow::record_batch::RecordBatch;

use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
use datafusion_common::Result;
use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, Timestamp};
use crate::error::Result;

/// Helper for creating and tracking common "baseline" metrics for
/// each operator
Expand All @@ -43,14 +43,23 @@ use datafusion_common::Result;
/// // when operator is finished:
/// baseline_metrics.done();
/// ```
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct BaselineMetrics {
    /// end_time is set when `BaselineMetrics::done()` is called
    end_time: Timestamp,

    /// amount of time the operator was actively trying to use the CPU
    elapsed_compute: Time,

    /// number of spill events triggered during the execution of the operator
    spill_count: Count,

    /// total number of bytes spilled during the execution of the operator
    spilled_bytes: Count,

    /// current memory usage for the operator
    mem_used: Gauge,

    /// total number of rows the operator has produced
    output_rows: Count,
}
Expand All @@ -64,28 +73,39 @@ impl BaselineMetrics {
Self {
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
mem_used: MetricBuilder::new(metrics).mem_used(partition),
output_rows: MetricBuilder::new(metrics).output_rows(partition),
}
}

/// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
/// all other metrics
///
/// This is useful when an operator offloads some of its intermediate work to separate tasks
/// that as a result won't be recorded by [`Self::record_poll`]
pub fn intermediate(&self) -> BaselineMetrics {
    // Share only the CPU-time accumulator with the parent; all other
    // metrics start fresh so intermediate work is not double-counted.
    // NOTE(review): this initializer lists only three fields, matching the
    // other side of this diff's struct layout (without the spill/mem
    // fields) — confirm against the struct version actually in effect.
    Self {
        end_time: Default::default(),
        elapsed_compute: self.elapsed_compute.clone(),
        output_rows: Default::default(),
    }
}

/// return the metric for cpu time spent in this operator
pub fn elapsed_compute(&self) -> &Time {
    &self.elapsed_compute
}

/// return the metric counting the total number of spills triggered during execution
pub fn spill_count(&self) -> &Count {
    &self.spill_count
}

/// return the metric accumulating the total number of bytes spilled during execution
pub fn spilled_bytes(&self) -> &Count {
    &self.spilled_bytes
}

/// return the gauge tracking the operator's current memory usage
pub fn mem_used(&self) -> &Gauge {
    &self.mem_used
}

/// Account for a single spill event that wrote `spilled_bytes` bytes:
/// accumulates the byte total and bumps the spill counter by one.
pub fn record_spill(&self, spilled_bytes: usize) {
    self.spilled_bytes.add(spilled_bytes);
    self.spill_count.add(1);
}

/// return the metric for the total number of output rows produced
pub fn output_rows(&self) -> &Count {
&self.output_rows
Expand Down
205 changes: 205 additions & 0 deletions datafusion/core/src/physical_plan/metrics/composite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Metrics common for complex operators with multiple steps.
use crate::execution::memory_pool::MemoryPool;
use crate::physical_plan::metrics::tracker::MemTrackingMetrics;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time,
Timestamp,
};
use crate::physical_plan::Metric;
use chrono::{TimeZone, Utc};
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, Clone)]
/// Collects all metrics during a complex operation, which is composed of multiple steps and
/// each step reports its statistics separately.
/// Take sort as an example: when the dataset is larger than the available memory, it reports
/// multiple in-mem sort metrics plus the final merge-sort metrics from `SortPreservingMergeStream`.
/// Therefore, we need a separation between final metrics (which also accumulate output_rows)
/// and intermediate metrics, for which only elapsed_compute time is accounted.
pub struct CompositeMetricsSet {
    /// metrics of intermediate steps; only elapsed_compute and spill
    /// metrics are merged into the aggregate (see `aggregate_all`)
    mid: ExecutionPlanMetricsSet,
    /// metrics of the final step; the only source of output_rows in the aggregate
    final_: ExecutionPlanMetricsSet,
}

impl Default for CompositeMetricsSet {
    /// Equivalent to [`CompositeMetricsSet::new`]: both metric sets start empty.
    fn default() -> Self {
        Self::new()
    }
}

impl CompositeMetricsSet {
    /// Create a new aggregated set with empty intermediate and final metrics
    pub fn new() -> Self {
        Self {
            mid: ExecutionPlanMetricsSet::new(),
            final_: ExecutionPlanMetricsSet::new(),
        }
    }

    /// create a new intermediate baseline for the given partition
    pub fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
        BaselineMetrics::new(&self.mid, partition)
    }

    /// create a new final baseline for the given partition
    pub fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
        BaselineMetrics::new(&self.final_, partition)
    }

    /// create a new intermediate memory tracking metrics for the given partition
    pub fn new_intermediate_tracking(
        &self,
        partition: usize,
        pool: &Arc<dyn MemoryPool>,
    ) -> MemTrackingMetrics {
        MemTrackingMetrics::new(&self.mid, pool, partition)
    }

    /// create a new final memory tracking metrics for the given partition
    pub fn new_final_tracking(
        &self,
        partition: usize,
        pool: &Arc<dyn MemoryPool>,
    ) -> MemTrackingMetrics {
        MemTrackingMetrics::new(&self.final_, pool, partition)
    }

    /// Add the elapsed_compute of both the intermediate and final sets to `dest`
    fn merge_compute_time(&self, dest: &Time) {
        let time1 = self
            .mid
            .clone_inner()
            .elapsed_compute()
            .map_or(0u64, |v| v as u64);
        let time2 = self
            .final_
            .clone_inner()
            .elapsed_compute()
            .map_or(0u64, |v| v as u64);
        dest.add_duration(Duration::from_nanos(time1));
        dest.add_duration(Duration::from_nanos(time2));
    }

    /// Add the spill counts of both the intermediate and final sets to `dest`
    fn merge_spill_count(&self, dest: &Count) {
        let count1 = self.mid.clone_inner().spill_count().unwrap_or(0);
        let count2 = self.final_.clone_inner().spill_count().unwrap_or(0);
        dest.add(count1);
        dest.add(count2);
    }

    /// Add the spilled bytes of both the intermediate and final sets to `dest`
    fn merge_spilled_bytes(&self, dest: &Count) {
        let count1 = self.mid.clone_inner().spilled_bytes().unwrap_or(0);
        // BUG FIX: this previously read the final set's spill_count()
        // instead of its spilled_bytes(), corrupting the aggregated
        // spilled-bytes metric.
        let count2 = self.final_.clone_inner().spilled_bytes().unwrap_or(0);
        dest.add(count1);
        dest.add(count2);
    }

    /// Add the output rows of the final set to `dest`; intermediate output
    /// rows are deliberately ignored so rows are not double-counted
    fn merge_output_count(&self, dest: &Count) {
        let count = self.final_.clone_inner().output_rows().unwrap_or(0);
        dest.add(count);
    }

    /// Set `dest` to the earliest start timestamp found in either set
    fn merge_start_time(&self, dest: &Timestamp) {
        let start1 = self
            .mid
            .clone_inner()
            .sum(|metric| matches!(metric.value(), MetricValue::StartTimestamp(_)))
            .map(|v| v.as_usize());
        let start2 = self
            .final_
            .clone_inner()
            .sum(|metric| matches!(metric.value(), MetricValue::StartTimestamp(_)))
            .map(|v| v.as_usize());
        match (start1, start2) {
            (Some(start1), Some(start2)) => {
                dest.set(Utc.timestamp_nanos(start1.min(start2) as i64))
            }
            (Some(start1), None) => dest.set(Utc.timestamp_nanos(start1 as i64)),
            (None, Some(start2)) => dest.set(Utc.timestamp_nanos(start2 as i64)),
            (None, None) => {}
        }
    }

    /// Set `dest` to the latest end timestamp found in either set
    fn merge_end_time(&self, dest: &Timestamp) {
        let end1 = self
            .mid
            .clone_inner()
            .sum(|metric| matches!(metric.value(), MetricValue::EndTimestamp(_)))
            .map(|v| v.as_usize());
        let end2 = self
            .final_
            .clone_inner()
            .sum(|metric| matches!(metric.value(), MetricValue::EndTimestamp(_)))
            .map(|v| v.as_usize());
        match (end1, end2) {
            (Some(end1), Some(end2)) => {
                dest.set(Utc.timestamp_nanos(end1.max(end2) as i64))
            }
            (Some(end1), None) => dest.set(Utc.timestamp_nanos(end1 as i64)),
            (None, Some(end2)) => dest.set(Utc.timestamp_nanos(end2 as i64)),
            (None, None) => {}
        }
    }

    /// Aggregate the intermediate and final metrics into one [`MetricsSet`]:
    /// compute time and spill metrics are summed across both sets, output
    /// rows come from the final set only, and the start/end timestamps span
    /// the earliest start to the latest end.
    pub fn aggregate_all(&self) -> MetricsSet {
        let mut metrics = MetricsSet::new();
        let elapsed_time = Time::new();
        let spill_count = Count::new();
        let spilled_bytes = Count::new();
        let output_count = Count::new();
        let start_time = Timestamp::new();
        let end_time = Timestamp::new();

        metrics.push(Arc::new(Metric::new(
            MetricValue::ElapsedCompute(elapsed_time.clone()),
            None,
        )));
        metrics.push(Arc::new(Metric::new(
            MetricValue::SpillCount(spill_count.clone()),
            None,
        )));
        metrics.push(Arc::new(Metric::new(
            MetricValue::SpilledBytes(spilled_bytes.clone()),
            None,
        )));
        metrics.push(Arc::new(Metric::new(
            MetricValue::OutputRows(output_count.clone()),
            None,
        )));
        metrics.push(Arc::new(Metric::new(
            MetricValue::StartTimestamp(start_time.clone()),
            None,
        )));
        metrics.push(Arc::new(Metric::new(
            MetricValue::EndTimestamp(end_time.clone()),
            None,
        )));

        self.merge_compute_time(&elapsed_time);
        self.merge_spill_count(&spill_count);
        self.merge_spilled_bytes(&spilled_bytes);
        self.merge_output_count(&output_count);
        self.merge_start_time(&start_time);
        self.merge_end_time(&end_time);
        metrics
    }
}
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
mod baseline;
mod builder;
mod composite;
mod tracker;
mod value;

use parking_lot::Mutex;
Expand All @@ -33,6 +35,8 @@ use hashbrown::HashMap;
// public exports
pub use baseline::{BaselineMetrics, RecordOutput};
pub use builder::MetricBuilder;
pub use composite::CompositeMetricsSet;
pub use tracker::MemTrackingMetrics;
pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};

/// Something that tracks a value of interest (metric) of a DataFusion
Expand Down
Loading

0 comments on commit fc65a6e

Please sign in to comment.