From 4de1658f2dea72ded4c86d699f90432b8d965370 Mon Sep 17 00:00:00 2001
From: Raajay Viswanathan
Date: Thu, 20 Jul 2017 10:21:23 -0700
Subject: [PATCH] Add metrics reporting service.

---
 .../spark/deploy/ExternalShuffleService.scala | 59 ++++++++++++++++++-
 1 file changed, 57 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 8d491ddf6e09..b56340156a1c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
 
@@ -30,7 +30,55 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.util.TransportConf
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
+
+
+/**
+ * Periodically triggers `MetricsSystem.report()` on a dedicated daemon thread.
+ *
+ * @param metricsSystem metrics system whose `report()` is invoked on every tick
+ * @param reportingIntervalInSeconds delay before the first report and period between reports,
+ *                                   in seconds; a non-positive value disables reporting
+ */
+private class MetricReportingService(
+    metricsSystem: MetricsSystem,
+    reportingIntervalInSeconds: Int)
+  extends Runnable with Logging {
+
+  private val threadPool = ThreadUtils
+    .newDaemonSingleThreadScheduledExecutor("metrics-reporter")
+
+  /** Runs a single report; invoked by the scheduler on every tick. */
+  override def run(): Unit = {
+    // scheduleAtFixedRate suppresses all subsequent executions once a run throws, so a
+    // failed report must be contained here to keep later reports going.
+    try {
+      metricsSystem.report()
+    } catch {
+      case scala.util.control.NonFatal(e) =>
+        logWarning("Metric report failed; will retry at the next interval", e)
+    }
+  }
+
+  /** Schedules the periodic report, unless the configured interval is not positive. */
+  def start(): Unit = {
+    if (reportingIntervalInSeconds > 0) {
+      logInfo(s"Starting a metric reporting service. Interval=${reportingIntervalInSeconds}s")
+      threadPool.scheduleAtFixedRate(this,
+        reportingIntervalInSeconds, reportingIntervalInSeconds, TimeUnit.SECONDS)
+    } else {
+      // scheduleAtFixedRate rejects a period <= 0 with IllegalArgumentException.
+      logWarning("Metric reporting service disabled: non-positive interval " +
+        s"${reportingIntervalInSeconds}s")
+    }
+  }
+
+  /** Shuts the reporter thread down, attempting to interrupt a report that is in flight. */
+  def stop(): Unit = {
+    threadPool.shutdownNow()
+  }
+}
+
 
 /**
  * Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -45,6 +93,9 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   protected val masterMetricsSystem =
     MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
 
+  private val metricReportingService = new MetricReportingService(masterMetricsSystem,
+    sparkConf.getInt("spark.shuffle.ess.metricReportingInterval", 10))
+
   private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
 
@@ -85,6 +136,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
 
     masterMetricsSystem.registerSource(shuffleServiceSource)
     masterMetricsSystem.start()
+    // Start the metrics reporting service
+    metricReportingService.start()
   }
 
   /** Clean up all shuffle files associated with an application that has exited. */
@@ -97,6 +150,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
       server.close()
       server = null
     }
+    metricReportingService.stop()
+    masterMetricsSystem.stop()
   }
 }
 