[SPARK-24918][CORE] Executor Plugin API
## What changes were proposed in this pull request?

A continuation of squito's executor plugin task. At his request, I took his code, added tests, and moved the plugin initialization to a separate thread.

Executor plugins now run on a separate thread, so the executor does not wait on them.
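
For illustration, a minimal user plugin might look like the sketch below (the class and package names are hypothetical, not part of this patch):

```scala
package com.example

import org.apache.spark.ExecutorPlugin

// A trivial plugin. Both methods have default no-op implementations,
// so only the hooks you need must be overridden.
class AuditPlugin extends ExecutorPlugin {
  override def init(): Unit = println("executor started")
  override def shutdown(): Unit = println("executor stopping")
}
```

It would be enabled with `--conf spark.executor.plugins=com.example.AuditPlugin`.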

## How was this patch tested?

Added test cases that exercise sample plugins, covering single and multiple plugins, a nonexistent plugin class, and a plugin whose shutdown throws.

Closes apache#22192 from NiharS/executorPlugin.

Lead-authored-by: Nihar Sheth <niharrsheth@gmail.com>
Co-authored-by: NiharS <niharrsheth@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
NiharS authored and Marcelo Vanzin committed Sep 20, 2018
1 parent a86f841 commit 2f51e72
Showing 5 changed files with 254 additions and 0 deletions.
57 changes: 57 additions & 0 deletions core/src/main/java/org/apache/spark/ExecutorPlugin.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.annotation.DeveloperApi;

/**
* A plugin which can be automatically instantiated within each Spark executor. Users can specify
* plugins which should be created with the "spark.executor.plugins" configuration. An instance
* of each plugin will be created for every executor, including those created by dynamic allocation,
* before the executor starts running any tasks.
*
* The specific API exposed to end users is still considered to be very unstable. We will
* hopefully be able to keep compatibility by providing default implementations for any methods
* added, but make no guarantees this will always be possible across all Spark releases.
*
* Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
* it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
* could also interfere with task execution and make the executor fail in unexpected ways.
*/
@DeveloperApi
public interface ExecutorPlugin {

/**
* Initialize the executor plugin.
*
* <p>Each executor will, during its initialization, invoke this method on each
* plugin provided in the spark.executor.plugins configuration.</p>
*
* <p>Plugins should create threads in their implementation of this method for
* any polling, blocking, or intensive computation.</p>
*/
default void init() {}

/**
* Clean up and terminate this plugin.
*
* <p>This function is called during the executor shutdown phase. The executor
* will wait for the plugin to terminate before continuing its own shutdown.</p>
*/
default void shutdown() {}
}
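
As a usage sketch (not part of this patch; all names are illustrative), a plugin that follows the threading advice in the init() javadoc might look like:

```scala
import org.apache.spark.ExecutorPlugin

class PollingPlugin extends ExecutorPlugin {
  @volatile private var running = true
  private var poller: Thread = _

  // Heavy or blocking work belongs on a plugin-owned thread so that
  // executor startup is not delayed.
  override def init(): Unit = {
    poller = new Thread(new Runnable {
      override def run(): Unit = {
        while (running) {
          Thread.sleep(1000) // stand-in for polling an external system
        }
      }
    }, "polling-plugin")
    poller.setDaemon(true)
    poller.start()
  }

  // The executor waits for shutdown() to return, so stop promptly.
  override def shutdown(): Unit = {
    running = false
    poller.interrupt()
    poller.join(5000)
  }
}
```
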
35 changes: 35 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -136,6 +136,29 @@ private[spark] class Executor(
// for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
env.serializerManager.setDefaultClassLoader(replClassLoader)

private val executorPlugins: Seq[ExecutorPlugin] = {
val pluginNames = conf.get(EXECUTOR_PLUGINS)
if (pluginNames.nonEmpty) {
logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")

// Plugins need to load using a class loader that includes the executor's user classpath
val pluginList: Seq[ExecutorPlugin] =
Utils.withContextClassLoader(replClassLoader) {
val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
plugins.foreach { plugin =>
plugin.init()
logDebug(s"Successfully loaded plugin " + plugin.getClass().getCanonicalName())
}
plugins
}

logDebug("Finished initializing plugins")
pluginList
} else {
Nil
}
}

// Max size of direct result. If task result is bigger than this, we use the block manager
// to send the result back.
private val maxDirectResultSize = Math.min(
@@ -224,6 +247,18 @@
logWarning("Unable to stop heartbeater", e)
}
threadPool.shutdown()

// Notify plugins that executor is shutting down so they can terminate cleanly
Utils.withContextClassLoader(replClassLoader) {
executorPlugins.foreach { plugin =>
try {
plugin.shutdown()
} catch {
case e: Exception =>
logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
}
}
}
if (!isLocal) {
env.stop()
}
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -623,4 +623,14 @@ package object config {
.intConf
.checkValue(v => v > 0, "The max failures should be a positive value.")
.createWithDefault(40)

private[spark] val EXECUTOR_PLUGINS =
ConfigBuilder("spark.executor.plugins")
.doc("Comma-separated list of class names for \"plugins\" implementing " +
"org.apache.spark.ExecutorPlugin. Plugins have the same privileges as any task " +
"in a Spark executor. They can also interfere with task execution and fail in " +
"unexpected ways. So be sure to only use this for trusted plugins.")
.stringConf
.toSequence
.createWithDefault(Nil)
}
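
A configuration sketch for the new entry (plugin class names hypothetical):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("app-with-plugins")
  // Comma-separated list; each class must implement org.apache.spark.ExecutorPlugin
  // and be on the executor classpath. Only list plugins you trust.
  .set("spark.executor.plugins", "com.example.AuditPlugin,com.example.PollingPlugin")
```
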
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -239,6 +239,19 @@ private[spark] object Utils extends Logging {
// scalastyle:on classforname
}

/**
* Run a segment of code using a different context class loader on the current thread.
*/
def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
val oldClassLoader = Thread.currentThread().getContextClassLoader()
try {
Thread.currentThread().setContextClassLoader(ctxClassLoader)
fn
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader)
}
}

/**
* Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]]
*/
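
A usage sketch of the new helper (the loader value and class name are placeholders; `Utils.classForName` is the existing helper defined just above):

```scala
// Instantiate a user-supplied class while userClassLoader is the context
// class loader; the previous loader is restored even if the block throws.
val plugin = Utils.withContextClassLoader(userClassLoader) {
  Utils.classForName("com.example.AuditPlugin").getConstructor().newInstance()
}
```
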
139 changes: 139 additions & 0 deletions core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.api.java.JavaSparkContext;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;

public class ExecutorPluginSuite {
private static final String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins";
private static final String testBadPluginName = TestBadShutdownPlugin.class.getName();
private static final String testPluginName = TestExecutorPlugin.class.getName();
private static final String testSecondPluginName = TestSecondPlugin.class.getName();

// Static value modified by testing plugins to ensure plugins loaded correctly.
public static int numSuccessfulPlugins = 0;

// Static value modified by testing plugins to verify plugins shut down properly.
public static int numSuccessfulTerminations = 0;

private JavaSparkContext sc;

@Before
public void setUp() {
sc = null;
numSuccessfulPlugins = 0;
numSuccessfulTerminations = 0;
}

@After
public void tearDown() {
if (sc != null) {
sc.stop();
sc = null;
}
}

private SparkConf initializeSparkConf(String pluginNames) {
return new SparkConf()
.setMaster("local")
.setAppName("test")
.set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames);
}

@Test
public void testPluginClassDoesNotExist() {
SparkConf conf = initializeSparkConf("nonexistant.plugin");
try {
sc = new JavaSparkContext(conf);
fail("No exception thrown for nonexistant plugin");
} catch (Exception e) {
// We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown
assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
}
}

@Test
public void testAddPlugin() throws InterruptedException {
// Load the sample TestExecutorPlugin, which will change the value of numSuccessfulPlugins
SparkConf conf = initializeSparkConf(testPluginName);
sc = new JavaSparkContext(conf);
assertEquals(1, numSuccessfulPlugins);
sc.stop();
sc = null;
assertEquals(1, numSuccessfulTerminations);
}

@Test
public void testAddMultiplePlugins() throws InterruptedException {
// Load two plugins and verify they both execute.
SparkConf conf = initializeSparkConf(testPluginName + "," + testSecondPluginName);
sc = new JavaSparkContext(conf);
assertEquals(2, numSuccessfulPlugins);
sc.stop();
sc = null;
assertEquals(2, numSuccessfulTerminations);
}

@Test
public void testPluginShutdownWithException() {
// Verify an exception in one plugin shutdown does not affect the others
String pluginNames = testPluginName + "," + testBadPluginName + "," + testPluginName;
SparkConf conf = initializeSparkConf(pluginNames);
sc = new JavaSparkContext(conf);
assertEquals(3, numSuccessfulPlugins);
sc.stop();
sc = null;
assertEquals(2, numSuccessfulTerminations);
}

public static class TestExecutorPlugin implements ExecutorPlugin {
public void init() {
ExecutorPluginSuite.numSuccessfulPlugins++;
}

public void shutdown() {
ExecutorPluginSuite.numSuccessfulTerminations++;
}
}

public static class TestSecondPlugin implements ExecutorPlugin {
public void init() {
ExecutorPluginSuite.numSuccessfulPlugins++;
}

public void shutdown() {
ExecutorPluginSuite.numSuccessfulTerminations++;
}
}

public static class TestBadShutdownPlugin implements ExecutorPlugin {
public void init() {
ExecutorPluginSuite.numSuccessfulPlugins++;
}

public void shutdown() {
throw new RuntimeException("This plugin will fail to cleanly shut down");
}
}
}
