Skip to content

Commit

Permalink
Add baseline_metrics for FileStream to record metrics like elapsed time, record output, etc (#63) (#2965)
Browse files Browse the repository at this point in the history

Co-authored-by: yangzhong <yangzhong@ebay.com>
  • Loading branch information
Ted-Jiang and kyotoYaho authored Jul 27, 2022
1 parent e47c4eb commit 16f8934
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 8 deletions.
16 changes: 14 additions & 2 deletions datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@ use crate::physical_plan::{
use arrow::datatypes::SchemaRef;

use crate::execution::context::TaskContext;
#[cfg(feature = "avro")]
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use std::any::Any;
use std::sync::Arc;

use super::FileScanConfig;

/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
// Fields are read only when the "avro" feature is enabled, hence dead_code is allowed.
#[allow(dead_code)]
pub struct AvroExec {
// Base file-scan configuration (file groups, file schema, projection, etc.)
base_config: FileScanConfig,
// Statistics projected to the output columns — derived from `base_config` at construction
projected_statistics: Statistics,
// Output schema after the projection in `base_config` is applied
projected_schema: SchemaRef,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl AvroExec {
Expand All @@ -46,6 +52,7 @@ impl AvroExec {
base_config,
projected_schema,
projected_statistics,
metrics: ExecutionPlanMetricsSet::new(),
}
}
/// Ref to the base configs
Expand Down Expand Up @@ -104,15 +111,20 @@ impl ExecutionPlan for AvroExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
use super::file_stream::FileStream;

let config = Arc::new(private::AvroConfig {
schema: Arc::clone(&self.base_config.file_schema),
batch_size: context.session_config().batch_size(),
projection: self.base_config.projected_file_column_names(),
});
let opener = private::AvroOpener { config };

let stream = FileStream::new(&self.base_config, partition, context, opener)?;
let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
BaselineMetrics::new(&self.metrics, partition),
)?;
Ok(Box::pin(stream))
}

Expand Down
12 changes: 11 additions & 1 deletion datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use arrow::csv;
use arrow::datatypes::SchemaRef;
use bytes::Buf;
Expand All @@ -50,6 +51,8 @@ pub struct CsvExec {
projected_schema: SchemaRef,
has_header: bool,
delimiter: u8,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl CsvExec {
Expand All @@ -63,6 +66,7 @@ impl CsvExec {
projected_statistics,
has_header,
delimiter,
metrics: ExecutionPlanMetricsSet::new(),
}
}

Expand Down Expand Up @@ -130,7 +134,13 @@ impl ExecutionPlan for CsvExec {
});

let opener = CsvOpener { config };
let stream = FileStream::new(&self.base_config, partition, context, opener)?;
let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
BaselineMetrics::new(&self.metrics, partition),
)?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}

Expand Down
16 changes: 14 additions & 2 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::RecordBatchStream;

/// A fallible future that resolves to a stream of [`RecordBatch`]
Expand Down Expand Up @@ -74,6 +75,8 @@ pub struct FileStream<F: FormatReader> {
object_store: Arc<dyn ObjectStore>,
/// The stream state
state: FileStreamState,
/// runtime metrics recording
baseline_metrics: BaselineMetrics,
}

enum FileStreamState {
Expand Down Expand Up @@ -107,6 +110,7 @@ impl<F: FormatReader> FileStream<F> {
partition: usize,
context: Arc<TaskContext>,
file_reader: F,
baseline_metrics: BaselineMetrics,
) -> Result<Self> {
let (projected_schema, _) = config.project();
let pc_projector = PartitionColumnProjector::new(
Expand All @@ -128,6 +132,7 @@ impl<F: FormatReader> FileStream<F> {
pc_projector,
object_store,
state: FileStreamState::Idle,
baseline_metrics,
})
}

Expand Down Expand Up @@ -214,7 +219,11 @@ impl<F: FormatReader> Stream for FileStream<F> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.poll_inner(cx)
let cloned_time = self.baseline_metrics.elapsed_compute().clone();
let timer = cloned_time.timer();
let result = self.poll_inner(cx);
timer.done();
self.baseline_metrics.record_poll(result)
}
}

Expand All @@ -230,6 +239,7 @@ mod tests {

use super::*;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::prelude::SessionContext;
use crate::{
error::Result,
Expand Down Expand Up @@ -276,7 +286,9 @@ mod tests {
table_partition_cols: vec![],
};

let file_stream = FileStream::new(&config, 0, ctx.task_ctx(), reader).unwrap();
let metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let file_stream =
FileStream::new(&config, 0, ctx.task_ctx(), reader, metrics).unwrap();

file_stream
.map(|b| b.expect("No error expected in stream"))
Expand Down
12 changes: 11 additions & 1 deletion datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
Expand All @@ -48,6 +49,8 @@ pub struct NdJsonExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl NdJsonExec {
Expand All @@ -59,6 +62,7 @@ impl NdJsonExec {
base_config,
projected_schema,
projected_statistics,
metrics: ExecutionPlanMetricsSet::new(),
}
}
}
Expand Down Expand Up @@ -117,7 +121,13 @@ impl ExecutionPlan for NdJsonExec {
options,
};

let stream = FileStream::new(&self.base_config, partition, context, opener)?;
let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
BaselineMetrics::new(&self.metrics, partition),
)?;

Ok(Box::pin(stream) as SendableRecordBatchStream)
}
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
};
use crate::physical_plan::metrics::BaselineMetrics;
use crate::{
error::{DataFusionError, Result},
execution::context::{SessionState, TaskContext},
Expand Down Expand Up @@ -223,8 +224,13 @@ impl ExecutionPlan for ParquetExec {
metrics: self.metrics.clone(),
};

let stream =
FileStream::new(&self.base_config, partition_index, context, opener)?;
let stream = FileStream::new(
&self.base_config,
partition_index,
context,
opener,
BaselineMetrics::new(&self.metrics, partition_index),
)?;

Ok(Box::pin(stream))
}
Expand Down

0 comments on commit 16f8934

Please sign in to comment.