diff --git a/core/pom.xml b/core/pom.xml
index ff7fa04c8dbdd..0a339e11a5d20 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -32,6 +32,7 @@
core
+ **/OpenTelemetry*.scala
@@ -515,6 +516,15 @@
target/scala-${scala.binary.version}/classes
target/scala-${scala.binary.version}/test-classes
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+ ${opentelemetry.exclude}
+
+
+
org.apache.maven.plugins
maven-dependency-plugin
@@ -616,6 +626,36 @@
.sh
+
+ opentelemetry
+
+
+
+
+
+ io.opentelemetry
+ opentelemetry-exporter-otlp
+ 1.41.0
+
+
+ io.opentelemetry
+ opentelemetry-sdk-extension-autoconfigure-spi
+
+
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+ 1.41.0
+
+
+ com.squareup.okhttp3
+ okhttp
+ 3.12.12
+ test
+
+
+
sparkr
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushReporter.scala b/core/src/main/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushReporter.scala
new file mode 100644
index 0000000000000..bab7023ecdf11
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushReporter.scala
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink.opentelemetry
+
+import java.nio.file.{Files, Paths}
+import java.util.{Locale, SortedMap}
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics._
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.metrics.{DoubleGauge, DoubleHistogram, LongCounter}
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter
+import io.opentelemetry.sdk.OpenTelemetrySdk
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader
+import io.opentelemetry.sdk.resources.Resource
+
+private[spark] class OpenTelemetryPushReporter(
+ registry: MetricRegistry,
+ pollInterval: Int = 10,
+ pollUnit: TimeUnit = TimeUnit.SECONDS,
+ endpoint: String = "http://localhost:4317",
+ headersMap: Map[String, String] = Map(),
+ attributesMap: Map[String, String] = Map(),
+ trustedCertificatesPath: String,
+ privateKeyPemPath: String,
+ certificatePemPath: String)
+ extends ScheduledReporter (
+ registry,
+ "opentelemetry-push-reporter",
+ MetricFilter.ALL,
+ TimeUnit.SECONDS,
+ TimeUnit.MILLISECONDS)
+ with MetricRegistryListener {
+
+ val FIFTEEN_MINUTE_RATE = "_fifteen_minute_rate"
+ val FIVE_MINUTE_RATE = "_five_minute_rate"
+ val ONE_MINUTE_RATE = "_one_minute_rate"
+ val MEAN_RATE = "_mean_rate"
+ val METER = "_meter"
+ val TIMER = "_timer"
+ val COUNT = "_count"
+ val MAX = "_max"
+ val MIN = "_min"
+ val MEAN = "_mean"
+ val MEDIAN = "_50_percentile"
+ val SEVENTY_FIFTH_PERCENTILE = "_75_percentile"
+ val NINETY_FIFTH_PERCENTILE = "_95_percentile"
+ val NINETY_EIGHTH_PERCENTILE = "_98_percentile"
+ val NINETY_NINTH_PERCENTILE = "_99_percentile"
+ val NINE_HUNDRED_NINETY_NINTH_PERCENTILE = "_999_percentile"
+ val STD_DEV = "_std_dev"
+
+ val otlpGrpcMetricExporterBuilder = OtlpGrpcMetricExporter.builder()
+
+ for ((key, value) <- headersMap) {
+ otlpGrpcMetricExporterBuilder.addHeader(key, value)
+ }
+
+ if (trustedCertificatesPath != null) {
+ otlpGrpcMetricExporterBuilder
+ .setTrustedCertificates(Files.readAllBytes(Paths.get(trustedCertificatesPath)))
+ }
+
+ if (privateKeyPemPath != null && certificatePemPath != null) {
+ otlpGrpcMetricExporterBuilder
+ .setClientTls(
+ Files.readAllBytes(Paths.get(privateKeyPemPath)),
+ Files.readAllBytes(Paths.get(certificatePemPath)))
+ }
+
+ otlpGrpcMetricExporterBuilder.setEndpoint(endpoint)
+
+ val attributesBuilder = Attributes.builder()
+ for ((key, value) <- attributesMap) {
+ attributesBuilder.put(key, value)
+ }
+
+ val resource = Resource
+ .getDefault()
+ .merge(Resource.create(attributesBuilder.build()))
+
+ val metricReader = PeriodicMetricReader
+ .builder(otlpGrpcMetricExporterBuilder.build())
+ .setInterval(pollInterval, pollUnit)
+ .build()
+
+ val sdkMeterProvider: SdkMeterProvider = SdkMeterProvider
+ .builder()
+ .registerMetricReader(metricReader)
+ .setResource(resource)
+ .build()
+
+ val openTelemetryCounters = collection.mutable.Map[String, LongCounter]()
+ val openTelemetryHistograms = collection.mutable.Map[String, DoubleHistogram]()
+ val openTelemetryGauges = collection.mutable.Map[String, DoubleGauge]()
+ val codahaleCounters = collection.mutable.Map[String, Counter]()
+ val openTelemetry = OpenTelemetrySdk
+ .builder()
+ .setMeterProvider(sdkMeterProvider)
+ .build();
+ val openTelemetryMeter = openTelemetry.getMeter("apache-spark")
+
+ override def report(
+ gauges: SortedMap[String, Gauge[_]],
+ counters: SortedMap[String, Counter],
+ histograms: SortedMap[String, Histogram],
+ meters: SortedMap[String, Meter],
+ timers: SortedMap[String, Timer]): Unit = {
+ counters.forEach(this.reportCounter)
+ gauges.forEach(this.reportGauges)
+ histograms.forEach(this.reportHistograms)
+ meters.forEach(this.reportMeters)
+ timers.forEach(this.reportTimers)
+ sdkMeterProvider.forceFlush()
+ }
+
+ override def onGaugeAdded(name: String, gauge: Gauge[_]): Unit = {
+ val metricName = normalizeMetricName(name)
+ generateGauge(metricName)
+ }
+
+ override def onGaugeRemoved(name: String): Unit = {
+ val metricName = normalizeMetricName(name)
+ openTelemetryGauges.remove(metricName)
+ }
+
+ override def onCounterAdded(name: String, counter: Counter): Unit = {
+ val metricName = normalizeMetricName(name)
+ val addedOpenTelemetryCounter =
+ openTelemetryMeter.counterBuilder(metricName).build
+ openTelemetryCounters.put(metricName, addedOpenTelemetryCounter)
+ codahaleCounters.put(metricName, registry.counter(metricName))
+ }
+
+ override def onCounterRemoved(name: String): Unit = {
+ val metricName = normalizeMetricName(name)
+ openTelemetryCounters.remove(metricName)
+ codahaleCounters.remove(metricName)
+ }
+
+ override def onHistogramAdded(name: String, histogram: Histogram): Unit = {
+ val metricName = normalizeMetricName(name)
+ generateHistogramGroup(metricName)
+ }
+
+ override def onHistogramRemoved(name: String): Unit = {
+ val metricName = normalizeMetricName(name)
+ cleanHistogramGroup(metricName)
+ }
+
+ override def onMeterAdded(name: String, meter: Meter): Unit = {
+ val metricName = normalizeMetricName(name) + METER
+ generateGauge(metricName + COUNT)
+ generateGauge(metricName + MEAN_RATE)
+ generateGauge(metricName + FIFTEEN_MINUTE_RATE)
+ generateGauge(metricName + FIVE_MINUTE_RATE)
+ generateGauge(metricName + ONE_MINUTE_RATE)
+ }
+
+ override def onMeterRemoved(name: String): Unit = {
+ val metricName = normalizeMetricName(name) + METER
+ openTelemetryGauges.remove(metricName + COUNT)
+ openTelemetryGauges.remove(metricName + MEAN_RATE)
+ openTelemetryGauges.remove(metricName + ONE_MINUTE_RATE)
+ openTelemetryGauges.remove(metricName + FIVE_MINUTE_RATE)
+ openTelemetryGauges.remove(metricName + FIFTEEN_MINUTE_RATE)
+ }
+
+ override def onTimerAdded(name: String, timer: Timer): Unit = {
+ val metricName = normalizeMetricName(name) + TIMER
+ generateHistogramGroup(metricName)
+ generateAdditionalHistogramGroupForTimers(metricName)
+ }
+
+ override def onTimerRemoved(name: String): Unit = {
+ val metricName = normalizeMetricName(name) + TIMER
+ cleanHistogramGroup(metricName)
+ cleanAdditionalHistogramGroupTimers(metricName)
+ }
+
+ override def stop(): Unit = {
+ super.stop()
+ sdkMeterProvider.close()
+ }
+
+ private def normalizeMetricName(name: String): String = {
+ name.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9]", "_")
+ }
+
+ private def generateHistogram(metricName: String): Unit = {
+ val openTelemetryHistogram =
+ openTelemetryMeter.histogramBuilder(metricName).build
+ openTelemetryHistograms.put(metricName, openTelemetryHistogram)
+ }
+
+ private def generateHistogramGroup(metricName: String): Unit = {
+ generateHistogram(metricName + COUNT)
+ generateHistogram(metricName + MAX)
+ generateHistogram(metricName + MIN)
+ generateHistogram(metricName + MEAN)
+ generateHistogram(metricName + MEDIAN)
+ generateHistogram(metricName + STD_DEV)
+ generateHistogram(metricName + SEVENTY_FIFTH_PERCENTILE)
+ generateHistogram(metricName + NINETY_FIFTH_PERCENTILE)
+ generateHistogram(metricName + NINETY_EIGHTH_PERCENTILE)
+ generateHistogram(metricName + NINETY_NINTH_PERCENTILE)
+ generateHistogram(metricName + NINE_HUNDRED_NINETY_NINTH_PERCENTILE)
+ }
+
+ private def generateAdditionalHistogramGroupForTimers(metricName: String): Unit = {
+ generateHistogram(metricName + FIFTEEN_MINUTE_RATE)
+ generateHistogram(metricName + FIVE_MINUTE_RATE)
+ generateHistogram(metricName + ONE_MINUTE_RATE)
+ generateHistogram(metricName + MEAN_RATE)
+ }
+
+ private def cleanHistogramGroup(metricName: String): Unit = {
+ openTelemetryHistograms.remove(metricName + COUNT)
+ openTelemetryHistograms.remove(metricName + MAX)
+ openTelemetryHistograms.remove(metricName + MIN)
+ openTelemetryHistograms.remove(metricName + MEAN)
+ openTelemetryHistograms.remove(metricName + MEDIAN)
+ openTelemetryHistograms.remove(metricName + STD_DEV)
+ openTelemetryHistograms.remove(metricName + SEVENTY_FIFTH_PERCENTILE)
+ openTelemetryHistograms.remove(metricName + NINETY_FIFTH_PERCENTILE)
+ openTelemetryHistograms.remove(metricName + NINETY_EIGHTH_PERCENTILE)
+ openTelemetryHistograms.remove(metricName + NINETY_NINTH_PERCENTILE)
+ openTelemetryHistograms.remove(metricName + NINE_HUNDRED_NINETY_NINTH_PERCENTILE)
+ }
+
+ private def cleanAdditionalHistogramGroupTimers(metricName: String): Unit = {
+ openTelemetryHistograms.remove(metricName + FIFTEEN_MINUTE_RATE)
+ openTelemetryHistograms.remove(metricName + FIVE_MINUTE_RATE)
+ openTelemetryHistograms.remove(metricName + ONE_MINUTE_RATE)
+ openTelemetryHistograms.remove(metricName + MEAN_RATE)
+ }
+
+ private def generateGauge(metricName: String): Unit = {
+ val addedOpenTelemetryGauge =
+ openTelemetryMeter.gaugeBuilder(metricName).build
+ openTelemetryGauges.put(metricName, addedOpenTelemetryGauge)
+ }
+
+ private def reportCounter(name: String, counter: Counter): Unit = {
+ val metricName = normalizeMetricName(name)
+ val openTelemetryCounter = openTelemetryCounters(metricName)
+ val codahaleCounter = codahaleCounters(metricName)
+ val diff = counter.getCount - codahaleCounter.getCount
+ openTelemetryCounter.add(diff)
+ codahaleCounter.inc(diff)
+ }
+
+ private def reportGauges(name: String, gauge: Gauge[_]): Unit = {
+ val metricName = normalizeMetricName(name)
+ gauge.getValue match {
+ case d: Double =>
+ openTelemetryGauges(metricName).set(d.doubleValue)
+ case d: Long =>
+ openTelemetryGauges(metricName).set(d.doubleValue)
+ case d: Int =>
+ openTelemetryGauges(metricName).set(d.doubleValue)
+ case _ => ()
+ }
+ }
+
+ private def reportHistograms(name: String, histogram: Histogram): Unit = {
+ val metricName = normalizeMetricName(name)
+ reportHistogramGroup(metricName, histogram)
+ }
+
+ private def reportMeters(name: String, meter: Meter): Unit = {
+ val metricName = normalizeMetricName(name) + METER
+ val openTelemetryGaugeCount = openTelemetryGauges(metricName + COUNT)
+ openTelemetryGaugeCount.set(meter.getCount.toDouble)
+ val openTelemetryGaugeOneMinuteRate = openTelemetryGauges(metricName + ONE_MINUTE_RATE)
+ openTelemetryGaugeOneMinuteRate.set(meter.getOneMinuteRate)
+ val openTelemetryGaugeFiveMinuteRate = openTelemetryGauges(metricName + FIVE_MINUTE_RATE)
+ openTelemetryGaugeFiveMinuteRate.set(meter.getFiveMinuteRate)
+ val openTelemetryGaugeFifteenMinuteRate = openTelemetryGauges(
+ metricName + FIFTEEN_MINUTE_RATE)
+ openTelemetryGaugeFifteenMinuteRate.set(meter.getFifteenMinuteRate)
+ val openTelemetryGaugeMeanRate = openTelemetryGauges(metricName + MEAN_RATE)
+ openTelemetryGaugeMeanRate.set(meter.getMeanRate)
+ }
+
+ private def reportTimers(name: String, timer: Timer): Unit = {
+ val metricName = normalizeMetricName(name) + TIMER
+ val openTelemetryHistogramCount = openTelemetryHistograms(metricName + COUNT)
+ openTelemetryHistogramCount.record(timer.getCount.toDouble)
+ val openTelemetryHistogramOneMinuteRate = openTelemetryHistograms(
+ metricName + ONE_MINUTE_RATE)
+ openTelemetryHistogramOneMinuteRate.record(timer.getOneMinuteRate)
+ val openTelemetryHistogramFiveMinuteRate = openTelemetryHistograms(
+ metricName + FIVE_MINUTE_RATE)
+ openTelemetryHistogramFiveMinuteRate.record(timer.getFiveMinuteRate)
+ val openTelemetryHistogramFifteenMinuteRate = openTelemetryHistograms(
+ metricName + FIFTEEN_MINUTE_RATE)
+ openTelemetryHistogramFifteenMinuteRate.record(timer.getFifteenMinuteRate)
+ val openTelemetryHistogramMeanRate = openTelemetryHistograms(metricName + MEAN_RATE)
+ openTelemetryHistogramMeanRate.record(timer.getMeanRate)
+ val snapshot = timer.getSnapshot
+ reportHistogramGroup(metricName, snapshot)
+ }
+
+ private def reportHistogramGroup(metricName: String, histogram: Histogram): Unit = {
+ val openTelemetryHistogramCount = openTelemetryHistograms(metricName + COUNT)
+ openTelemetryHistogramCount.record(histogram.getCount.toDouble)
+ val snapshot = histogram.getSnapshot
+ reportHistogramGroup(metricName, snapshot)
+ }
+
+ private def reportHistogramGroup(metricName: String, snapshot: Snapshot): Unit = {
+ val openTelemetryHistogramMax = openTelemetryHistograms(metricName + MAX)
+ openTelemetryHistogramMax.record(snapshot.getMax.toDouble)
+ val openTelemetryHistogramMin = openTelemetryHistograms(metricName + MIN)
+ openTelemetryHistogramMin.record(snapshot.getMin.toDouble)
+ val openTelemetryHistogramMean = openTelemetryHistograms(metricName + MEAN)
+ openTelemetryHistogramMean.record(snapshot.getMean)
+ val openTelemetryHistogramMedian = openTelemetryHistograms(metricName + MEDIAN)
+ openTelemetryHistogramMedian.record(snapshot.getMedian)
+ val openTelemetryHistogramStdDev = openTelemetryHistograms(metricName + STD_DEV)
+ openTelemetryHistogramStdDev.record(snapshot.getStdDev)
+ val openTelemetryHistogram75Percentile = openTelemetryHistograms(
+ metricName + SEVENTY_FIFTH_PERCENTILE)
+ openTelemetryHistogram75Percentile.record(snapshot.get75thPercentile)
+ val openTelemetryHistogram95Percentile = openTelemetryHistograms(
+ metricName + NINETY_FIFTH_PERCENTILE)
+ openTelemetryHistogram95Percentile.record(snapshot.get95thPercentile)
+ val openTelemetryHistogram98Percentile = openTelemetryHistograms(
+ metricName + NINETY_EIGHTH_PERCENTILE)
+ openTelemetryHistogram98Percentile.record(snapshot.get98thPercentile)
+ val openTelemetryHistogram99Percentile = openTelemetryHistograms(
+ metricName + NINETY_NINTH_PERCENTILE)
+ openTelemetryHistogram99Percentile.record(snapshot.get99thPercentile)
+ val openTelemetryHistogram999Percentile = openTelemetryHistograms(
+ metricName + NINE_HUNDRED_NINETY_NINTH_PERCENTILE)
+ openTelemetryHistogram999Percentile.record(snapshot.get999thPercentile)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushSink.scala
new file mode 100644
index 0000000000000..23d047f585efc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushSink.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink.opentelemetry
+
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.sink.Sink
+
+private[spark] object OpenTelemetryPushSink {
+ private def fetchMapFromProperties(
+ properties: Properties,
+ keyPrefix: String): Map[String, String] = {
+ val propertiesMap = scala.collection.mutable.Map[String, String]()
+ val valueEnumeration = properties.propertyNames
+ val dotCount = keyPrefix.count(_ == '.')
+ while (valueEnumeration.hasMoreElements) {
+ val key = valueEnumeration.nextElement.asInstanceOf[String]
+ if (key.startsWith(keyPrefix)) {
+ val dotIndex = StringUtils.ordinalIndexOf(key, ".", dotCount + 1)
+ val mapKey = key.substring(dotIndex + 1)
+ propertiesMap(mapKey) = properties.getProperty(key)
+ }
+ }
+ propertiesMap.toMap
+ }
+}
+
+private[spark] class OpenTelemetryPushSink(val property: Properties, val registry: MetricRegistry)
+ extends Sink with Logging {
+
+ val OPEN_TELEMETRY_KEY_PERIOD = "period"
+ val OPEN_TELEMETRY_KEY_UNIT = "unit"
+ val OPEN_TELEMETRY_DEFAULT_PERIOD = "10"
+ val OPEN_TELEMETRY_DEFAULT_UNIT = "SECONDS"
+ val OPEN_TELEMETRY_KEY_ENDPOINT = "endpoint"
+ val GRPC_METRIC_EXPORTER_HEADER_KEY = "grpc.metric.exporter.header"
+ val GRPC_METRIC_EXPORTER_ATTRIBUTES_KEY = "grpc.metric.exporter.attributes"
+ val TRUSTED_CERTIFICATE_PATH = "trusted.certificate.path"
+ val PRIVATE_KEY_PEM_PATH = "private.key.pem.path"
+ val CERTIFICATE_PEM_PATH = "certificate.pem.path"
+
+ val pollPeriod = property
+ .getProperty(OPEN_TELEMETRY_KEY_PERIOD, OPEN_TELEMETRY_DEFAULT_PERIOD)
+ .toInt
+
+ val pollUnit = TimeUnit.valueOf(
+ property
+ .getProperty(OPEN_TELEMETRY_KEY_UNIT, OPEN_TELEMETRY_DEFAULT_UNIT)
+ .toUpperCase(Locale.ROOT))
+
+ val endpoint = property.getProperty(OPEN_TELEMETRY_KEY_ENDPOINT)
+
+ val headersMap =
+ OpenTelemetryPushSink.fetchMapFromProperties(property, GRPC_METRIC_EXPORTER_HEADER_KEY)
+ val attributesMap =
+ OpenTelemetryPushSink.fetchMapFromProperties(property, GRPC_METRIC_EXPORTER_ATTRIBUTES_KEY)
+
+ val trustedCertificatesPath: String =
+ property.getProperty(TRUSTED_CERTIFICATE_PATH)
+
+ val privateKeyPemPath: String = property.getProperty(PRIVATE_KEY_PEM_PATH)
+
+ val certificatePemPath: String = property.getProperty(CERTIFICATE_PEM_PATH)
+
+ val reporter = new OpenTelemetryPushReporter(
+ registry,
+ pollInterval = pollPeriod,
+ pollUnit,
+ endpoint,
+ headersMap,
+ attributesMap,
+ trustedCertificatesPath,
+ privateKeyPemPath,
+ certificatePemPath)
+
+ registry.addListener(reporter)
+
+ override def start(): Unit = {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop(): Unit = {
+ reporter.stop()
+ }
+
+ override def report(): Unit = {
+ reporter.report()
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushReporterSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushReporterSuite.scala
new file mode 100644
index 0000000000000..3f9c75062f78f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushReporterSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink.opentelemetry
+
+import com.codahale.metrics._
+import org.junit.jupiter.api.Assertions.assertNotNull
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.SparkFunSuite
+
+class OpenTelemetryPushReporterSuite
+ extends SparkFunSuite with PrivateMethodTester {
+ val reporter = new OpenTelemetryPushReporter(
+ registry = new MetricRegistry(),
+ trustedCertificatesPath = null,
+ privateKeyPemPath = null,
+ certificatePemPath = null
+ )
+
+ test("Normalize metric name key") {
+ val name = "local-1592132938718.driver.LiveListenerBus." +
+ "listenerProcessingTime.org.apache.spark.HeartbeatReceiver"
+ val metricsName = reporter invokePrivate PrivateMethod[String](
+ Symbol("normalizeMetricName")
+ )(name)
+ assert(
+ metricsName == "local_1592132938718_driver_livelistenerbus_" +
+ "listenerprocessingtime_org_apache_spark_heartbeatreceiver"
+ )
+ }
+
+ test("OpenTelemetry actions when one codahale gauge is added") {
+ val gauge = new Gauge[Double] {
+ override def getValue: Double = 1.23
+ }
+ reporter.onGaugeAdded("test-gauge", gauge)
+ assertNotNull(reporter.openTelemetryGauges("test_gauge"))
+ }
+
+ test("OpenTelemetry actions when one codahale counter is added") {
+ val counter = new Counter
+ reporter.onCounterAdded("test_counter", counter)
+ assertNotNull(reporter.openTelemetryCounters("test_counter"))
+ }
+
+ test("OpenTelemetry actions when one codahale histogram is added") {
+ val histogram = new Histogram(new UniformReservoir)
+ reporter.onHistogramAdded("test_hist", histogram)
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_count"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_max"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_min"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_mean"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_std_dev"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_50_percentile"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_75_percentile"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_95_percentile"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_98_percentile"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_99_percentile"))
+ assertNotNull(reporter.openTelemetryHistograms("test_hist_999_percentile"))
+ }
+
+ test("OpenTelemetry actions when one codahale meter is added") {
+ val meter = new Meter()
+ reporter.onMeterAdded("test_meter", meter)
+ assertNotNull(reporter.openTelemetryGauges("test_meter_meter_count"))
+ assertNotNull(reporter.openTelemetryGauges("test_meter_meter_mean_rate"))
+ assertNotNull(
+ reporter.openTelemetryGauges("test_meter_meter_one_minute_rate")
+ )
+ assertNotNull(
+ reporter.openTelemetryGauges("test_meter_meter_five_minute_rate")
+ )
+ assertNotNull(
+ reporter.openTelemetryGauges("test_meter_meter_fifteen_minute_rate")
+ )
+ }
+
+ test("OpenTelemetry actions when one codahale timer is added") {
+ val timer = new Timer()
+ reporter.onTimerAdded("test_timer", timer)
+ assertNotNull(reporter.openTelemetryHistograms("test_timer_timer_count"))
+ assertNotNull(reporter.openTelemetryHistograms("test_timer_timer_max"))
+ assertNotNull(reporter.openTelemetryHistograms("test_timer_timer_min"))
+ assertNotNull(reporter.openTelemetryHistograms("test_timer_timer_mean"))
+ assertNotNull(reporter.openTelemetryHistograms("test_timer_timer_std_dev"))
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_50_percentile")
+ )
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_75_percentile")
+ )
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_95_percentile")
+ )
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_98_percentile")
+ )
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_99_percentile")
+ )
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_999_percentile")
+ )
+
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_fifteen_minute_rate")
+ )
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_five_minute_rate")
+ )
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_one_minute_rate")
+ )
+ assertNotNull(
+ reporter.openTelemetryHistograms("test_timer_timer_mean_rate")
+ )
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushSinkSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushSinkSuite.scala
new file mode 100644
index 0000000000000..25aca82a22c40
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/sink/opentelemetry/OpenTelemetryPushSinkSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink.opentelemetry
+
+import java.util.Properties
+
+import com.codahale.metrics._
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.SparkFunSuite
+
+class OpenTelemetryPushSinkSuite
+ extends SparkFunSuite with PrivateMethodTester {
+ test("fetch properties map") {
+ val properties = new Properties
+ properties.put("foo1.foo2.foo3.foo4.header.key1.key2.key3", "value1")
+ properties.put("foo1.foo2.foo3.foo4.header.key2", "value2")
+ val keyPrefix = "foo1.foo2.foo3.foo4.header"
+ val propertiesMap: Map[String, String] = OpenTelemetryPushSink invokePrivate
+ PrivateMethod[Map[String, String]](Symbol("fetchMapFromProperties"))(
+ properties,
+ keyPrefix
+ )
+
+ assert("value1".equals(propertiesMap("key1.key2.key3")))
+ assert("value2".equals(propertiesMap("key2")))
+ }
+
+ test("OpenTelemetry sink with one counter added") {
+ val props = new Properties
+ props.put("endpoint", "http://127.0.0.1:10086")
+ val registry = new MetricRegistry
+ val sink = new OpenTelemetryPushSink(props, registry)
+ sink.start()
+ val reporter = sink.reporter
+ registry.counter("test-counter")
+ assertEquals(1, reporter.openTelemetryCounters.size)
+ }
+}
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 34fbb8450d544..181cd28cda78d 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -206,6 +206,7 @@ def __hash__(self):
sbt_test_goals=[
"core/test",
],
+ build_profile_flags=["-Popentelemetry"],
)
api = Module(
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 51e36f78f2ff8..ba9564c165ca7 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -433,6 +433,10 @@ object SparkBuild extends PomBuild {
enable(SparkR.settings)(core)
}
+ if (!profiles.contains("opentelemetry")) {
+ enable(OpenTelemetry.settings)(core)
+ }
+
/**
* Adds the ability to run the spark shell directly from SBT without building an assembly
* jar.
@@ -1329,6 +1333,13 @@ object Volcano {
)
}
+object OpenTelemetry {
+ // Exclude all OpenTelemetry files for Compile and Test
+ lazy val settings = Seq(
+ unmanagedSources / excludeFilter := HiddenFileFilter || "OpenTelemetry*.scala"
+ )
+}
+
trait SharedUnidocSettings {
import BuildCommons._