From 4cbe86e653924892e73c6135e4c9646c84f4e92d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 17 Oct 2019 12:55:57 -0700 Subject: [PATCH 01/16] [SPARK-29397][core] Extend plugin interface to include the driver. Spark 2.4 added the ability for executor plugins to be loaded into Spark (see SPARK-24918). That feature intentionally skipped the driver to keep changes small, and also because it is possible to load code into the Spark driver using listeners + configuration. But that is a bit awkward, because the listener interface does not provide hooks into a lot of Spark functionality. This change reworks the executor plugin interface to also extend to the driver. - there's a "SparkPlugin" main interface that provides APIs to load driver and executor components. - custom metric support (added in SPARK-28091) can be used by plugins to register metrics both in the driver process and in executors. - a communication channel now exists that allows the plugin's executor components to send messages to the plugin's driver component easily, using the existing Spark RPC system. The latter was a feature intentionally left out of the original plugin design (also because it didn't include a driver component). To avoid polluting the "org.apache.spark" namespace, I added the new interfaces to the "org.apache.spark.api" package, which seems like a better place in any case. The actual implementation is kept in an internal package. The change includes unit tests for the new interface and features, but I've also been running a custom plugin that extends the new API in real applications. --- .../apache/spark/api/plugin/DriverPlugin.java | 91 +++++++++ .../spark/api/plugin/ExecutorPlugin.java | 53 +++++ .../spark/api/plugin/PluginContext.java | 92 +++++++++ .../apache/spark/api/plugin/SparkPlugin.java | 53 +++++ .../scala/org/apache/spark/SparkContext.scala | 8 + .../org/apache/spark/executor/Executor.scala | 4 + .../spark/internal/config/package.scala | 11 ++ .../internal/plugin/PluginContainer.scala | 141 +++++++++++++ .../internal/plugin/PluginContextImpl.scala | 84 ++++++++ .../internal/plugin/PluginEndpoint.scala | 64 ++++++ .../plugin/PluginContainerSuite.scala | 185 ++++++++++++++++++ docs/monitoring.md | 22 ++- 12 files changed, 804 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java create mode 100644 core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java create mode 100644 core/src/main/java/org/apache/spark/api/plugin/PluginContext.java create mode 100644 core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java create mode 100644 core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala create mode 100644 core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala create mode 100644 core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala create mode 100644 core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala diff --git a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java new file mode 100644 index 0000000000000..5d910ea8deee3 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.plugin; + +import java.util.Collections; +import java.util.Map; + +import org.apache.spark.SparkContext; +import org.apache.spark.annotation.DeveloperApi; + +/** + * :: DeveloperApi :: + * Driver component of a {@link SparkPlugin}. + * + * @since 3.0.0 + */ +@DeveloperApi +public interface DriverPlugin { + + /** + * Initialize the plugin. + *
+ * This method is called early in the initialization of the Spark driver. Explicitly, it is + * called before the application is submitted to the cluster manager. This means that a lot + * of Spark subsystems may yet not have been initialized. This call also blocks driver + * initialization. + *
+   * It's recommended that plugins be careful about what operations are performed in this call,
+   * preferrably performing expensive operations in a separate thread, or postponing them until
+   * the application has fully started.
+   *
+   * @param context Information about the Spark application where the plugin is running.
+   * @return A map containing configuration data for the executor-side component of the plugin.
+   *         This map will be provided to the {@link ExecutorPlugin}'s initialization method.
+   */
+  default Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
+    return Collections.emptyMap();
+  }
+
+  /**
+   * RPC message handler.
+   *
+ * Plugins can use Spark's RPC system to send messages from executors to the driver (but not + * the other way around, currently). Messages sent by the executor component of the plugin will + * be delivered to this method, and the returned value will be sent back to the executor as + * the reply, if the executor has requested one. + *
+ * Any exception thrown will be sent back to the executor as an error, in case it is expecting + * a reply. In case a reply is not expected, a log message will be written to the driver log. + *
+ * The implementation of this handler should be thread-safe. + *
+ * Note all plugins share RPC dispatch threads, and this method is called synchronously. So + * performing expensive operations in this handler may affect the operation of other active + * plugins. Internal Spark endpoints are not directly affected, though, since they use different + * threads. + *
+ * Spark guarantees that the driver component will be ready to receive messages through this + * handler when executors are started. + * + * @param message The incoming message. + * @return Value to be returned to the caller. Ignored if the caller does not expect a reply. + */ + default Object receive(Object message) throws Exception { + throw new UnsupportedOperationException(); + } + + /** + * Informs the plugin that the Spark application is shutting down. + *
+ * This method is called during the driver shutdown phase. It is recommended that plugins + * not use any Spark functions (e.g. send RPC messages) during this call. + */ + default void shutdown() {} + +} diff --git a/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java new file mode 100644 index 0000000000000..00402d01adf25 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.plugin; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; + +/** + * :: DeveloperApi :: + * Executor component of a {@link SparkPlugin}. + * + * @since 3.0.0 + */ +@DeveloperApi +public interface ExecutorPlugin { + + /** + * Initialize the executor plugin. + *
+   * When a Spark plugin provides an executor plugin, this method will be called during the
+   * initialization of the executor process. It will block executor initialization until it
+   * returns.
+   *
+   * @param ctx Context information for the executor where the plugin is running.
+   * @param extraConf Extra configuration provided by the driver component during its
+   *                  initialization.
+   */
+  default void init(PluginContext ctx, Map<String, String> extraConf) {}
+
+  /**
+   * Clean up and terminate this plugin.
+   *
+ * This method is called during the executor shutdown phase, and blocks executor shutdown. + */ + default void shutdown() {} + +} diff --git a/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java b/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java new file mode 100644 index 0000000000000..1151debc87feb --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.plugin; + +import java.io.IOException; + +import com.codahale.metrics.MetricRegistry; + +import org.apache.spark.SparkConf; +import org.apache.spark.annotation.DeveloperApi; + +/** + * :: DeveloperApi :: + * Context information and operations for plugins loaded by Spark. + *
+ * An instance of this class is provided to plugins in their initialization method. It is safe + * for plugins to keep a reference to the instance for later use (for example, to send messages + * to the plugin's driver component). + *
+ * Context instances are plugin-specific, so metrics and messages are tied to each plugin. It is
+ * not possible for a plugin to directly interact with other plugins.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface PluginContext {
+
+  /**
+   * Registry where to register metrics published by the plugin associated with this context.
+   *
+ * Plugins should register all needed metrics in their initialization callback, otherwise + * Spark's metrics system may not properly report them. It's safe for plugins to access this + * registry later to interface with the registered metrics, but adding or removing metrics + * after initialization may not have the desired effect. + *
+ * If the plugin does not register any metrics during its initialization call, a metrics + * source for the plugin will not be generated. + */ + MetricRegistry metricRegistry(); + + /** Configuration of the Spark application. */ + SparkConf conf(); + + /** Executor ID of the process. On the driver, this will identify the driver. */ + String executorID(); + + /** The host name which is being used by the Spark process for communication. */ + String hostname(); + + /** + * Send a message to the plugin's driver-side component. + *
+ * This method sends a message to the driver-side component of the plugin, without expecting + * a reply. It returns as soon as the message is enqueued for sending. + *
+ * The message must be serializable. + * + * @param message Message to be sent. + */ + void send(Object message) throws IOException; + + /** + * Send an RPC to the plugin's driver-side component. + *
+ * This method sends a message to the driver-side component of the plugin, and blocks until a + * reply arrives, or the configured RPC ask timeout (spark.rpc.askTimeout) elapses. + *
+ * If the driver replies with an error, an exception with the corresponding error will be thrown. + *
+ * The message must be serializable. + * + * @param message Message to be sent. + * @return The reply from the driver-side component. + */ + Object ask(Object message) throws Exception; + +} diff --git a/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java new file mode 100644 index 0000000000000..2b1866c9f425b --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.plugin; + +import org.apache.spark.annotation.DeveloperApi; + +/** + * :: DeveloperApi :: + * A plugin that can be dynamically loaded into a Spark application. + *
+ * Plugins can be loaded by adding the plugin's class name to the appropriate Spark configuration. + * Check the Spark configuration documentation for details. + *
+ * Plugins have two components: a driver-side component, of which a single instance is created + * per application, inside the Spark driver. And an executor-side component, of which one instance + * is created in each executor that is started by Spark. Details of each component can be found + * in the documentation for {@link DriverPlugin} and {@link ExecutorPlugin}. + * + * @since 3.0.0 + */ +@DeveloperApi +public interface SparkPlugin { + + /** + * Return the plugin's driver-side component. + * + * @return The driver-side component, or null if one is not needed. + */ + DriverPlugin driverPlugin(); + + /** + * Return the plugin's executor-side component. + * + * @return The executor-side component, or null if one is not needed. + */ + ExecutorPlugin executorPlugin(); + +} diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2db880976c3a1..4cb790ba2cf94 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests._ import org.apache.spark.internal.config.UI._ +import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.io.CompressionCodec import org.apache.spark.metrics.source.JVMCPUSource import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} @@ -220,6 +221,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _heartbeater: Heartbeater = _ private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _ private var _shuffleDriverComponents: ShuffleDriverComponents = _ + private var _plugins: Option[PluginContainer] = None /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -539,6 +541,9 @@ class SparkContext(config: SparkConf) extends Logging { _heartbeatReceiver = env.rpcEnv.setupEndpoint( HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)) + // Initialize any plugins before the app is initialized in the cluster manager. + _plugins = PluginContainer(this) + // Create and start the scheduler val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched @@ -1976,6 +1981,9 @@ class SparkContext(config: SparkConf) extends Logging { _listenerBusStarted = false } } + Utils.tryLogNonFatalError { + _plugins.foreach(_.shutdown()) + } Utils.tryLogNonFatalError { _eventLogger.foreach(_.stop()) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index ce6d0322bafd5..763e911dcc52a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -37,6 +37,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} import org.apache.spark.metrics.source.JVMCPUSource import org.apache.spark.rpc.RpcTimeout @@ -165,6 +166,8 @@ private[spark] class Executor( } } + private val plugins: Option[PluginContainer] = PluginContainer(env) + // Max size of direct result. 
If task result is bigger than this, we use the block manager // to send the result back. private val maxDirectResultSize = Math.min( @@ -298,6 +301,7 @@ private[spark] class Executor( } } } + plugins.foreach(_.shutdown()) if (!isLocal) { env.stop() } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 36211dc2ed4f8..b3fa216bbc32b 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1159,6 +1159,17 @@ package object config { s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].") .createWithDefault(1024 * 1024) + private[spark] val STATIC_PLUGINS_LIST = "spark.plugins.static" + + private[spark] val PLUGINS = + ConfigBuilder("spark.plugins") + .withPrepended(STATIC_PLUGINS_LIST, separator = ",") + .doc("Comma-separated list of class names implementing " + + "org.apache.spark.api.plugin.SparkPlugin to load into the application.") + .stringConf + .toSequence + .createWithDefault(Nil) + private[spark] val EXECUTOR_PLUGINS = ConfigBuilder("spark.executor.plugins") .doc("Comma-separated list of class names for \"plugins\" implementing " + diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala new file mode 100644 index 0000000000000..5d423a0c7ebf0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.internal.plugin + +import scala.collection.JavaConverters._ +import scala.util.{Either, Left, Right} + +import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.api.plugin._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils + +sealed abstract class PluginContainer { + + def shutdown(): Unit + +} + +private class DriverPluginContainer(sc: SparkContext, plugins: Seq[SparkPlugin]) + extends PluginContainer with Logging { + + private val driverPlugins: Seq[(String, DriverPlugin)] = plugins.flatMap { p => + val driverPlugin = p.driverPlugin() + if (driverPlugin != null) { + val name = p.getClass().getName() + val ctx = new PluginContextImpl(name, sc.env.rpcEnv, sc.env.metricsSystem, sc.conf, + sc.env.executorId) + + val extraConf = driverPlugin.init(sc, ctx) + if (extraConf != null) { + extraConf.asScala.foreach { case (k, v) => + sc.conf.set(s"${PluginContainer.EXTRA_CONF_PREFIX}$name.$k", v) + } + } + ctx.registerMetrics() + logInfo(s"Initialized driver component for plugin $name.") + Some(p.getClass().getName() -> driverPlugin) + } else { + None + } + } + + if (driverPlugins.nonEmpty) { + sc.env.rpcEnv.setupEndpoint(classOf[PluginEndpoint].getName(), + new PluginEndpoint(driverPlugins.toMap, sc.env.rpcEnv)) + } + + override def shutdown(): Unit = { + driverPlugins.foreach { case (name, plugin) => + try { + plugin.shutdown() + } catch { + case t: Throwable => + logInfo(s"Exception while shutting down plugin $name.", t) + } + } + } + +} + +private class ExecutorPluginContainer(env: SparkEnv, plugins: Seq[SparkPlugin]) + extends PluginContainer with Logging { + + private val executorPlugins: Seq[(String, ExecutorPlugin)] = { + val allExtraConf = env.conf.getAllWithPrefix(PluginContainer.EXTRA_CONF_PREFIX) + + plugins.flatMap { p => + val executorPlugin = p.executorPlugin() + if (executorPlugin != null) { + val name = p.getClass().getName() + val prefix = name + "." + val extraConf = allExtraConf + .filter { case (k, v) => k.startsWith(prefix) } + .map { case (k, v) => k.substring(prefix.length()) -> v } + .toMap + .asJava + val ctx = new PluginContextImpl(name, env.rpcEnv, env.metricsSystem, env.conf, + env.executorId) + executorPlugin.init(ctx, extraConf) + ctx.registerMetrics() + + logInfo(s"Initialized executor component for plugin $name.") + Some(p.getClass().getName() -> executorPlugin) + } else { + None + } + } + } + + override def shutdown(): Unit = { + executorPlugins.foreach { case (name, plugin) => + try { + logDebug(s"Stopping plugin $name.") + plugin.shutdown() + } catch { + case t: Throwable => + logInfo(s"Exception while shutting down plugin $name.", t) + } + } + } + +} + +object PluginContainer { + + val EXTRA_CONF_PREFIX = "spark.plugins.__conf__." 
+ + def apply(sc: SparkContext): Option[PluginContainer] = PluginContainer(Left(sc)) + + def apply(env: SparkEnv): Option[PluginContainer] = PluginContainer(Right(env)) + + private def apply(ctx: Either[SparkContext, SparkEnv]): Option[PluginContainer] = { + val conf = ctx.fold(_.conf, _.conf) + val plugins = Utils.loadExtensions(classOf[SparkPlugin], conf.get(PLUGINS).distinct, conf) + if (plugins.nonEmpty) { + ctx match { + case Left(sc) => Some(new DriverPluginContainer(sc, plugins)) + case Right(env) => Some(new ExecutorPluginContainer(env, plugins)) + } + } else { + None + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala new file mode 100644 index 0000000000000..61da7b6748aa4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.plugin + +import com.codahale.metrics.MetricRegistry + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.api.plugin.PluginContext +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source +import org.apache.spark.rpc.RpcEnv +import org.apache.spark.util.RpcUtils + +private class PluginContextImpl( + pluginName: String, + rpcEnv: RpcEnv, + metricsSystem: MetricsSystem, + override val conf: SparkConf, + override val executorID: String) + extends PluginContext with Logging { + + override def hostname(): String = rpcEnv.address.hostPort.split(":")(0) + + private val registry = new MetricRegistry() + + private lazy val driverEndpoint = try { + RpcUtils.makeDriverRef(classOf[PluginEndpoint].getName(), conf, rpcEnv) + } catch { + case e: Exception => + logWarning(s"Failed to create driver plugin endpoint ref.", e) + null + } + + override def metricRegistry(): MetricRegistry = registry + + override def send(message: AnyRef): Unit = { + if (driverEndpoint == null) { + throw new IllegalStateException("Driver endpoint is not known.") + } + driverEndpoint.send(PluginMessage(pluginName, message)) + } + + override def ask(message: AnyRef): AnyRef = { + try { + if (driverEndpoint != null) { + driverEndpoint.askSync[AnyRef](PluginMessage(pluginName, message)) + } else { + throw new IllegalStateException("Driver endpoint is not known.") + } + } catch { + case e: SparkException if e.getCause() != null => + throw e.getCause() + } + } + + def registerMetrics(): Unit = { + if (!registry.getMetrics().isEmpty()) { + val src = new PluginMetricsSource(pluginName, registry) + metricsSystem.registerSource(src) + } + } + + class PluginMetricsSource( + override val 
sourceName: String, + override val metricRegistry: MetricRegistry) + extends Source + +} diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala new file mode 100644 index 0000000000000..9a59b6bf678f9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.plugin + +import org.apache.spark.api.plugin.DriverPlugin +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv} + +case class PluginMessage(pluginName: String, message: AnyRef) + +private class PluginEndpoint( + plugins: Map[String, DriverPlugin], + override val rpcEnv: RpcEnv) + extends IsolatedRpcEndpoint with Logging { + + override def receive: PartialFunction[Any, Unit] = { + case PluginMessage(pluginName, message) => + plugins.get(pluginName) match { + case Some(plugin) => + try { + val reply = plugin.receive(message) + if (reply != null) { + logInfo( + s"Plugin $pluginName returned reply for one-way message of type " + + s"${message.getClass().getName()}.") + } + } catch { + case e: Exception => + logWarning(s"Error in plugin $pluginName when handling message of type " + + s"${message.getClass().getName()}.", e) + } + + case None => + throw new IllegalArgumentException(s"Received message for unknown plugin $pluginName.") + } + } + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case PluginMessage(pluginName, message) => + plugins.get(pluginName) match { + case Some(plugin) => + context.reply(plugin.receive(message)) + + case None => + throw new IllegalArgumentException(s"Received message for unknown plugin $pluginName.") + } + } + +} diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala new file mode 100644 index 0000000000000..bbbeb7ff5ba59 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.plugin + +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ + +import com.codahale.metrics.Gauge +import org.mockito.ArgumentMatchers.{any, eq => meq} +import org.mockito.Mockito.{mock, spy, verify, when} +import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} + +import org.apache.spark.{ExecutorPlugin => _, _} +import org.apache.spark.api.plugin._ +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher + +class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with LocalSparkContext { + + override def afterEach(): Unit = { + TestSparkPlugin.reset() + super.afterEach() + } + + test("plugin initialization and communication") { + val conf = new SparkConf() + .setAppName(getClass().getName()) + .set(SparkLauncher.SPARK_MASTER, "local[1]") + .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName())) + + TestSparkPlugin.extraConf = Map("foo" -> "bar", "bar" -> "baz").asJava + + sc = new SparkContext(conf) + + assert(TestSparkPlugin.driverPlugin != null) + verify(TestSparkPlugin.driverPlugin).init(meq(sc), any()) + + assert(TestSparkPlugin.executorPlugin != null) + verify(TestSparkPlugin.executorPlugin).init(any(), meq(TestSparkPlugin.extraConf)) + + assert(TestSparkPlugin.executorContext != null) + + // One way messages don't block, so need to loop checking whether it arrives. + TestSparkPlugin.executorContext.send("oneway") + eventually(timeout(10.seconds), interval(10.millis)) { + verify(TestSparkPlugin.driverPlugin).receive("oneway") + } + + assert(TestSparkPlugin.executorContext.ask("ask") === "reply") + + val err = intercept[Exception] { + TestSparkPlugin.executorContext.ask("unknown message") + } + assert(err.getMessage().contains("unknown message")) + + // It should be possible for the driver plugin to send a message to itself, even if that doesn't + // make a whole lot of sense. It at least allows the same context class to be used on both + // sides. 
+ assert(TestSparkPlugin.driverContext != null) + assert(TestSparkPlugin.driverContext.ask("ask") === "reply") + + val metricSources = sc.env.metricsSystem.getSourcesByName(classOf[TestSparkPlugin].getName()) + assert(metricSources.size === 2) + + def findMetric(name: String): Int = { + val allFound = metricSources.filter(_.metricRegistry.getGauges().containsKey(name)) + assert(allFound.size === 1) + allFound.head.metricRegistry.getGauges().get(name).asInstanceOf[Gauge[Int]].getValue() + } + + assert(findMetric("driverMetric") === 42) + assert(findMetric("executorMetric") === 84) + + sc.stop() + sc = null + + verify(TestSparkPlugin.driverPlugin).shutdown() + verify(TestSparkPlugin.executorPlugin).shutdown() + } + + test("do nothing if plugins are not configured") { + val conf = new SparkConf() + val env = mock(classOf[SparkEnv]) + when(env.conf).thenReturn(conf) + assert(PluginContainer(env) === None) + } + + test("merging of config options") { + val conf = new SparkConf() + .setAppName(getClass().getName()) + .set(SparkLauncher.SPARK_MASTER, "local[1]") + .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName())) + .set(STATIC_PLUGINS_LIST, classOf[TestSparkPlugin].getName()) + + assert(conf.get(PLUGINS).size === 2) + + sc = new SparkContext(conf) + // Just check plugin is loaded. The plugin code below checks whether a single copy was loaded. + assert(TestSparkPlugin.driverPlugin != null) + } + +} + +class TestSparkPlugin extends SparkPlugin { + + override def driverPlugin(): DriverPlugin = { + val p = new TestDriverPlugin() + require(TestSparkPlugin.driverPlugin == null, "Driver plugin already initialized.") + TestSparkPlugin.driverPlugin = spy(p) + TestSparkPlugin.driverPlugin + } + + override def executorPlugin(): ExecutorPlugin = { + val p = new TestExecutorPlugin() + require(TestSparkPlugin.executorPlugin == null, "Executor plugin already initialized.") + TestSparkPlugin.executorPlugin = spy(p) + TestSparkPlugin.executorPlugin + } + +} + +private class TestDriverPlugin extends DriverPlugin { + + override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + ctx.metricRegistry().register("driverMetric", new Gauge[Int] { + override def getValue(): Int = 42 + }) + TestSparkPlugin.driverContext = ctx + TestSparkPlugin.extraConf + } + + override def receive(msg: AnyRef): AnyRef = msg match { + case "oneway" => null + case "ask" => "reply" + case other => throw new IllegalArgumentException(s"unknown: $other") + } + +} + +private class TestExecutorPlugin extends ExecutorPlugin { + + override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { + ctx.metricRegistry().register("executorMetric", new Gauge[Int] { + override def getValue(): Int = 84 + }) + TestSparkPlugin.executorContext = ctx + } + +} + +private object TestSparkPlugin { + var driverPlugin: TestDriverPlugin = _ + var driverContext: PluginContext = _ + + var executorPlugin: TestExecutorPlugin = _ + var executorContext: PluginContext = _ + + var extraConf: JMap[String, String] = _ + + def reset(): Unit = { + driverPlugin = null + driverContext = null + executorPlugin = null + executorContext = null + extraConf = null + } +} diff --git a/docs/monitoring.md b/docs/monitoring.md index 8cb237df0ba70..9d68388980c72 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1060,10 +1060,10 @@ when running in local mode. - hiveClientCalls.count - sourceCodeSize (histogram) -- namespace= - - Optional namespace(s). 
Metrics in this namespace are defined by user-supplied code, and - configured using the Spark executor plugin infrastructure. - See also the configuration parameter `spark.executor.plugins` +- namespace= + - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and + configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load + custom plugins into Spark. ### Source = JVM Source Notes: @@ -1141,3 +1141,17 @@ can provide fine-grained profiling on individual nodes. * JVM utilities such as `jstack` for providing stack traces, `jmap` for creating heap-dumps, `jstat` for reporting time-series statistics and `jconsole` for visually exploring various JVM properties are useful for those comfortable with JVM internals. + +Spark also provides a plugin API so that custom instrumentation code can be added to Spark +applications. There are two configuration available for loading plugins into Spark: + +- spark.plugins +- spark.plugins.static + +Both do the same thing: they take a comma-separated list of class names that implement the +org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that +it's possible for one "static" list to be placed in the Spark default config file, allowing +users to easily add other plugins from the command line option without overwriting the +"static" list. + +Duplicate plugins are ignored. From 6a0cb1ce4e086c1eda95e97d4080c83b9f6d435a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 18 Oct 2019 15:06:55 -0700 Subject: [PATCH 02/16] Javadoc fix. --- .../main/java/org/apache/spark/api/plugin/DriverPlugin.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java index 5d910ea8deee3..4cd246e0ac4d1 100644 --- a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java +++ b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java @@ -44,7 +44,9 @@ public interface DriverPlugin { * preferrably performing expensive operations in a separate thread, or postponing them until * the application has fully started. * - * @param context Information about the Spark application where the plugin is running. + * @param sc The SparkContext loading the plugin. + * @param pluginContext Additional plugin-specific about the Spark application where the plugin + * is running. * @return A map containing configuration data for the executor-side component of the plugin. * This map will be provided to the {@link ExecutorPlugin}'s initialization method. */ From 94329b2e48e93d395fa7460d0b6c6b5c31216118 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 18 Oct 2019 19:54:59 -0700 Subject: [PATCH 03/16] Grammar. --- docs/monitoring.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index 9d68388980c72..9223fcfd953ea 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1143,7 +1143,7 @@ can provide fine-grained profiling on individual nodes. properties are useful for those comfortable with JVM internals. Spark also provides a plugin API so that custom instrumentation code can be added to Spark -applications. There are two configuration available for loading plugins into Spark: +applications. 
There are two configuration keys available for loading plugins into Spark: - spark.plugins - spark.plugins.static From 7cc7536d65efcd94abfb2dd385b0d380a23864ff Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Sat, 19 Oct 2019 12:44:55 -0700 Subject: [PATCH 04/16] Grammar. --- docs/monitoring.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index 9223fcfd953ea..4d3b6032d4064 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1148,10 +1148,9 @@ applications. There are two configuration keys available for loading plugins int - spark.plugins - spark.plugins.static -Both do the same thing: they take a comma-separated list of class names that implement the -org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that -it's possible for one "static" list to be placed in the Spark default config file, allowing -users to easily add other plugins from the command line option without overwriting the -"static" list. +Both take a comma-separated list of class names that implement the +org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that it's +possible for one "static" list to be placed in the Spark default config file, allowing users to +easily add other plugins from the command line without overwriting the "static" list. Duplicate plugins are ignored. From 5dd6afdcbf373a1064d92b7fc5a658d3a11b66fc Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 23 Oct 2019 09:05:09 -0700 Subject: [PATCH 05/16] Feedback. --- .../java/org/apache/spark/api/plugin/DriverPlugin.java | 4 ++-- .../java/org/apache/spark/api/plugin/SparkPlugin.java | 8 ++++---- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../scala/org/apache/spark/internal/config/package.scala | 4 ++-- .../apache/spark/internal/plugin/PluginContainer.scala | 4 +--- .../spark/internal/plugin/PluginContainerSuite.scala | 2 +- docs/monitoring.md | 6 +++--- 7 files changed, 14 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java index 4cd246e0ac4d1..d7e1c3cb05c14 100644 --- a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java +++ b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java @@ -36,8 +36,8 @@ public interface DriverPlugin { * Initialize the plugin. *
* This method is called early in the initialization of the Spark driver. Explicitly, it is - * called before the application is submitted to the cluster manager. This means that a lot - * of Spark subsystems may yet not have been initialized. This call also blocks driver + * called before the Spark driver's task scheduler is initialized. This means that a lot + * of other Spark subsystems may yet not have been initialized. This call also blocks driver * initialization. *
* It's recommended that plugins be careful about what operations are performed in this call, diff --git a/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java index 2b1866c9f425b..a500f5d2188f0 100644 --- a/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java +++ b/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java @@ -26,10 +26,10 @@ * Plugins can be loaded by adding the plugin's class name to the appropriate Spark configuration. * Check the Spark configuration documentation for details. *
- * Plugins have two components: a driver-side component, of which a single instance is created - * per application, inside the Spark driver. And an executor-side component, of which one instance - * is created in each executor that is started by Spark. Details of each component can be found - * in the documentation for {@link DriverPlugin} and {@link ExecutorPlugin}. + * Plugins have two optional components: a driver-side component, of which a single instance is + * created per application, inside the Spark driver. And an executor-side component, of which one + * instance is created in each executor that is started by Spark. Details of each component can be + * found in the documentation for {@link DriverPlugin} and {@link ExecutorPlugin}. * * @since 3.0.0 */ diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4cb790ba2cf94..a947373ef78ae 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -541,7 +541,7 @@ class SparkContext(config: SparkConf) extends Logging { _heartbeatReceiver = env.rpcEnv.setupEndpoint( HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)) - // Initialize any plugins before the app is initialized in the cluster manager. + // Initialize any plugins before the task scheduler is initialized. _plugins = PluginContainer(this) // Create and start the scheduler diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b3fa216bbc32b..35811aeec1988 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1159,11 +1159,11 @@ package object config { s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].") .createWithDefault(1024 * 1024) - private[spark] val STATIC_PLUGINS_LIST = "spark.plugins.static" + private[spark] val DEFAULT_PLUGINS_LIST = "spark.plugins.defaultList" private[spark] val PLUGINS = ConfigBuilder("spark.plugins") - .withPrepended(STATIC_PLUGINS_LIST, separator = ",") + .withPrepended(DEFAULT_PLUGINS_LIST, separator = ",") .doc("Comma-separated list of class names implementing " + "org.apache.spark.api.plugin.SparkPlugin to load into the application.") .stringConf diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala index 5d423a0c7ebf0..a03923189dbbe 100644 --- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala @@ -114,12 +114,11 @@ private class ExecutorPluginContainer(env: SparkEnv, plugins: Seq[SparkPlugin]) } } } - } object PluginContainer { - val EXTRA_CONF_PREFIX = "spark.plugins.__conf__." + val EXTRA_CONF_PREFIX = "spark.plugins.__internal_conf__." 
def apply(sc: SparkContext): Option[PluginContainer] = PluginContainer(Left(sc)) @@ -137,5 +136,4 @@ object PluginContainer { None } } - } diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala index bbbeb7ff5ba59..f2bd90def437b 100644 --- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -108,7 +108,7 @@ class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with Lo .setAppName(getClass().getName()) .set(SparkLauncher.SPARK_MASTER, "local[1]") .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName())) - .set(STATIC_PLUGINS_LIST, classOf[TestSparkPlugin].getName()) + .set(DEFAULT_PLUGINS_LIST, classOf[TestSparkPlugin].getName()) assert(conf.get(PLUGINS).size === 2) diff --git a/docs/monitoring.md b/docs/monitoring.md index 4d3b6032d4064..af4b07acbc4a7 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1146,11 +1146,11 @@ Spark also provides a plugin API so that custom instrumentation code can be adde applications. There are two configuration keys available for loading plugins into Spark: - spark.plugins -- spark.plugins.static +- spark.plugins.defaultList Both take a comma-separated list of class names that implement the org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that it's -possible for one "static" list to be placed in the Spark default config file, allowing users to -easily add other plugins from the command line without overwriting the "static" list. +possible for one list to be placed in the Spark default config file, allowing users to +easily add other plugins from the command line without overwriting the config file's list. Duplicate plugins are ignored. From ccb950e75cfcf3a50f08a73ddf7a445df5098553 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 23 Oct 2019 09:07:27 -0700 Subject: [PATCH 06/16] Missing log. --- .../scala/org/apache/spark/internal/plugin/PluginContainer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala index a03923189dbbe..6507ba12a3191 100644 --- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala @@ -64,6 +64,7 @@ private class DriverPluginContainer(sc: SparkContext, plugins: Seq[SparkPlugin]) override def shutdown(): Unit = { driverPlugins.foreach { case (name, plugin) => try { + logDebug(s"Stopping plugin $name.") plugin.shutdown() } catch { case t: Throwable => From e38a8da5d7e3f2bb87eb86e943ec5e7ddbf45858 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 23 Oct 2019 09:57:20 -0700 Subject: [PATCH 07/16] Add non-local test. 
--- .../plugin/PluginContainerSuite.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala index f2bd90def437b..6548695651186 100644 --- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -17,12 +17,15 @@ package org.apache.spark.internal.plugin +import java.io.File +import java.nio.charset.StandardCharsets import java.util.{Map => JMap} import scala.collection.JavaConverters._ import scala.concurrent.duration._ import com.codahale.metrics.Gauge +import com.google.common.io.Files import org.mockito.ArgumentMatchers.{any, eq => meq} import org.mockito.Mockito.{mock, spy, verify, when} import org.scalatest.BeforeAndAfterEach @@ -32,6 +35,7 @@ import org.apache.spark.{ExecutorPlugin => _, _} import org.apache.spark.api.plugin._ import org.apache.spark.internal.config._ import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.util.Utils class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with LocalSparkContext { @@ -117,6 +121,53 @@ class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with Lo assert(TestSparkPlugin.driverPlugin != null) } + test("plugin initialization in non-local mode") { + val path = Utils.createTempDir() + + val conf = new SparkConf() + .setAppName(getClass().getName()) + .set(SparkLauncher.SPARK_MASTER, "local-cluster[2,1,1024]") + .set(PLUGINS, Seq(classOf[NonLocalModeSparkPlugin].getName())) + .set(NonLocalModeSparkPlugin.TEST_PATH_CONF, path.getAbsolutePath()) + + sc = new SparkContext(conf) + TestUtils.waitUntilExecutorsUp(sc, 2, 10000) + + eventually(timeout(10.seconds), interval(100.millis)) { + val children = path.listFiles() + assert(children != null) + assert(children.length >= 3) + } + } +} + +class NonLocalModeSparkPlugin extends SparkPlugin { + + override def driverPlugin(): DriverPlugin = { + new DriverPlugin() { + override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + NonLocalModeSparkPlugin.writeFile(ctx.conf(), ctx.executorID()) + Map.empty.asJava + } + } + } + + override def executorPlugin(): ExecutorPlugin = { + new ExecutorPlugin() { + override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { + NonLocalModeSparkPlugin.writeFile(ctx.conf(), ctx.executorID()) + } + } + } +} + +object NonLocalModeSparkPlugin { + val TEST_PATH_CONF = "spark.nonLocalPlugin.path" + + def writeFile(conf: SparkConf, id: String): Unit = { + val path = conf.get(TEST_PATH_CONF) + Files.write(id, new File(path, id), StandardCharsets.UTF_8) + } } class TestSparkPlugin extends SparkPlugin { From 76a5b0dd950fa4d1cab21f04a27b51f6b1a81317 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 24 Oct 2019 09:53:21 -0700 Subject: [PATCH 08/16] Use the executor class loader when loading plugins. 
--- .../main/scala/org/apache/spark/executor/Executor.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 763e911dcc52a..0f595d095a229 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -166,7 +166,10 @@ private[spark] class Executor( } } - private val plugins: Option[PluginContainer] = PluginContainer(env) + // Plugins need to load using a class loader that includes the executor's user classpath + private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) { + PluginContainer(env) + } // Max size of direct result. If task result is bigger than this, we use the block manager // to send the result back. @@ -300,8 +303,8 @@ private[spark] class Executor( logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e) } } + plugins.foreach(_.shutdown()) } - plugins.foreach(_.shutdown()) if (!isLocal) { env.stop() } From 2c81c3162d51de520e9f31c38c89926902145cce Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 24 Oct 2019 09:55:07 -0700 Subject: [PATCH 09/16] Config names... --- .../org/apache/spark/internal/plugin/PluginContainer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala index 6507ba12a3191..cfac52e214cd1 100644 --- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala @@ -119,7 +119,7 @@ private class ExecutorPluginContainer(env: SparkEnv, plugins: Seq[SparkPlugin]) object PluginContainer { - val EXTRA_CONF_PREFIX = "spark.plugins.__internal_conf__." + val EXTRA_CONF_PREFIX = "spark.plugins.internal." def apply(sc: SparkContext): Option[PluginContainer] = PluginContainer(Left(sc)) From 6b59f5c23e517794dc52dc86cddd76d59b525c3b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 24 Oct 2019 09:55:59 -0700 Subject: [PATCH 10/16] Simplify a comment. --- .../main/java/org/apache/spark/api/plugin/DriverPlugin.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java index d7e1c3cb05c14..87a47fae013f2 100644 --- a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java +++ b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java @@ -47,8 +47,8 @@ public interface DriverPlugin { * @param sc The SparkContext loading the plugin. * @param pluginContext Additional plugin-specific about the Spark application where the plugin * is running. - * @return A map containing configuration data for the executor-side component of the plugin. - * This map will be provided to the {@link ExecutorPlugin}'s initialization method. + * @return A map that will be provided to the {@link ExecutorPlugin#init(PluginContext,Map)} + * method. */ default Map init(SparkContext sc, PluginContext pluginContext) { return Collections.emptyMap(); From e04e0eb973ece9caeccc1c42101ceb2d91c0d54e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 24 Oct 2019 10:00:50 -0700 Subject: [PATCH 11/16] More config name. 
--- .../org/apache/spark/internal/plugin/PluginContainer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala index cfac52e214cd1..a46a50448ee3a 100644 --- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala @@ -119,7 +119,7 @@ private class ExecutorPluginContainer(env: SparkEnv, plugins: Seq[SparkPlugin]) object PluginContainer { - val EXTRA_CONF_PREFIX = "spark.plugins.internal." + val EXTRA_CONF_PREFIX = "spark.plugins.internal.conf." def apply(sc: SparkContext): Option[PluginContainer] = PluginContainer(Left(sc)) From 2dd8dff2e12027eada464ffba0b1daf05dfbdd23 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 24 Oct 2019 15:03:29 -0700 Subject: [PATCH 12/16] Wait until later to register driver metrics. --- .../apache/spark/api/plugin/DriverPlugin.java | 18 +++++++++++++++ .../spark/api/plugin/ExecutorPlugin.java | 4 ++++ .../spark/api/plugin/PluginContext.java | 8 ------- .../scala/org/apache/spark/SparkContext.scala | 1 + .../internal/plugin/PluginContainer.scala | 22 ++++++++++++++----- .../plugin/PluginContainerSuite.scala | 7 ++++-- 6 files changed, 45 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java index 87a47fae013f2..0c0d0df8ae682 100644 --- a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java +++ b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java @@ -54,6 +54,24 @@ default Map init(SparkContext sc, PluginContext pluginContext) { return Collections.emptyMap(); } + /** + * Register metrics published by the plugin with Spark's metrics system. + *
<p>
+ * This method is called later in the initialization of the Spark application, after most + * subsystems are up and the application ID is known. If there are metrics registered in + * the registry ({@link PluginContext#metricRegistry()}), then a metrics source with the + * plugin name will be created. + *
<p>
+ * Note that even though the metric registry is still accessible after this method is called, + * registering new metrics afterwards may result in the metrics not being + * available. + * + * @param appId The application ID from the cluster manager. + * @param pluginContext Additional plugin-specific information about the Spark application where + * the plugin is running. + */ + default void registerMetrics(String appId, PluginContext pluginContext) {} + /** * RPC message handler. *
<p>
diff --git a/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java index 00402d01adf25..4961308035163 100644 --- a/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java +++ b/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java @@ -36,6 +36,10 @@ public interface ExecutorPlugin { * When a Spark plugin provides an executor plugin, this method will be called during the * initialization of the executor process. It will block executor initialization until it * returns. + *
<p>
+ * Executor plugins that publish metrics should register all metrics with the context's + * registry ({@link PluginContext#metricRegistry()}) when this method is called. Metrics + * registered afterwards are not guaranteed to show up. * * @param ctx Context information for the executor where the plugin is running. * @param extraConf Extra configuration provided by the driver component during its diff --git a/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java b/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java index 1151debc87feb..b9413cf828aa1 100644 --- a/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java +++ b/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java @@ -42,14 +42,6 @@ public interface PluginContext { /** * Registry where to register metrics published by the plugin associated with this context. - *
<p>
- * Plugins should register all needed metrics in their initialization callback, otherwise - * Spark's metrics system may not properly report them. It's safe for plugins to access this - * registry later to interface with the registered metrics, but adding or removing metrics - * after initialization may not have the desired effect. - *
<p>
- * If the plugin does not register any metrics during its initialization call, a metrics - * source for the plugin will not be generated. */ MetricRegistry metricRegistry(); diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a947373ef78ae..cad88ad8aec67 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -626,6 +626,7 @@ class SparkContext(config: SparkConf) extends Logging { _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } appStatusSource.foreach(_env.metricsSystem.registerSource(_)) + _plugins.foreach(_.registerMetrics(applicationId)) // Make sure the context is stopped if the user forgets about it. This avoids leaving // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM // is killed, though. diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala index a46a50448ee3a..fc7a9d85957c0 100644 --- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala @@ -29,13 +29,14 @@ import org.apache.spark.util.Utils sealed abstract class PluginContainer { def shutdown(): Unit + def registerMetrics(appId: String): Unit } private class DriverPluginContainer(sc: SparkContext, plugins: Seq[SparkPlugin]) extends PluginContainer with Logging { - private val driverPlugins: Seq[(String, DriverPlugin)] = plugins.flatMap { p => + private val driverPlugins: Seq[(String, DriverPlugin, PluginContextImpl)] = plugins.flatMap { p => val driverPlugin = p.driverPlugin() if (driverPlugin != null) { val name = p.getClass().getName() @@ -48,21 +49,28 @@ private class DriverPluginContainer(sc: SparkContext, plugins: Seq[SparkPlugin]) sc.conf.set(s"${PluginContainer.EXTRA_CONF_PREFIX}$name.$k", v) } } - ctx.registerMetrics() logInfo(s"Initialized driver component for plugin $name.") - Some(p.getClass().getName() -> driverPlugin) + Some((p.getClass().getName(), driverPlugin, ctx)) } else { None } } if (driverPlugins.nonEmpty) { + val pluginsByName = driverPlugins.map { case (name, plugin, _) => (name, plugin) }.toMap sc.env.rpcEnv.setupEndpoint(classOf[PluginEndpoint].getName(), - new PluginEndpoint(driverPlugins.toMap, sc.env.rpcEnv)) + new PluginEndpoint(pluginsByName, sc.env.rpcEnv)) + } + + override def registerMetrics(appId: String): Unit = { + driverPlugins.foreach { case (_, plugin, ctx) => + plugin.registerMetrics(appId, ctx) + ctx.registerMetrics() + } } override def shutdown(): Unit = { - driverPlugins.foreach { case (name, plugin) => + driverPlugins.foreach { case (name, plugin, _) => try { logDebug(s"Stopping plugin $name.") plugin.shutdown() @@ -104,6 +112,10 @@ private class ExecutorPluginContainer(env: SparkEnv, plugins: Seq[SparkPlugin]) } } + override def registerMetrics(appId: String): Unit = { + throw new IllegalStateException("Should not be called for the executor container.") + } + override def shutdown(): Unit = { executorPlugins.foreach { case (name, plugin) => try { diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala index 6548695651186..be87c8735ca4e 100644 --- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -191,11 +191,14 @@ class TestSparkPlugin extends SparkPlugin { private class TestDriverPlugin extends DriverPlugin { override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + TestSparkPlugin.driverContext = ctx + TestSparkPlugin.extraConf + } + + override def registerMetrics(appId: String, ctx: PluginContext): Unit = { ctx.metricRegistry().register("driverMetric", new Gauge[Int] { override def getValue(): Int = 42 }) - TestSparkPlugin.driverContext = ctx - TestSparkPlugin.extraConf } override def receive(msg: AnyRef): AnyRef = msg match { From e12eedfdff7c15de00990dcfa76202e0f9819a8c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 28 Oct 2019 10:20:16 -0700 Subject: [PATCH 13/16] Feedback. --- .../apache/spark/internal/plugin/PluginContextImpl.scala | 2 +- .../spark/internal/plugin/PluginContainerSuite.scala | 3 ++- docs/monitoring.md | 7 ++++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala index 61da7b6748aa4..279f3d388fb2e 100644 --- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala @@ -71,7 +71,7 @@ private class PluginContextImpl( def registerMetrics(): Unit = { if (!registry.getMetrics().isEmpty()) { - val src = new PluginMetricsSource(pluginName, registry) + val src = new PluginMetricsSource(s"plugin.$pluginName", registry) metricsSystem.registerSource(src) } } diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala index be87c8735ca4e..24fa017363654 100644 --- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -81,7 +81,8 @@ class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with Lo assert(TestSparkPlugin.driverContext != null) assert(TestSparkPlugin.driverContext.ask("ask") === "reply") - val metricSources = sc.env.metricsSystem.getSourcesByName(classOf[TestSparkPlugin].getName()) + val metricSources = sc.env.metricsSystem + .getSourcesByName(s"plugin.${classOf[TestSparkPlugin].getName()}") assert(metricSources.size === 2) def findMetric(name: String): Int = { diff --git a/docs/monitoring.md b/docs/monitoring.md index af4b07acbc4a7..4c186b39898ae 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -991,6 +991,11 @@ This is the component with the largest amount of instrumented metrics - namespace=JVMCPU - jvmCpuTime +- namespace=plugin. + - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and + configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load + custom plugins into Spark. + ### Component instance = Executor These metrics are exposed by Spark executors. Note, currently they are not available when running in local mode. @@ -1060,7 +1065,7 @@ when running in local mode. - hiveClientCalls.count - sourceCodeSize (histogram) -- namespace= +- namespace=plugin. - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and configured using the Spark plugin API. 
See "Advanced Instrumentation" below for how to load custom plugins into Spark. From 8352e28b0850aeca3bdaaabb912685dc9098be60 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 28 Oct 2019 13:53:38 -0700 Subject: [PATCH 14/16] Doc syntax. --- docs/monitoring.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index 4c186b39898ae..3835dd8a6a7c7 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -991,7 +991,7 @@ This is the component with the largest amount of instrumented metrics - namespace=JVMCPU - jvmCpuTime -- namespace=plugin. +- namespace=plugin.\ - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load custom plugins into Spark. @@ -1065,7 +1065,7 @@ when running in local mode. - hiveClientCalls.count - sourceCodeSize (histogram) -- namespace=plugin. +- namespace=plugin.\ - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load custom plugins into Spark. From 69600e6bee7382d157490f6e4deabded248ea809 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 29 Oct 2019 09:33:03 -0700 Subject: [PATCH 15/16] Add blurb about jar distribution (or lack thereof). --- docs/monitoring.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index 3835dd8a6a7c7..f24eee8152193 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1156,6 +1156,11 @@ applications. There are two configuration keys available for loading plugins int Both take a comma-separated list of class names that implement the org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that it's possible for one list to be placed in the Spark default config file, allowing users to -easily add other plugins from the command line without overwriting the config file's list. - -Duplicate plugins are ignored. +easily add other plugins from the command line without overwriting the config file's list. Duplicate +plugins are ignored. + +Distribution of the jar files containing the plugin code is currently not done by Spark. The user +or admin should make sure that the jar file is available to Spark applications, for example, by +including the plugin jar with the Spark distribution. The exception to this rule is the YARN +backend, where the --jars command line option (or equivalent config entry) can be +used to make the plugin code available to both executors and cluster-mode drivers. From 37ad680ec33ec6afac9d031897a549321e782d9c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 29 Oct 2019 09:43:00 -0700 Subject: [PATCH 16/16] Grammar. --- docs/monitoring.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index f24eee8152193..4062e16a25d34 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1160,7 +1160,7 @@ easily add other plugins from the command line without overwriting the config fi plugins are ignored. Distribution of the jar files containing the plugin code is currently not done by Spark. The user -or admin should make sure that the jar file is available to Spark applications, for example, by +or admin should make sure that the jar files are available to Spark applications, for example, by including the plugin jar with the Spark distribution. 
The exception to this rule is the YARN backend, where the --jars command line option (or equivalent config entry) can be used to make the plugin code available to both executors and cluster-mode drivers.
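
As a quick illustration of the API introduced by this series, here is a minimal, hypothetical plugin written against the new interfaces. It is a sketch only: the package, class and metric names and the extra-conf entry are invented for the example, and it assumes the DriverPlugin, ExecutorPlugin and PluginContext signatures shown in the diffs above (init, registerMetrics, receive, metricRegistry and ask).

    package com.example

    import java.util.{Collections, Map => JMap}

    import com.codahale.metrics.Gauge

    import org.apache.spark.SparkContext
    import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

    // Hypothetical plugin: publishes one gauge per process and echoes executor messages.
    class MyMetricsPlugin extends SparkPlugin {

      override def driverPlugin(): DriverPlugin = new DriverPlugin {
        private var startTime: Long = _

        override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
          startTime = System.currentTimeMillis()
          // Whatever is returned here is passed to ExecutorPlugin.init() on each executor.
          Collections.singletonMap("example.greeting", "hello")
        }

        override def registerMetrics(appId: String, ctx: PluginContext): Unit = {
          // Called once the application ID is known; metrics added later may not be reported.
          ctx.metricRegistry().register("uptime", new Gauge[Long] {
            override def getValue(): Long = System.currentTimeMillis() - startTime
          })
        }

        override def receive(message: AnyRef): AnyRef = message match {
          // Messages sent by the executor component arrive here through the PluginEndpoint.
          case s: String => s"driver got: $s"
          case _ => null
        }
      }

      override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
        override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
          // Register executor-side metrics during init; later registrations may not show up.
          ctx.metricRegistry().register("initNanos", new Gauge[Long] {
            override def getValue(): Long = System.nanoTime()
          })
          // Round-trip a message to the driver component over Spark's RPC system.
          ctx.ask(extraConf.get("example.greeting"))
        }
      }
    }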
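
Enabling such a plugin might then look like the sketch below. This is also hypothetical: the jar and class names are placeholders, and the spark.plugins key name is an assumption based on the EXTRA_CONF_PREFIX constant used in these patches.

    # spark-defaults.conf entry (or an equivalent --conf on the command line):
    spark.plugins    com.example.MyMetricsPlugin

    # On YARN, --jars can also distribute the plugin code to executors and
    # cluster-mode drivers; on other backends the jar must be made available
    # to the application by other means, as described above.
    ./bin/spark-submit \
      --master yarn --deploy-mode cluster \
      --jars my-metrics-plugin.jar \
      --conf spark.plugins=com.example.MyMetricsPlugin \
      my-app.jar

With the plugin.<name> source prefix added in the PluginMetricsSource change above, metrics registered by the sketch would then appear under the plugin.com.example.MyMetricsPlugin namespace of Spark's metrics system.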