[SPARK-29397][CORE] Extend plugin interface to include the driver
Spark 2.4 added the ability for executor plugins to be loaded into
Spark (see SPARK-24918). That feature intentionally skipped the
driver to keep changes small, and also because it is possible to
load code into the Spark driver using listeners + configuration.

But that is a bit awkward, because the listener interface does not
provide hooks into a lot of Spark functionality. This change reworks
the executor plugin interface to also extend to the driver.

- there's a "SparkPlugin" main interface that provides APIs to
  load driver and executor components.
- custom metric support (added in SPARK-28091) can be used by
  plugins to register metrics both in the driver process and in
  executors.
- a communication channel now exists that allows the plugin's
  executor components to send messages to the plugin's driver
  component easily, using the existing Spark RPC system.

The latter was intentionally left out of the original plugin design, in part
because that design did not include a driver component.
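
As a rough sketch of what the new interface enables (the class, package, and
message names below are made up for illustration; they are not part of this
change), a plugin could wire the pieces together like this:

    import java.util.Map;

    import org.apache.spark.api.plugin.DriverPlugin;
    import org.apache.spark.api.plugin.ExecutorPlugin;
    import org.apache.spark.api.plugin.PluginContext;
    import org.apache.spark.api.plugin.SparkPlugin;

    // Hypothetical plugin: the executor component pings the driver component
    // over the new RPC channel when it starts up.
    public class ExamplePlugin implements SparkPlugin {

      @Override
      public DriverPlugin driverPlugin() {
        return new DriverPlugin() {
          @Override
          public Object receive(Object message) {
            // Messages sent by the executor components land here.
            return "ack: " + message;
          }
        };
      }

      @Override
      public ExecutorPlugin executorPlugin() {
        return new ExecutorPlugin() {
          @Override
          public void init(PluginContext ctx, Map<String, String> extraConf) {
            try {
              // Fire-and-forget message to the plugin's driver component.
              ctx.send("executor " + ctx.executorID() + " started");
            } catch (Exception e) {
              // A failing plugin should not take the executor down with it.
              e.printStackTrace();
            }
          }
        };
      }
    }

Such a plugin is enabled by listing its fully qualified class name in the new
"spark.plugins" configuration added by this change.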

To avoid polluting the "org.apache.spark" namespace, I added the new
interfaces to the "org.apache.spark.api" package, which seems like
a better place in any case. The actual implementation is kept in
an internal package.

The change includes unit tests for the new interface and features,
but I've also been running a custom plugin that extends the new
API in real applications.

Closes #26170 from vanzin/SPARK-29397.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Marcelo Vanzin committed Nov 4, 2019
1 parent 441d4c9 commit d51d228
Showing 12 changed files with 899 additions and 4 deletions.
111 changes: 111 additions & 0 deletions core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.plugin;

import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkContext;
import org.apache.spark.annotation.DeveloperApi;

/**
* :: DeveloperApi ::
* Driver component of a {@link SparkPlugin}.
*
* @since 3.0.0
*/
@DeveloperApi
public interface DriverPlugin {

/**
* Initialize the plugin.
* <p>
* This method is called early in the initialization of the Spark driver. Specifically, it is
* called before the Spark driver's task scheduler is initialized, which means that many other
* Spark subsystems may not yet have been initialized. This call also blocks driver
* initialization.
* <p>
* It's recommended that plugins be careful about what operations are performed in this call,
* preferably performing expensive operations in a separate thread, or postponing them until
* the application has fully started.
*
* @param sc The SparkContext loading the plugin.
* @param pluginContext Additional plugin-specific context about the Spark application where the plugin
* is running.
* @return A map that will be provided to the {@link ExecutorPlugin#init(PluginContext,Map)}
* method.
*/
default Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
return Collections.emptyMap();
}

/**
* Register metrics published by the plugin with Spark's metrics system.
* <p>
* This method is called later in the initialization of the Spark application, after most
* subsystems are up and the application ID is known. If there are metrics registered in
* the registry ({@link PluginContext#metricRegistry()}), then a metrics source with the
* plugin name will be created.
* <p>
* Note that even though the metric registry is still accessible after this method is called,
* metrics registered after it returns may not become available.
*
* @param appId The application ID from the cluster manager.
* @param pluginContext Additional plugin-specific context about the Spark application where the plugin
* is running.
*/
default void registerMetrics(String appId, PluginContext pluginContext) {}

/**
* RPC message handler.
* <p>
* Plugins can use Spark's RPC system to send messages from executors to the driver (but not
* the other way around, currently). Messages sent by the executor component of the plugin will
* be delivered to this method, and the returned value will be sent back to the executor as
* the reply, if the executor has requested one.
* <p>
* Any exception thrown will be sent back to the executor as an error, in case it is expecting
* a reply. In case a reply is not expected, a log message will be written to the driver log.
* <p>
* The implementation of this handler should be thread-safe.
* <p>
* Note that all plugins share RPC dispatch threads, and this method is called synchronously, so
* performing expensive operations in this handler may affect the operation of other active
* plugins. Internal Spark endpoints are not directly affected, though, since they use different
* threads.
* <p>
* Spark guarantees that the driver component will be ready to receive messages through this
* handler when executors are started.
*
* @param message The incoming message.
* @return Value to be returned to the caller. Ignored if the caller does not expect a reply.
*/
default Object receive(Object message) throws Exception {
throw new UnsupportedOperationException();
}

/**
* Informs the plugin that the Spark application is shutting down.
* <p>
* This method is called during the driver shutdown phase. It is recommended that plugins
* not use any Spark functions (e.g. send RPC messages) during this call.
*/
default void shutdown() {}

}
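
To make the lifecycle above concrete, here is a sketch (class and key names are
illustrative, not shipped with this change) of a driver component that hands
configuration to its executor counterparts and publishes a gauge once the
application ID is known:

    import java.util.Collections;
    import java.util.Map;

    import com.codahale.metrics.Gauge;

    import org.apache.spark.SparkContext;
    import org.apache.spark.api.plugin.DriverPlugin;
    import org.apache.spark.api.plugin.PluginContext;

    // Hypothetical driver component; not part of this commit.
    class ExampleDriverPlugin implements DriverPlugin {
      private volatile String appId = "unknown";

      @Override
      public Map<String, String> init(SparkContext sc, PluginContext ctx) {
        // Keep this cheap: it runs before the task scheduler exists and blocks
        // driver startup. The returned map is passed to ExecutorPlugin.init()
        // on every executor.
        return Collections.singletonMap("example.mode", "demo");
      }

      @Override
      public void registerMetrics(String appId, PluginContext ctx) {
        this.appId = appId;
        // Metrics registered here show up under a source named after the plugin.
        ctx.metricRegistry().register("appId", (Gauge<String>) () -> this.appId);
      }
    }

The map returned from init() is what each executor-side init() receives as its
extraConf argument, so it is a convenient way to pass a small amount of plugin
configuration from the driver to the executors.
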
57 changes: 57 additions & 0 deletions core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.plugin;

import java.util.Map;

import org.apache.spark.annotation.DeveloperApi;

/**
* :: DeveloperApi ::
* Executor component of a {@link SparkPlugin}.
*
* @since 3.0.0
*/
@DeveloperApi
public interface ExecutorPlugin {

/**
* Initialize the executor plugin.
* <p>
* When a Spark plugin provides an executor plugin, this method will be called during the
* initialization of the executor process. It will block executor initialization until it
* returns.
* <p>
* Executor plugins that publish metrics should register all metrics with the context's
* registry ({@link PluginContext#metricRegistry()}) when this method is called. Metrics
* registered afterwards are not guaranteed to show up.
*
* @param ctx Context information for the executor where the plugin is running.
* @param extraConf Extra configuration provided by the driver component during its
* initialization.
*/
default void init(PluginContext ctx, Map<String, String> extraConf) {}

/**
* Clean up and terminate this plugin.
* <p>
* This method is called during the executor shutdown phase, and blocks executor shutdown.
*/
default void shutdown() {}

}
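
A matching executor-side sketch (again with illustrative names) that consumes
the driver-provided configuration and registers its metrics before init()
returns:

    import java.util.Map;

    import com.codahale.metrics.Counter;

    import org.apache.spark.api.plugin.ExecutorPlugin;
    import org.apache.spark.api.plugin.PluginContext;

    // Hypothetical executor component; not part of this commit.
    class ExampleExecutorPlugin implements ExecutorPlugin {
      private Counter startups;

      @Override
      public void init(PluginContext ctx, Map<String, String> extraConf) {
        // extraConf is whatever the driver component's init() returned.
        String mode = extraConf.getOrDefault("example.mode", "off");

        // Register metrics now; metrics added after init() may not be reported.
        startups = ctx.metricRegistry().counter("startups." + mode);
        startups.inc();
      }

      @Override
      public void shutdown() {
        // Keep this fast: it blocks executor shutdown.
      }
    }
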
84 changes: 84 additions & 0 deletions core/src/main/java/org/apache/spark/api/plugin/PluginContext.java
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.plugin;

import java.io.IOException;

import com.codahale.metrics.MetricRegistry;

import org.apache.spark.SparkConf;
import org.apache.spark.annotation.DeveloperApi;

/**
* :: DeveloperApi ::
* Context information and operations for plugins loaded by Spark.
* <p>
* An instance of this class is provided to plugins in their initialization method. It is safe
* for plugins to keep a reference to the instance for later use (for example, to send messages
* to the plugin's driver component).
* <p>
* Context instances are plugin-specific, so metrics and messages are tied to each plugin. It is
* not possible for a plugin to directly interact with other plugins.
*
* @since 3.0.0
*/
@DeveloperApi
public interface PluginContext {

/**
* Registry where metrics published by the plugin associated with this context should be registered.
*/
MetricRegistry metricRegistry();

/** Configuration of the Spark application. */
SparkConf conf();

/** Executor ID of the process. On the driver, this will identify the driver. */
String executorID();

/** The host name which is being used by the Spark process for communication. */
String hostname();

/**
* Send a message to the plugin's driver-side component.
* <p>
* This method sends a message to the driver-side component of the plugin, without expecting
* a reply. It returns as soon as the message is enqueued for sending.
* <p>
* The message must be serializable.
*
* @param message Message to be sent.
*/
void send(Object message) throws IOException;

/**
* Send an RPC to the plugin's driver-side component.
* <p>
* This method sends a message to the driver-side component of the plugin, and blocks until a
* reply arrives, or the configured RPC ask timeout (<code>spark.rpc.askTimeout</code>) elapses.
* <p>
* If the driver replies with an error, an exception with the corresponding error will be thrown.
* <p>
* The message must be serializable.
*
* @param message Message to be sent.
* @return The reply from the driver-side component.
*/
Object ask(Object message) throws Exception;

}
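
A small sketch of the two messaging styles, called from an executor component
that has kept a reference to its context (the message contents are
illustrative):

    import org.apache.spark.api.plugin.PluginContext;

    // Hypothetical helper showing the difference between send() and ask().
    final class PluginMessaging {

      static void report(PluginContext ctx) throws Exception {
        // Fire-and-forget: returns as soon as the message is queued for sending.
        ctx.send("metrics-flushed");

        // Request/reply: blocks until the driver component's receive() returns,
        // or spark.rpc.askTimeout elapses; errors from receive() are re-thrown.
        Object reply = ctx.ask("current-settings?");
        System.out.println("driver replied: " + reply);
      }
    }
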
53 changes: 53 additions & 0 deletions core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.plugin;

import org.apache.spark.annotation.DeveloperApi;

/**
* :: DeveloperApi ::
* A plugin that can be dynamically loaded into a Spark application.
* <p>
* Plugins can be loaded by adding the plugin's class name to the appropriate Spark configuration.
* Check the Spark configuration documentation for details.
* <p>
* Plugins have two optional components: a driver-side component, of which a single instance is
* created per application inside the Spark driver, and an executor-side component, of which one
* instance is created in each executor started by Spark. Details of each component can be
* found in the documentation for {@link DriverPlugin} and {@link ExecutorPlugin}.
*
* @since 3.0.0
*/
@DeveloperApi
public interface SparkPlugin {

/**
* Return the plugin's driver-side component.
*
* @return The driver-side component, or null if one is not needed.
*/
DriverPlugin driverPlugin();

/**
* Return the plugin's executor-side component.
*
* @return The executor-side component, or null if one is not needed.
*/
ExecutorPlugin executorPlugin();

}
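
Either component is optional. As a sketch (hypothetical class name), a
driver-only plugin simply returns null for the executor side:

    import org.apache.spark.api.plugin.DriverPlugin;
    import org.apache.spark.api.plugin.ExecutorPlugin;
    import org.apache.spark.api.plugin.SparkPlugin;

    // Hypothetical driver-only plugin: no executor component is ever created.
    public class DriverOnlyPlugin implements SparkPlugin {

      @Override
      public DriverPlugin driverPlugin() {
        // All DriverPlugin methods have defaults, so an empty component is valid.
        return new DriverPlugin() { };
      }

      @Override
      public ExecutorPlugin executorPlugin() {
        return null;  // tells Spark to skip the executor-side component
      }
    }
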
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.plugin.PluginContainer
import org.apache.spark.io.CompressionCodec
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
@@ -220,6 +221,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _heartbeater: Heartbeater = _
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _
private var _plugins: Option[PluginContainer] = None

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -539,6 +541,9 @@ class SparkContext(config: SparkConf) extends Logging {
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

// Initialize any plugins before the task scheduler is initialized.
_plugins = PluginContainer(this)

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
@@ -621,6 +626,7 @@ class SparkContext(config: SparkConf) extends Logging {
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
appStatusSource.foreach(_env.metricsSystem.registerSource(_))
_plugins.foreach(_.registerMetrics(applicationId))
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
@@ -1976,6 +1982,9 @@ class SparkContext(config: SparkConf) extends Logging {
_listenerBusStarted = false
}
}
Utils.tryLogNonFatalError {
_plugins.foreach(_.shutdown())
}
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -37,6 +37,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.plugin.PluginContainer
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.rpc.RpcTimeout
@@ -165,6 +166,11 @@ private[spark] class Executor(
}
}

// Plugins need to load using a class loader that includes the executor's user classpath
private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
PluginContainer(env)
}

// Max size of direct result. If task result is bigger than this, we use the block manager
// to send the result back.
private val maxDirectResultSize = Math.min(
@@ -297,6 +303,7 @@ private[spark] class Executor(
logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
}
}
plugins.foreach(_.shutdown())
}
if (!isLocal) {
env.stop()
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1159,6 +1159,17 @@ package object config {
s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].")
.createWithDefault(1024 * 1024)

private[spark] val DEFAULT_PLUGINS_LIST = "spark.plugins.defaultList"

private[spark] val PLUGINS =
ConfigBuilder("spark.plugins")
.withPrepended(DEFAULT_PLUGINS_LIST, separator = ",")
.doc("Comma-separated list of class names implementing " +
"org.apache.spark.api.plugin.SparkPlugin to load into the application.")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val EXECUTOR_PLUGINS =
ConfigBuilder("spark.executor.plugins")
.doc("Comma-separated list of class names for \"plugins\" implementing " +
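
For completeness, a sketch of enabling plugins from application code. The class
names below assume the hypothetical com.example package used in the sketches
above; "spark.plugins" accepts a comma-separated list of SparkPlugin
implementations, and anything in "spark.plugins.defaultList" is prepended to it:

    import org.apache.spark.SparkConf;

    // Hypothetical application setup loading two plugins by class name.
    public class PluginDemo {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("plugin-demo")
            .set("spark.plugins",
                "com.example.ExamplePlugin,com.example.DriverOnlyPlugin");
        // ... build the SparkContext / SparkSession from this conf as usual.
      }
    }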
