chore: Add spilling metrics of SortMergeJoin (apache#878)
(cherry picked from commit f4400f5)
viirya authored and huaxingao committed Aug 29, 2024
1 parent 539e62d commit a30ed86
Showing 4 changed files with 27 additions and 9 deletions.
@@ -30,7 +30,11 @@ class RowPartition(initialSize: Int) {
     rowSizes += size
   }
 
-  def getNumRows: Int = rowAddresses.size
+  def getNumRows: Int = if (rowAddresses == null) {
+    0
+  } else {
+    rowAddresses.size
+  }
 
   def getRowAddresses: Array[Long] = {
     val array = rowAddresses.toArray
@@ -101,6 +101,25 @@ object CometMetricNode {
       "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))
   }
 
+  /**
+   * SQL Metrics for DataFusion SortMergeJoin
+   */
+  def sortMergeJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
+    Map(
+      "peak_mem_used" ->
+        SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
+      "input_batches" ->
+        SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
+      "input_rows" ->
+        SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
+      "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
+      "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
+      "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"),
+      "spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"),
+      "spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"),
+      "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows"))
+  }
+
   /**
    * Creates a [[CometMetricNode]] from a [[CometPlan]].
    */
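As an aside, here is a minimal, self-contained sketch (not part of this commit) of the three SQLMetrics factory methods used above and how they accumulate values. The object name and the literal values are illustrative, and SQLMetrics is an internal Spark API that Comet happens to reuse.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.SQLMetrics

object MetricKindsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("metric-kinds").getOrCreate()
    val sc: SparkContext = spark.sparkContext

    // Same three factory methods used in sortMergeJoinMetrics above.
    val spillCount   = SQLMetrics.createMetric(sc, "Count of spills")                  // plain counter
    val spilledBytes = SQLMetrics.createSizeMetric(sc, "Total spilled bytes")          // shown as a size in the UI
    val joinTime     = SQLMetrics.createNanoTimingMetric(sc, "Total time for joining") // shown as a duration

    // Each SQLMetric is an accumulator; operators call add() on it while the query runs.
    spillCount.add(2L)
    spilledBytes.add(64L * 1024 * 1024)
    joinTime.add(1500L * 1000 * 1000)

    println(s"spill_count=${spillCount.value}, spilled_bytes=${spilledBytes.value}, join_time=${joinTime.value}")
    spark.stop()
  }
}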
@@ -933,14 +933,7 @@ case class CometSortMergeJoinExec(
     Objects.hashCode(leftKeys, rightKeys, condition, left, right)
 
   override lazy val metrics: Map[String, SQLMetric] =
-    Map(
-      "input_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches consumed"),
-      "input_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows consumed"),
-      "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
-      "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
-      "peak_mem_used" ->
-        SQLMetrics.createSizeMetric(sparkContext, "Peak memory used for buffered data"),
-      "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
+    CometMetricNode.sortMergeJoinMetrics(sparkContext)
 }
 
 case class CometScanWrapper(override val nativeOp: Operator, override val originalPlan: SparkPlan)
@@ -519,6 +519,8 @@ class CometExecSuite extends CometTestBase {
         assert(metrics("peak_mem_used").value > 1L)
         assert(metrics.contains("join_time"))
         assert(metrics("join_time").value > 1L)
+        assert(metrics.contains("spill_count"))
+        assert(metrics("spill_count").value == 0)
       }
     }
   }
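For completeness, a hedged sketch of how the new metrics could be read from user code once a join has run. Only the metric keys and the CometSortMergeJoinExec class name come from this commit; the package import, the Comet config keys, and the query itself are assumptions, and whether Comet plans this join natively depends on the environment.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.comet.CometSortMergeJoinExec // package assumed

object ReadSpillMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("smj-spill-metrics")
      .config("spark.plugins", "org.apache.comet.CometPlugin") // Comet config keys assumed
      .config("spark.comet.enabled", "true")
      .config("spark.comet.exec.enabled", "true")
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")    // steer the planner toward sort-merge join
      .config("spark.sql.adaptive.enabled", "false")           // keep executedPlan walkable below
      .getOrCreate()
    import spark.implicits._

    val left = (0 until 100000).map(i => (i, s"l$i")).toDF("k", "lv")
    val right = (0 until 100000).map(i => (i, s"r$i")).toDF("k", "rv")
    val joined = left.join(right, "k")
    joined.collect() // materialize this plan so its metrics are populated

    // If Comet planned the join natively, its node carries the metric keys added above.
    joined.queryExecution.executedPlan
      .collectFirst { case smj: CometSortMergeJoinExec => smj.metrics }
      .foreach { m =>
        println(s"spill_count=${m("spill_count").value}, " +
          s"spilled_bytes=${m("spilled_bytes").value}, spilled_rows=${m("spilled_rows").value}")
      }
    spark.stop()
  }
}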