Add jvmGCTime metrics (opensearch-project#889)
* Add jvmGCTime metrics

Signed-off-by: Tomoyuki Morita <moritato@amazon.com>

* Fix style

Signed-off-by: Tomoyuki Morita <moritato@amazon.com>

---------

Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
ykmr1224 authored and 14yapkc1 committed Dec 11, 2024
1 parent 0f08cfa commit efa0008
Showing 5 changed files with 29 additions and 11 deletions.
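At the heart of the change is a counter Spark already exposes: taskEnd.taskMetrics.jvmGCTime, the elapsed JVM GC time in milliseconds while a task ran, delivered with every SparkListenerTaskEnd event. A minimal standalone sketch of reading it (the GcTimeLogger class is illustrative, not part of this commit):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Logs the per-task GC time that the listener changed below aggregates into Flint metrics
class GcTimeLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // jvmGCTime is the elapsed JVM GC time (ms) during this task
    println(s"task ${taskEnd.taskInfo.taskId}: jvmGCTime=${taskEnd.taskMetrics.jvmGCTime} ms")
  }
}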
@@ -175,6 +175,16 @@ public final class MetricConstants {
    */
   public static final String INITIAL_CONDITION_CHECK_FAILED_PREFIX = "initialConditionCheck.failed.";
 
+  /**
+   * Metric for tracking the JVM GC time per task
+   */
+  public static final String TASK_JVM_GC_TIME_METRIC = "task.jvmGCTime.count";
+
+  /**
+   * Metric for tracking the total JVM GC time for a query
+   */
+  public static final String TOTAL_JVM_GC_TIME_METRIC = "query.totalJvmGCTime.count";
+
   private MetricConstants() {
     // Private constructor to prevent instantiation
   }
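A quick sketch of how these constants are consumed (addHistoricGauge and the constant names come from this commit; the values here are illustrative):

import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

// Record one task's GC time (42 ms) and a query-level total (1234 ms) as historic gauges
MetricsUtil.addHistoricGauge(MetricConstants.TASK_JVM_GC_TIME_METRIC, 42L)
MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, 1234L)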
@@ -6,17 +6,18 @@
 package org.opensearch.flint.core.metrics
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd}
 import org.apache.spark.sql.SparkSession
 
 /**
- * Collect and emit bytesRead/Written and recordsRead/Written metrics
+ * Collect and emit metrics by listening to Spark events
  */
-class ReadWriteBytesSparkListener extends SparkListener with Logging {
+class MetricsSparkListener extends SparkListener with Logging {
   var bytesRead: Long = 0
   var recordsRead: Long = 0
   var bytesWritten: Long = 0
   var recordsWritten: Long = 0
+  var totalJvmGcTime: Long = 0
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
     val inputMetrics = taskEnd.taskMetrics.inputMetrics
@@ -31,21 +32,28 @@ class ReadWriteBytesSparkListener extends SparkListener with Logging {
     recordsRead += inputMetrics.recordsRead
     bytesWritten += outputMetrics.bytesWritten
     recordsWritten += outputMetrics.recordsWritten
+    totalJvmGcTime += taskEnd.taskMetrics.jvmGCTime
+
+    MetricsUtil.addHistoricGauge(
+      MetricConstants.TASK_JVM_GC_TIME_METRIC,
+      taskEnd.taskMetrics.jvmGCTime)
   }
 
   def emitMetrics(): Unit = {
     logInfo(s"Input: totalBytesRead=${bytesRead}, totalRecordsRead=${recordsRead}")
     logInfo(s"Output: totalBytesWritten=${bytesWritten}, totalRecordsWritten=${recordsWritten}")
+    logInfo(s"totalJvmGcTime=${totalJvmGcTime}")
     MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_BYTES_READ, bytesRead)
     MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_RECORDS_READ, recordsRead)
     MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_BYTES_WRITTEN, bytesWritten)
     MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_RECORDS_WRITTEN, recordsWritten)
+    MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, totalJvmGcTime)
   }
 }
 
-object ReadWriteBytesSparkListener {
+object MetricsSparkListener {
   def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
-    val listener = new ReadWriteBytesSparkListener()
+    val listener = new MetricsSparkListener()
     spark.sparkContext.addSparkListener(listener)
 
     val result = lambda()
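withMetrics wraps a unit of work so the listener is attached before the work starts and its totals cover exactly that work (the tail of withMetrics is elided above, but it presumably emits the totals and deregisters the listener). A usage sketch, with an illustrative row-count job:

import org.apache.spark.sql.SparkSession
import org.opensearch.flint.core.metrics.MetricsSparkListener

val spark = SparkSession.builder().appName("gc-metrics-example").getOrCreate()
// Runs the lambda with a MetricsSparkListener registered and returns its result;
// per-task gauges are emitted as tasks end
val rowCount: Long = MetricsSparkListener.withMetrics(spark, () => spark.range(1000000L).count())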
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.refresh
 
 import java.util.Collections
 
-import org.opensearch.flint.core.metrics.ReadWriteBytesSparkListener
+import org.opensearch.flint.core.metrics.MetricsSparkListener
 import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
@@ -68,7 +68,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
       // Flint index has specialized logic and capability for incremental refresh
       case refresh: StreamingRefresh =>
         logInfo("Start refreshing index in streaming style")
-        val job = ReadWriteBytesSparkListener.withMetrics(
+        val job = MetricsSparkListener.withMetrics(
           spark,
           () =>
             refresh
@@ -17,7 +17,7 @@ import com.codahale.metrics.Timer
 import org.opensearch.flint.common.model.{FlintStatement, InteractiveSession, SessionStates}
 import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.logging.CustomLogging
-import org.opensearch.flint.core.metrics.{MetricConstants, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener}
 import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}
 
 import org.apache.spark.SparkConf
@@ -525,7 +525,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     val statementTimerContext = getTimerContext(
       MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
     val (dataToWrite, returnedVerificationResult) =
-      ReadWriteBytesSparkListener.withMetrics(
+      MetricsSparkListener.withMetrics(
         spark,
         () => {
           processStatementOnVerification(
@@ -14,7 +14,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.opensearch.flint.common.model.FlintStatement
 import org.opensearch.flint.common.scheduler.model.LangType
-import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener, MetricsUtil}
 import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
 import org.opensearch.flint.spark.FlintSpark
 
@@ -70,7 +70,7 @@ case class JobOperator(
     val statementExecutionManager =
       instantiateStatementExecutionManager(commandContext, resultIndex, osClient)
 
-    val readWriteBytesSparkListener = new ReadWriteBytesSparkListener()
+    val readWriteBytesSparkListener = new MetricsSparkListener()
     sparkSession.sparkContext.addSparkListener(readWriteBytesSparkListener)
 
     val statement =
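Unlike the call sites above, JobOperator registers the listener by hand, so emitting and cleanup have to happen explicitly once the statement finishes (those lines are elided here). A hedged sketch of that manual pattern, with runStatement() standing in for the real statement execution:

import org.opensearch.flint.core.metrics.MetricsSparkListener

val listener = new MetricsSparkListener()
sparkSession.sparkContext.addSparkListener(listener)
try {
  runStatement() // hypothetical stand-in for the actual statement execution
} finally {
  listener.emitMetrics() // flushes the accumulated totals, including totalJvmGcTime
  sparkSession.sparkContext.removeSparkListener(listener)
}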
