Wait until later to register driver metrics.
Marcelo Vanzin committed Oct 24, 2019
1 parent e04e0eb commit 2dd8dff
Showing 6 changed files with 45 additions and 15 deletions.
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java
@@ -54,6 +54,24 @@ default Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
return Collections.emptyMap();
}

/**
* Register metrics published by the plugin with Spark's metrics system.
* <p>
* This method is called later in the initialization of the Spark application, after most
* subsystems are up and the application ID is known. If there are metrics registered in
* the registry ({@link PluginContext#metricRegistry()}), then a metrics source with the
* plugin name will be created.
* <p>
* Note that even though the metric registry is still accessible after this method is called,
* registering new metrics at that point may result in those metrics not being available.
*
* @param appId The application ID from the cluster manager.
* @param pluginContext Additional plugin-specific information about the Spark application
* where the plugin is running.
*/
default void registerMetrics(String appId, PluginContext pluginContext) {}

/**
* RPC message handler.
* <p>
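For illustration, a minimal sketch of a driver plugin using the new callback might look like the following. The class name and metric are hypothetical, not part of this commit; only the DriverPlugin and PluginContext APIs shown in the diff are assumed.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.Gauge;

import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.PluginContext;

// Hypothetical plugin; names are illustrative only.
public class MyDriverPlugin implements DriverPlugin {

  private final AtomicLong eventCount = new AtomicLong();

  @Override
  public Map<String, String> init(SparkContext sc, PluginContext ctx) {
    // General setup happens here; the application ID is not yet known.
    return Collections.emptyMap();
  }

  @Override
  public void registerMetrics(String appId, PluginContext ctx) {
    // Called later, once the app ID is known. All driver metrics should
    // be in the registry by the time this method returns.
    ctx.metricRegistry().register("eventCount", new Gauge<Long>() {
      @Override
      public Long getValue() {
        return eventCount.get();
      }
    });
  }
}
```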
core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java
@@ -36,6 +36,10 @@ public interface ExecutorPlugin {
* When a Spark plugin provides an executor plugin, this method will be called during the
* initialization of the executor process. It will block executor initialization until it
* returns.
* <p>
* Executor plugins that publish metrics should register all metrics with the context's
* registry ({@link PluginContext#metricRegistry()}) when this method is called. Metrics
* registered afterwards are not guaranteed to show up.
*
* @param ctx Context information for the executor where the plugin is running.
* @param extraConf Extra configuration provided by the driver component during its
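On the executor side there is no deferred callback; as the updated javadoc notes, metrics must be registered while init() runs. A hypothetical executor plugin following that rule (class and metric names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.Gauge;

import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;

// Hypothetical executor-side component; names are illustrative only.
public class MyExecutorPlugin implements ExecutorPlugin {

  private final AtomicLong bytesProcessed = new AtomicLong();

  @Override
  public void init(PluginContext ctx, Map<String, String> extraConf) {
    // Register everything up front; metrics added after init() returns
    // are not guaranteed to be reported.
    ctx.metricRegistry().register("bytesProcessed", new Gauge<Long>() {
      @Override
      public Long getValue() {
        return bytesProcessed.get();
      }
    });
  }
}
```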
core/src/main/java/org/apache/spark/api/plugin/PluginContext.java
@@ -42,14 +42,6 @@ public interface PluginContext {

/**
* Registry where to register metrics published by the plugin associated with this context.
* <p>
* Plugins should register all needed metrics in their initialization callback, otherwise
* Spark's metrics system may not properly report them. It's safe for plugins to access this
* registry later to interface with the registered metrics, but adding or removing metrics
* after initialization may not have the desired effect.
* <p>
* If the plugin does not register any metrics during its initialization call, a metrics
* source for the plugin will not be generated.
*/
MetricRegistry metricRegistry();

1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -626,6 +626,7 @@ class SparkContext(config: SparkConf) extends Logging {
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
appStatusSource.foreach(_env.metricsSystem.registerSource(_))
_plugins.foreach(_.registerMetrics(applicationId))
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala
@@ -29,13 +29,14 @@ import org.apache.spark.util.Utils
sealed abstract class PluginContainer {

def shutdown(): Unit
def registerMetrics(appId: String): Unit

}

private class DriverPluginContainer(sc: SparkContext, plugins: Seq[SparkPlugin])
extends PluginContainer with Logging {

private val driverPlugins: Seq[(String, DriverPlugin)] = plugins.flatMap { p =>
private val driverPlugins: Seq[(String, DriverPlugin, PluginContextImpl)] = plugins.flatMap { p =>
val driverPlugin = p.driverPlugin()
if (driverPlugin != null) {
val name = p.getClass().getName()
@@ -48,21 +48,28 @@ private class DriverPluginContainer(sc: SparkContext, plugins: Seq[SparkPlugin])
sc.conf.set(s"${PluginContainer.EXTRA_CONF_PREFIX}$name.$k", v)
}
}
ctx.registerMetrics()
logInfo(s"Initialized driver component for plugin $name.")
Some(p.getClass().getName() -> driverPlugin)
Some((p.getClass().getName(), driverPlugin, ctx))
} else {
None
}
}

if (driverPlugins.nonEmpty) {
val pluginsByName = driverPlugins.map { case (name, plugin, _) => (name, plugin) }.toMap
sc.env.rpcEnv.setupEndpoint(classOf[PluginEndpoint].getName(),
new PluginEndpoint(driverPlugins.toMap, sc.env.rpcEnv))
new PluginEndpoint(pluginsByName, sc.env.rpcEnv))
}

override def registerMetrics(appId: String): Unit = {
driverPlugins.foreach { case (_, plugin, ctx) =>
plugin.registerMetrics(appId, ctx)
ctx.registerMetrics()
}
}

override def shutdown(): Unit = {
driverPlugins.foreach { case (name, plugin) =>
driverPlugins.foreach { case (name, plugin, _) =>
try {
logDebug(s"Stopping plugin $name.")
plugin.shutdown()
@@ -104,6 +112,10 @@ private class ExecutorPluginContainer(env: SparkEnv, plugins: Seq[SparkPlugin])
}
}

override def registerMetrics(appId: String): Unit = {
throw new IllegalStateException("Should not be called for the executor container.")
}

override def shutdown(): Unit = {
executorPlugins.foreach { case (name, plugin) =>
try {
core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -191,11 +191,14 @@ class TestSparkPlugin extends SparkPlugin {
private class TestDriverPlugin extends DriverPlugin {

override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
TestSparkPlugin.driverContext = ctx
TestSparkPlugin.extraConf
}

override def registerMetrics(appId: String, ctx: PluginContext): Unit = {
ctx.metricRegistry().register("driverMetric", new Gauge[Int] {
override def getValue(): Int = 42
})
TestSparkPlugin.driverContext = ctx
TestSparkPlugin.extraConf
}

override def receive(msg: AnyRef): AnyRef = msg match {
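For completeness, a hypothetical top-level SparkPlugin would tie the two components sketched above together; such a plugin would be enabled through the spark.plugins configuration (class names reuse the earlier hypothetical examples):

```java
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.SparkPlugin;

// Hypothetical top-level plugin; Spark instantiates it on the driver and
// on each executor, then asks for the per-process component.
public class MyPlugin implements SparkPlugin {

  @Override
  public DriverPlugin driverPlugin() {
    return new MyDriverPlugin();
  }

  @Override
  public ExecutorPlugin executorPlugin() {
    return new MyExecutorPlugin();
  }
}
```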
