feat: Improve ScanExec native metrics #1133

Merged 7 commits on Dec 6, 2024

Changes from 5 commits
21 changes: 15 additions & 6 deletions docs/source/user-guide/tuning.md
@@ -103,18 +103,27 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitioning`
To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set,
then any shuffle operations that cannot be supported in this mode will fall back to Spark.

## Metrics

### Spark SQL Metrics

In some cases, Comet metrics are not directly comparable to Spark metrics:

- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
milliseconds _per batch_, which can result in a large loss of precision, making it difficult to compare scan times
between Spark and Comet.

Comet also adds some custom metrics:
### Native Metrics

Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are
logged for each native plan (and there is one plan per task, so this is very verbose).

Here is a guide to some of the native metrics.

### ShuffleWriterExec
### ScanExec

| Metric | Description |
| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. |
| Metric | Description |
| ----------------- | --------------------------------------------------------------------------------------------------- |
| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. |
| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. |
| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. |
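These metrics are built on DataFusion's operator metrics API, which is what the native changes below use. The following is a minimal, hypothetical sketch of how an operator can register and time such metrics; the `ScanMetrics` struct and `timed_region` helper are illustrative names, not part of the Comet codebase.

```rust
// Sketch only: registering baseline plus custom subset-time metrics for an
// operator, in the style used by the ScanExec changes in this PR.
use datafusion::physical_plan::metrics::{
    BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

#[allow(dead_code)]
struct ScanMetrics {
    baseline: BaselineMetrics, // provides elapsed_compute, output_rows, ...
    jvm_fetch_time: Time,      // custom timer reported as `jvm_fetch_time`
    arrow_ffi_time: Time,      // custom timer reported as `arrow_ffi_time`
}

impl ScanMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            jvm_fetch_time: MetricBuilder::new(metrics).subset_time("jvm_fetch_time", partition),
            arrow_ffi_time: MetricBuilder::new(metrics).subset_time("arrow_ffi_time", partition),
        }
    }
}

// Attribute a region of work to one of the timers; the guard stops the clock
// when `stop` is called (or when it is dropped).
fn timed_region<T>(time: &Time, work: impl FnOnce() -> T) -> T {
    let mut timer = time.timer();
    let result = work();
    timer.stop();
    result
}
```

In the `scan.rs` diff below, `ScanExec` stores the two `Time` handles on the struct and starts and stops the corresponding timers around the JNI fetch and the Arrow FFI step, following the same pattern.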
26 changes: 6 additions & 20 deletions native/core/src/execution/datafusion/shuffle_writer.rs
@@ -139,7 +139,6 @@ impl ExecutionPlan for ShuffleWriterExec {
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, Arc::clone(&context))?;
let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0);
let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
@@ -152,7 +151,6 @@ impl ExecutionPlan for ShuffleWriterExec {
self.partitioning.clone(),
metrics,
context,
jvm_fetch_time,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -1085,7 +1083,6 @@ impl Debug for ShuffleRepartitioner {
}
}

#[allow(clippy::too_many_arguments)]
async fn external_shuffle(
mut input: SendableRecordBatchStream,
partition_id: usize,
@@ -1094,7 +1091,6 @@ async fn external_shuffle(
partitioning: Partitioning,
metrics: ShuffleRepartitionerMetrics,
context: Arc<TaskContext>,
jvm_fetch_time: Time,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::new(
@@ -1108,23 +1104,13 @@ async fn external_shuffle(
context.session_config().batch_size(),
);

loop {
let mut timer = jvm_fetch_time.timer();
let b = input.next().await;
timer.stop();

match b {
Some(batch_result) => {
// Block on the repartitioner to insert the batch and shuffle the rows
// into the corresponding partition buffer.
// Otherwise, pull the next batch from the input stream might overwrite the
// current batch in the repartitioner.
block_on(repartitioner.insert_batch(batch_result?))?;
}
_ => break,
}
while let Some(batch) = input.next().await {
// Block on the repartitioner to insert the batch and shuffle the rows
// into the corresponding partition buffer.
// Otherwise, pulling the next batch from the input stream might overwrite the
// current batch in the repartitioner.
block_on(repartitioner.insert_batch(batch?))?;
}

repartitioner.shuffle_write().await
}

38 changes: 36 additions & 2 deletions native/core/src/execution/operators/scan.rs
@@ -77,6 +77,10 @@ pub struct ScanExec {
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
/// Time waiting for JVM input plan to execute and return batches
jvm_fetch_time: Time,
/// Time spent in FFI
arrow_ffi_time: Time,
}

impl ScanExec {
@@ -88,6 +92,8 @@ impl ScanExec {
) -> Result<Self, CometError> {
let metrics_set = ExecutionPlanMetricsSet::default();
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);

// Scan's schema is determined by the input batch, so we need to set it before execution.
// Note that we determine if arrays are dictionary-encoded based on the
@@ -97,8 +103,13 @@ impl ScanExec {
// Dictionary-encoded primitive arrays are always unpacked.
let first_batch = if let Some(input_source) = input_source.as_ref() {
let mut timer = baseline_metrics.elapsed_compute().timer();
let batch =
ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
let batch = ScanExec::get_next(
exec_context_id,
input_source.as_obj(),
data_types.len(),
&jvm_fetch_time,
&arrow_ffi_time,
)?;
timer.stop();
batch
} else {
@@ -124,6 +135,8 @@ impl ScanExec {
cache,
metrics: metrics_set,
baseline_metrics,
jvm_fetch_time,
arrow_ffi_time,
schema,
})
}
@@ -171,6 +184,8 @@ impl ScanExec {
self.exec_context_id,
self.input_source.as_ref().unwrap().as_obj(),
self.data_types.len(),
&self.jvm_fetch_time,
&self.arrow_ffi_time,
)?;
*current_batch = Some(next_batch);
}
@@ -185,6 +200,8 @@ impl ScanExec {
exec_context_id: i64,
iter: &JObject,
num_cols: usize,
jvm_fetch_time: &Time,
arrow_ffi_time: &Time,
) -> Result<InputBatch, CometError> {
if exec_context_id == TEST_EXEC_CONTEXT_ID {
// This is a unit test. We don't need to call JNI.
@@ -200,6 +217,21 @@ impl ScanExec {

let mut env = JVMClasses::get_env()?;

let mut timer = jvm_fetch_time.timer();

let num_rows: i32 = unsafe {
jni_call!(&mut env,
comet_batch_iterator(iter).has_next() -> i32)?
};
Comment on lines +222 to +225

Member Author: Ideally, I would have had has_next return a bool rather than i32, but I could not figure out how to make that happen with our jni macros. I am open to suggestions on how to improve this.

Contributor: I don't see a has_next in CometBatchIterator.java. Should this be comet_batch_iterator(iter).next()? And if you want to change the return value, you can make the change in the Java class and also in the macro invocation.

timer.stop();

if num_rows == -1 {
return Ok(InputBatch::EOF);
}

let mut timer = arrow_ffi_time.timer();

let mut array_addrs = Vec::with_capacity(num_cols);
let mut schema_addrs = Vec::with_capacity(num_cols);

@@ -255,6 +287,8 @@ impl ScanExec {
}
}

timer.stop();

Ok(InputBatch::new(inputs, Some(num_rows as usize)))
}
}
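The FFI step that `arrow_ffi_time` measures is mostly elided in the hunk above. As a rough illustration only (not the Comet implementation), importing one column exported by the JVM with arrow-rs's `ffi` module might look like the sketch below; `import_one_column` and `fill_from_jvm` are made-up names standing in for the JNI `CometBatchIterator.next` call.

```rust
// Illustrative sketch, assuming the arrow crate's `ffi` feature; not Comet code.
use arrow::array::{make_array, ArrayRef};
use arrow::error::ArrowError;
use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema};
use datafusion::physical_plan::metrics::Time;

fn import_one_column(
    fill_from_jvm: impl FnOnce(i64, i64), // stand-in for the JNI `next(arrayAddrs, schemaAddrs)` call
    arrow_ffi_time: &Time,
) -> Result<ArrayRef, ArrowError> {
    // Allocate empty FFI structs and hand their addresses to the JVM side,
    // which fills them in by exporting the staged batch.
    let mut ffi_array = FFI_ArrowArray::empty();
    let mut ffi_schema = FFI_ArrowSchema::empty();
    let array_addr = &mut ffi_array as *mut FFI_ArrowArray as i64;
    let schema_addr = &mut ffi_schema as *mut FFI_ArrowSchema as i64;

    // The export call and the import are attributed to `arrow_ffi_time`.
    let mut timer = arrow_ffi_time.timer();
    fill_from_jvm(array_addr, schema_addr);
    // SAFETY: the JVM produced a valid Arrow C Data Interface array/schema pair.
    let data = unsafe { from_ffi(ffi_array, &ffi_schema)? };
    timer.stop();

    Ok(make_array(data))
}
```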
4 changes: 4 additions & 0 deletions native/core/src/jvm_bridge/batch_iterator.rs
@@ -26,6 +28,8 @@ use jni::{
/// A struct that holds all the JNI methods and fields for JVM `CometBatchIterator` class.
pub struct CometBatchIterator<'a> {
pub class: JClass<'a>,
pub method_has_next: JMethodID,
pub method_has_next_ret: ReturnType,
pub method_next: JMethodID,
pub method_next_ret: ReturnType,
}
@@ -38,6 +40,8 @@ impl<'a> CometBatchIterator<'a> {

Ok(CometBatchIterator {
class,
method_has_next: env.get_method_id(Self::JVM_CLASS, "hasNext", "()I")?,
method_has_next_ret: ReturnType::Primitive(Primitive::Int),
method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)I")?,
method_next_ret: ReturnType::Primitive(Primitive::Int),
})
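For readers unfamiliar with the JNI plumbing: `"()I"` is the JNI descriptor for a no-argument method returning a Java `int`, and the cached `JMethodID` is later invoked with the matching `ReturnType`. Comet does this through its `jni_call!` macro; the sketch below shows the equivalent call made directly with the `jni` crate, where `call_has_next` is a hypothetical helper rather than Comet code.

```rust
// Illustrative only: invoking a cached `hasNext()I` method via the `jni`
// crate's unchecked call API (Comet wraps this in its `jni_call!` macro).
use jni::errors::Result as JniResult;
use jni::objects::{JMethodID, JObject};
use jni::signature::{Primitive, ReturnType};
use jni::JNIEnv;

fn call_has_next(env: &mut JNIEnv, iter: &JObject, method_has_next: JMethodID) -> JniResult<i32> {
    // SAFETY: the method id was resolved from the object's own class and the
    // declared ReturnType matches the Java signature "()I".
    let value = unsafe {
        env.call_method_unchecked(
            iter,
            method_has_next,
            ReturnType::Primitive(Primitive::Int),
            &[],
        )
    }?;
    // JNI hands back a generic JValue; extracting the primitive int is also why
    // the call yields an i32 here rather than a bool (see the review thread above).
    value.i()
}
```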
25 changes: 18 additions & 7 deletions spark/src/main/java/org/apache/comet/CometBatchIterator.java
@@ -33,12 +33,27 @@
public class CometBatchIterator {
final Iterator<ColumnarBatch> input;
final NativeUtil nativeUtil;
private ColumnarBatch currentBatch = null;

CometBatchIterator(Iterator<ColumnarBatch> input, NativeUtil nativeUtil) {
this.input = input;
this.nativeUtil = nativeUtil;
}

/**
* Fetch the next input batch.
*
* @return Number of rows in next batch or -1 if no batches left.
*/
public int hasNext() {
Member Author: See the earlier comment about returning a bool from this method.

Contributor: Oh. Ignore my previous comment. Why would changing the return type of hasNext not work?

Member Author: I run into this error when trying to return bool:

224 |         comet_batch_iterator(iter).has_next() -> bool)?
    |                                                  ^^^^ the trait `std::convert::From<JValueGen<JObject<'_>>>` is not implemented for `bool`, which is required by `bool: TryFrom<JValueGen<JObject<'_>>>`

if (input.hasNext()) {
currentBatch = input.next();
return currentBatch.numRows();
} else {
return -1;
}
}

/**
* Get the next batches of Arrow arrays.
*
@@ -47,12 +62,8 @@ public class CometBatchIterator {
* @return the number of rows in the batch staged by the preceding {@code hasNext()} call.
*/
public int next(long[] arrayAddrs, long[] schemaAddrs) {
boolean hasBatch = input.hasNext();

if (!hasBatch) {
return -1;
}

return nativeUtil.exportBatch(arrayAddrs, schemaAddrs, input.next());
int numRows = nativeUtil.exportBatch(arrayAddrs, schemaAddrs, currentBatch);
currentBatch = null;
return numRows;
}
}
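Taken together with the native side shown earlier, the contract is now two calls per batch: `hasNext()` stages the next `ColumnarBatch` and reports its row count (or -1 at end of input), and `next(arrayAddrs, schemaAddrs)` exports the staged batch over the Arrow C Data Interface. Below is a compact, purely illustrative Rust rendering of that contract; the closures stand in for the two JNI calls and none of the names come from the Comet codebase.

```rust
// Illustrative sketch of the two-call fetch protocol; not Comet code.
use arrow::array::ArrayRef;

fn fetch_batch(
    has_next: impl FnOnce() -> i32,                // timed as `jvm_fetch_time`
    export_staged: impl FnOnce() -> Vec<ArrayRef>, // timed as `arrow_ffi_time`
) -> Option<(Vec<ArrayRef>, usize)> {
    let num_rows = has_next();
    if num_rows < 0 {
        return None; // -1 signals that the upstream iterator is exhausted
    }
    // A batch is staged on the JVM side; export its columns over Arrow FFI.
    let columns = export_staged();
    Some((columns, num_rows as usize))
}
```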
@@ -2909,7 +2909,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
case op if isCometSink(op) && op.output.forall(a => supportedDataType(a.dataType, true)) =>
// These operators are source of Comet native execution chain
val scanBuilder = OperatorOuterClass.Scan.newBuilder()
scanBuilder.setSource(op.simpleStringWithNodeId())
val source = op.simpleStringWithNodeId()
if (source.isEmpty) {
scanBuilder.setSource(op.getClass.getSimpleName)
} else {
scanBuilder.setSource(source)
}

val scanTypes = op.output.flatten { attr =>
serializeDataType(attr.dataType)
@@ -88,7 +88,7 @@ object CometExecUtils {
* child partition
*/
def getLimitNativePlan(outputAttributes: Seq[Attribute], limit: Int): Option[Operator] = {
val scanBuilder = OperatorOuterClass.Scan.newBuilder()
val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("LimitInput")
val scanOpBuilder = OperatorOuterClass.Operator.newBuilder()

val scanTypes = outputAttributes.flatten { attr =>
@@ -118,7 +118,7 @@ object CometExecUtils {
sortOrder: Seq[SortOrder],
child: SparkPlan,
limit: Int): Option[Operator] = {
val scanBuilder = OperatorOuterClass.Scan.newBuilder()
val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("TopKInput")
val scanOpBuilder = OperatorOuterClass.Operator.newBuilder()

val scanTypes = outputAttributes.flatten { attr =>
@@ -77,9 +77,6 @@ case class CometShuffleExchangeExec(
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time fetching batches from JVM"),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of partitions")) ++ readMetrics ++ writeMetrics
@@ -485,14 +482,7 @@ class CometShuffleWriteProcessor(
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"data_size" -> metrics("dataSize"),
"elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))

val nativeMetrics = if (metrics.contains("jvm_fetch_time")) {
CometMetricNode(
nativeSQLMetrics ++ Map("jvm_fetch_time" ->
metrics("jvm_fetch_time")))
} else {
CometMetricNode(nativeSQLMetrics)
}
val nativeMetrics = CometMetricNode(nativeSQLMetrics)

// Getting rid of the fake partitionId
val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2)
@@ -538,7 +528,7 @@ class CometShuffleWriteProcessor(
}

def getNativePlan(dataFile: String, indexFile: String): Operator = {
val scanBuilder = OperatorOuterClass.Scan.newBuilder()
val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput")
val opBuilder = OperatorOuterClass.Operator.newBuilder()

val scanTypes = outputAttributes.flatten { attr =>