Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.util.concurrent.CountDownLatch
import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.JavaConverters._

Expand All @@ -30,7 +30,33 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.util.TransportConf
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}


/**
 * Periodically invokes `report()` on a metrics system from a single-thread scheduled
 * executor named "metrics-reporter".
 *
 * @param metricsSystem the metrics system whose `report()` is called on every tick
 * @param reportingIntervalInSeconds used as both the initial delay and the period between
 *                                   reports, in seconds; must be positive
 */
private
class MetricReportingService(metricsSystem: MetricsSystem,
    reportingIntervalInSeconds: Int)
  extends Runnable with Logging {

  import scala.util.control.NonFatal

  private val threadPool = ThreadUtils
    .newDaemonSingleThreadScheduledExecutor("metrics-reporter")

  /**
   * Runs one reporting pass.
   *
   * Non-fatal failures are logged and swallowed on purpose: a task scheduled with
   * `scheduleAtFixedRate` that throws has all of its subsequent executions suppressed,
   * so letting an exception escape would silently stop reporting for good.
   */
  override def run(): Unit = {
    try {
      metricsSystem.report()
    } catch {
      case NonFatal(e) =>
        logWarning("Failed to report metrics; will retry at the next interval", e)
    }
  }

  /**
   * Schedules this service to run at a fixed rate of `reportingIntervalInSeconds`.
   *
   * @throws IllegalArgumentException if the configured interval is not positive
   */
  def start(): Unit = {
    // scheduleAtFixedRate rejects a non-positive period with the same exception type;
    // checking here only gives the operator a clearer message.
    require(reportingIntervalInSeconds > 0,
      s"Metric reporting interval must be positive, got ${reportingIntervalInSeconds}s")
    logInfo(s"Starting a metric reporting service. Interval=${reportingIntervalInSeconds}s")
    threadPool.scheduleAtFixedRate(this,
      reportingIntervalInSeconds, reportingIntervalInSeconds, TimeUnit.SECONDS)
  }

  /** Shuts the executor down immediately via `shutdownNow()`. */
  def stop(): Unit = {
    threadPool.shutdownNow()
  }
}


/**
* Provides a server from which Executors can read shuffle files (rather than reading directly from
Expand All @@ -45,6 +71,9 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
// Metrics system for this service, created with the name "shuffleService".
protected val masterMetricsSystem =
MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)

// Periodic reporter for masterMetricsSystem. The interval is read from
// "spark.shuffle.ess.metricReportingInterval" (default 10) and passed as the
// reporter's reportingIntervalInSeconds argument.
private val metricReportingService = new MetricReportingService(masterMetricsSystem,
sparkConf.getInt("spark.shuffle.ess.metricReportingInterval", 10))

// Value of "spark.shuffle.service.enabled"; false when the key is not set.
private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
// Value of "spark.shuffle.service.port"; 7337 when the key is not set.
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)

Expand Down Expand Up @@ -85,6 +114,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

masterMetricsSystem.registerSource(shuffleServiceSource)
masterMetricsSystem.start()
// Start the metrics reporting service
metricReportingService.start()
}

/** Clean up all shuffle files associated with an application that has exited. */
Expand All @@ -97,6 +128,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
server.close()
server = null
}
metricReportingService.stop()
masterMetricsSystem.stop()
}
}

Expand Down