Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: add metrics to CopyExec and ScanExec #778

Merged
merged 3 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
if exec_context.debug_native {
if let Some(plan) = &exec_context.root_op {
let formatted_plan_str =
DisplayableExecutionPlan::with_full_metrics(plan.as_ref())
.indent(true);
DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent(true);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with_full_metrics was too verbose

info!("Comet native query plan with metrics:\n {formatted_plan_str:}");
}
}
Expand Down
32 changes: 28 additions & 4 deletions native/core/src/execution/operators/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use futures::{Stream, StreamExt};
use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};

use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};

Expand All @@ -42,6 +43,7 @@ pub struct CopyExec {
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
cache: PlanProperties,
metrics: ExecutionPlanMetricsSet,
}

impl CopyExec {
Expand Down Expand Up @@ -70,6 +72,7 @@ impl CopyExec {
input,
schema,
cache,
metrics: ExecutionPlanMetricsSet::default(),
}
}
}
Expand Down Expand Up @@ -107,6 +110,7 @@ impl ExecutionPlan for CopyExec {
input: new_input,
schema: self.schema.clone(),
cache: self.cache.clone(),
metrics: self.metrics.clone(),
}))
}

Expand All @@ -116,7 +120,12 @@ impl ExecutionPlan for CopyExec {
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let child_stream = self.input.execute(partition, context)?;
Ok(Box::pin(CopyStream::new(self.schema(), child_stream)))
Ok(Box::pin(CopyStream::new(
self,
self.schema(),
child_stream,
partition,
)))
}

fn statistics(&self) -> DataFusionResult<Statistics> {
Expand All @@ -130,33 +139,48 @@ impl ExecutionPlan for CopyExec {
/// Static operator name shown in plan displays and metrics output.
fn name(&self) -> &str {
"CopyExec"
}

/// Expose a snapshot of this operator's metrics so DataFusion's
/// plan display (e.g. `DisplayableExecutionPlan::with_metrics`) can
/// report them; `clone_inner` copies the current metric values.
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

/// Stream adapter that copies each batch produced by the child stream,
/// recording elapsed-compute time and output row counts as it goes.
struct CopyStream {
// Output schema of the copied batches.
schema: SchemaRef,
// Upstream stream whose batches are copied.
child_stream: SendableRecordBatchStream,
// Per-partition baseline metrics (elapsed compute, output rows).
baseline_metrics: BaselineMetrics,
}

impl CopyStream {
fn new(schema: SchemaRef, child_stream: SendableRecordBatchStream) -> Self {
fn new(
exec: &CopyExec,
schema: SchemaRef,
child_stream: SendableRecordBatchStream,
partition: usize,
) -> Self {
Self {
schema,
child_stream,
baseline_metrics: BaselineMetrics::new(&exec.metrics, partition),
}
}

// TODO: replace copy_or_cast_array with copy_array if upstream sort kernel fixes
// dictionary array sorting issue.
fn copy(&self, batch: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut timer = self.baseline_metrics.elapsed_compute().timer();
let vectors = batch
.columns()
.iter()
.map(|v| copy_or_cast_array(v))
.collect::<Result<Vec<ArrayRef>, _>>()?;

let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
RecordBatch::try_new_with_options(self.schema.clone(), vectors, &options)
.map_err(|e| arrow_datafusion_err!(e))
let maybe_batch = RecordBatch::try_new_with_options(self.schema.clone(), vectors, &options)
.map_err(|e| arrow_datafusion_err!(e));
timer.stop();
self.baseline_metrics.record_output(batch.num_rows());
maybe_batch
}
}

Expand Down
36 changes: 30 additions & 6 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
},
jvm_bridge::{jni_call, JVMClasses},
};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::{
execution::TaskContext,
physical_expr::*,
Expand All @@ -62,6 +63,7 @@ pub struct ScanExec {
/// It is also used in unit test to mock the input data from JVM.
pub batch: Arc<Mutex<Option<InputBatch>>>,
cache: PlanProperties,
metrics: ExecutionPlanMetricsSet,
}

impl ScanExec {
Expand Down Expand Up @@ -91,6 +93,7 @@ impl ScanExec {
data_types,
batch: Arc::new(Mutex::new(Some(first_batch))),
cache,
metrics: ExecutionPlanMetricsSet::default(),
})
}

Expand Down Expand Up @@ -261,10 +264,14 @@ impl ExecutionPlan for ScanExec {

fn execute(
&self,
_: usize,
partition: usize,
_: Arc<TaskContext>,
) -> datafusion::common::Result<SendableRecordBatchStream> {
Ok(Box::pin(ScanStream::new(self.clone(), self.schema())))
Ok(Box::pin(ScanStream::new(
self.clone(),
self.schema(),
partition,
)))
}

fn properties(&self) -> &PlanProperties {
Expand All @@ -274,6 +281,10 @@ impl ExecutionPlan for ScanExec {
/// Static operator name shown in plan displays and metrics output.
fn name(&self) -> &str {
"ScanExec"
}

/// Expose a snapshot of this operator's metrics for plan display;
/// `clone_inner` copies the current metric values.
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

impl DisplayAs for ScanExec {
Expand All @@ -300,11 +311,18 @@ struct ScanStream {
scan: ScanExec,
/// Schema representing the data
schema: SchemaRef,
/// Metrics
baseline_metrics: BaselineMetrics,
}

impl ScanStream {
pub fn new(scan: ScanExec, schema: SchemaRef) -> Self {
Self { scan, schema }
pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize) -> Self {
let baseline_metrics = BaselineMetrics::new(&scan.metrics, partition);
Self {
scan,
schema,
baseline_metrics,
}
}

fn build_record_batch(
Expand Down Expand Up @@ -338,9 +356,11 @@ impl Stream for ScanStream {
type Item = DataFusionResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut timer = self.baseline_metrics.elapsed_compute().timer();
let mut scan_batch = self.scan.batch.try_lock().unwrap();
let input_batch = &*scan_batch;
timer.stop();

let input_batch = &*scan_batch;
let input_batch = if let Some(batch) = input_batch {
batch
} else {
Expand All @@ -350,7 +370,11 @@ impl Stream for ScanStream {
let result = match input_batch {
InputBatch::EOF => Poll::Ready(None),
InputBatch::Batch(columns, num_rows) => {
Poll::Ready(Some(self.build_record_batch(columns, *num_rows)))
self.baseline_metrics.record_output(*num_rows);
let mut timer = self.baseline_metrics.elapsed_compute().timer();
let maybe_batch = self.build_record_batch(columns, *num_rows);
timer.stop();
Poll::Ready(Some(maybe_batch))
}
};

Expand Down
Loading