Add opening, scanning, processing metrics in file stream (#3070)
* Fix file stream metrics

* Update datafusion/core/src/physical_plan/file_format/file_stream.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* fix comment

* fix ut.

Co-authored-by: yangzhong <yangzhong@ebay.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
3 people authored Aug 9, 2022
1 parent 31381bf commit 098f0b0
Showing 5 changed files with 90 additions and 20 deletions.
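Once the change is in, these timings surface through the normal per-operator metrics, so the quickest way to see them from user code should be an `EXPLAIN ANALYZE`. A hedged sketch (the CSV path and table name are placeholders; the metric names are the ones registered in this commit):

```rust
// Sketch only: assumes a local CSV file at a placeholder path.
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("t", "data/example.csv", CsvReadOptions::new())
        .await?;

    // EXPLAIN ANALYZE prints each operator with its metric set; the CsvExec
    // line should now also show time_elapsed_opening, time_elapsed_scanning
    // and time_elapsed_processing alongside the baseline metrics.
    let df = ctx.sql("EXPLAIN ANALYZE SELECT count(*) FROM t").await?;
    df.show().await?;
    Ok(())
}
```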
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/file_format/avro.rs
@@ -24,8 +24,6 @@ use crate::physical_plan::{
use arrow::datatypes::SchemaRef;

use crate::execution::context::TaskContext;
#[cfg(feature = "avro")]
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use std::any::Any;
use std::sync::Arc;
@@ -123,7 +121,7 @@ impl ExecutionPlan for AvroExec {
partition,
context,
opener,
BaselineMetrics::new(&self.metrics, partition),
self.metrics.clone(),
)?;
Ok(Box::pin(stream))
}
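This same call-site change repeats for the CSV, JSON, and Parquet execs below: instead of constructing a per-partition `BaselineMetrics` at each call site, each exec now passes its whole `ExecutionPlanMetricsSet` to `FileStream::new`, which builds both the baseline metrics and the new file-stream timers from it. A condensed sketch of that wiring (`DemoStream` is a stand-in type, not DataFusion API; the metric calls mirror those in the diff):

```rust
use datafusion::physical_plan::metrics::{
    BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

/// Stand-in for FileStream: given the shared metrics set, it builds both the
/// baseline view and a named timer, so the exec only passes `metrics.clone()`.
#[allow(dead_code)]
struct DemoStream {
    baseline: BaselineMetrics,
    time_opening: Time,
}

impl DemoStream {
    fn new(metrics: ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(&metrics, partition),
            time_opening: MetricBuilder::new(&metrics)
                .subset_time("time_elapsed_opening", partition),
        }
    }
}

fn main() {
    let metrics = ExecutionPlanMetricsSet::new(); // owned by the exec
    let _stream = DemoStream::new(metrics.clone(), 0); // what execute() now does
}
```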
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/file_format/csv.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use arrow::csv;
use arrow::datatypes::SchemaRef;
use bytes::Buf;
@@ -139,7 +139,7 @@ impl ExecutionPlan for CsvExec {
partition,
context,
opener,
BaselineMetrics::new(&self.metrics, partition),
self.metrics.clone(),
)?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
95 changes: 84 additions & 11 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -25,6 +25,7 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use arrow::datatypes::SchemaRef;
use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
@@ -39,7 +40,9 @@ use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
use crate::physical_plan::RecordBatchStream;

/// A fallible future that resolves to a stream of [`RecordBatch`]
@@ -75,7 +78,9 @@ pub struct FileStream<F: FileOpener> {
object_store: Arc<dyn ObjectStore>,
/// The stream state
state: FileStreamState,
/// runtime metrics recording
/// File stream specific metrics
file_stream_metrics: FileStreamMetrics,
/// runtime baseline metrics
baseline_metrics: BaselineMetrics,
}

@@ -104,13 +109,69 @@ enum FileStreamState {
Limit,
}

struct StartableTime {
metrics: Time,
// Records the start of the current timed segment; the elapsed time is added into `metrics` on stop.
start: Option<Instant>,
}

impl StartableTime {
fn start(&mut self) {
assert!(self.start.is_none());
self.start = Some(Instant::now());
}

fn stop(&mut self) {
if let Some(start) = self.start.take() {
self.metrics.add_elapsed(start);
}
}
}

struct FileStreamMetrics {
/// Time elapsed for file opening
pub time_opening: StartableTime,
/// Time elapsed for file scanning plus decompression and decoding of the first record batch
pub time_scanning: StartableTime,
/// Time elapsed for data decompression + decoding
pub time_processing: StartableTime,
}

impl FileStreamMetrics {
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let time_opening = StartableTime {
metrics: MetricBuilder::new(metrics)
.subset_time("time_elapsed_opening", partition),
start: None,
};

let time_scanning = StartableTime {
metrics: MetricBuilder::new(metrics)
.subset_time("time_elapsed_scanning", partition),
start: None,
};

let time_processing = StartableTime {
metrics: MetricBuilder::new(metrics)
.subset_time("time_elapsed_processing", partition),
start: None,
};

Self {
time_opening,
time_scanning,
time_processing,
}
}
}

impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
context: Arc<TaskContext>,
file_reader: F,
baseline_metrics: BaselineMetrics,
metrics: ExecutionPlanMetricsSet,
) -> Result<Self> {
let (projected_schema, _) = config.project();
let pc_projector = PartitionColumnProjector::new(
@@ -132,7 +193,8 @@ impl<F: FileOpener> FileStream<F> {
pc_projector,
object_store,
state: FileStreamState::Idle,
baseline_metrics,
file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
baseline_metrics: BaselineMetrics::new(&metrics, partition),
})
}

@@ -148,6 +210,7 @@ impl<F: FileOpener> FileStream<F> {
None => return Poll::Ready(None),
};

self.file_stream_metrics.time_opening.start();
let future = self.file_reader.open(
self.object_store.clone(),
file.object_meta,
@@ -164,6 +227,8 @@
partition_values,
} => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
self.file_stream_metrics.time_opening.stop();
self.file_stream_metrics.time_scanning.start();
self.state = FileStreamState::Scan {
partition_values: std::mem::take(partition_values),
reader,
@@ -179,6 +244,7 @@
partition_values,
} => match ready!(reader.poll_next_unpin(cx)) {
Some(result) => {
self.file_stream_metrics.time_scanning.stop();
let result = result
.and_then(|b| self.pc_projector.project(b, partition_values))
.map(|batch| match &mut self.remain {
@@ -202,7 +268,10 @@

return Poll::Ready(Some(result));
}
None => self.state = FileStreamState::Idle,
None => {
self.file_stream_metrics.time_scanning.stop();
self.state = FileStreamState::Idle;
}
},
FileStreamState::Error | FileStreamState::Limit => {
return Poll::Ready(None)
@@ -219,10 +288,9 @@ impl<F: FileOpener> Stream for FileStream<F> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let cloned_time = self.baseline_metrics.elapsed_compute().clone();
let timer = cloned_time.timer();
self.file_stream_metrics.time_processing.start();
let result = self.poll_inner(cx);
timer.done();
self.file_stream_metrics.time_processing.stop();
self.baseline_metrics.record_poll(result)
}
}
@@ -286,9 +354,14 @@ mod tests {
table_partition_cols: vec![],
};

let metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let file_stream =
FileStream::new(&config, 0, ctx.task_ctx(), reader, metrics).unwrap();
let file_stream = FileStream::new(
&config,
0,
ctx.task_ctx(),
reader,
ExecutionPlanMetricsSet::new(),
)
.unwrap();

file_stream
.map(|b| b.expect("No error expected in stream"))
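The `StartableTime` helper above is just a start/stop accumulator over DataFusion's `Time` metric: `time_processing` brackets every poll, `time_opening` covers the open future, and `time_scanning` runs from a successful open until the stream yields. A standalone sketch of the same accounting using only the standard library (names and the simulated work are illustrative):

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Simplified stand-in for `StartableTime`: accumulates wall-clock time
/// across repeated start/stop pairs.
#[derive(Default)]
struct Accum {
    total: Duration,
    start: Option<Instant>,
}

impl Accum {
    fn start(&mut self) {
        assert!(self.start.is_none(), "timer already running");
        self.start = Some(Instant::now());
    }

    fn stop(&mut self) {
        // A stop without a matching start is a no-op, like the original.
        if let Some(start) = self.start.take() {
            self.total += start.elapsed();
        }
    }
}

fn main() {
    let mut opening = Accum::default();
    let mut scanning = Accum::default();
    let mut processing = Accum::default();

    // One simulated file, collapsed into a single "poll" for brevity.
    processing.start();
    opening.start();
    sleep(Duration::from_millis(5)); // stand-in for opening the file
    opening.stop();
    scanning.start();
    sleep(Duration::from_millis(10)); // stand-in for decoding a record batch
    scanning.stop();
    processing.stop();

    println!(
        "opening={:?} scanning={:?} processing={:?}",
        opening.total, scanning.total, processing.total
    );
}
```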
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/file_format/json.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
@@ -126,7 +126,7 @@ impl ExecutionPlan for NdJsonExec {
partition,
context,
opener,
BaselineMetrics::new(&self.metrics, partition),
self.metrics.clone(),
)?;

Ok(Box::pin(stream) as SendableRecordBatchStream)
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -54,7 +54,6 @@ use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::BaselineMetrics;
use crate::{
error::{DataFusionError, Result},
execution::context::{SessionState, TaskContext},
@@ -232,7 +231,7 @@ impl ExecutionPlan for ParquetExec {
partition_index,
context,
opener,
BaselineMetrics::new(&self.metrics, partition_index),
self.metrics.clone(),
)?;

Ok(Box::pin(stream))
