[SPARK-24918] Executor Plugin API
This commit adds testing and moves the plugin initialization to
a separate thread.
NiharS committed Aug 22, 2018
1 parent e59dd8f commit 44454dd
Showing 5 changed files with 193 additions and 0 deletions.
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/spark/ExecutorPlugin.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.annotation.DeveloperApi;

/**
* A plugin which can be automatically instantiated within each Spark executor. Users can specify
* plugins which should be created with the "spark.executor.plugins" configuration. An instance
* of each plugin will be created for every executor, including those created by dynamic allocation,
* before the executor starts running any tasks.
*
* The specific API exposed to end users is still considered to be very unstable. We will
* *hopefully* be able to keep compatibility by providing default implementations for any methods
* added, but we make no guarantees this will always be possible across all Spark releases.
*
* Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
* it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
* could also interfere with task execution and make the executor fail in unexpected ways.
*/
@DeveloperApi
public interface ExecutorPlugin {
}
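For illustration, a minimal user-defined plugin could look like the following sketch. The package and class name (com.example.monitoring.HeapLoggingPlugin) are hypothetical and not part of this commit; because the interface currently declares no methods, any per-executor initialization has to happen in the plugin's constructor.

package com.example.monitoring;  // hypothetical package, for illustration only

import org.apache.spark.ExecutorPlugin;

// A minimal plugin: the executor constructs one instance per executor, on a
// separate plugin-loading thread, before any tasks start running.
public class HeapLoggingPlugin implements ExecutorPlugin {
  public HeapLoggingPlugin() {
    Runtime rt = Runtime.getRuntime();
    System.err.println("Executor heap in use at plugin init: "
        + (rt.totalMemory() - rt.freeMemory()) + " of " + rt.maxMemory() + " bytes");
  }
}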
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -130,6 +130,16 @@ private[spark] class Executor(
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

// One thread will handle loading all of the plugins on this executor
val executorPluginThread = new Thread {
  override def run(): Unit = {
    conf.get(EXECUTOR_PLUGINS).foreach { classes =>
      Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf)
    }
  }
}
executorPluginThread.start()

// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
// SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads
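As a rough sketch of what the loading on that thread amounts to: each configured class name is constructed reflectively, once per executor. The real Utils.loadExtensions also prefers a constructor taking SparkConf and carries additional error handling, both omitted here; the class below is hypothetical and for illustration only.

package com.example.sketch;  // hypothetical, for illustration only

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.ExecutorPlugin;

// Simplified sketch: every class name listed in spark.executor.plugins is
// instantiated exactly once per executor, on the plugin-loading thread,
// before the executor runs any tasks.
public final class PluginLoaderSketch {
  public static List<ExecutorPlugin> loadPlugins(String commaSeparatedClassNames)
      throws ReflectiveOperationException {
    List<ExecutorPlugin> plugins = new ArrayList<>();
    for (String className : commaSeparatedClassNames.split(",")) {
      Class<?> clazz = Class.forName(className.trim());
      // Utils.loadExtensions also tries a constructor taking SparkConf first;
      // the no-arg constructor is the simplest case and is what the test plugin uses.
      plugins.add((ExecutorPlugin) clazz.getDeclaredConstructor().newInstance());
    }
    return plugins;
  }
}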
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -604,4 +604,15 @@ package object config {
.intConf
.checkValue(v => v > 0, "The max failures should be a positive value.")
.createWithDefault(40)

private[spark] val EXECUTOR_PLUGINS =
  ConfigBuilder("spark.executor.plugins")
    .internal()
    .doc("Comma-separated list of class names for \"plugins\" implementing " +
      "org.apache.spark.ExecutorPlugin. Plugins have the same privileges as any task " +
      "in a Spark executor. They can also interfere with task execution and fail in " +
      "unexpected ways. So be sure to only use this for trusted plugins.")
    .stringConf
    .toSequence
    .createOptional
}
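Although the entry is marked internal(), it is read as an ordinary Spark configuration key. A hypothetical driver-side setup (class names are illustrative, reusing the HeapLoggingPlugin sketch above) might look like:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public final class PluginConfExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")            // or supply the master via spark-submit
        .setAppName("plugin-example")
        // Comma-separate multiple plugin class names; each listed class is
        // constructed on every executor, so configure only trusted plugins.
        .set("spark.executor.plugins", "com.example.monitoring.HeapLoggingPlugin");
    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
      // ... run jobs; each executor has already constructed its plugin instances ...
    } finally {
      sc.stop();
    }
  }
}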
105 changes: 105 additions & 0 deletions core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.SparkConf;

import org.junit.Assert;
import org.junit.Test;

// Tests loading plugins into executors
public class ExecutorPluginSuite {
  // Static value modified by the testing plugin to ensure the plugin loaded correctly.
  public static int numSuccessfulPlugins = 0;

  private SparkConf initializeSparkConf(String pluginNames) {
    return new SparkConf()
      .setMaster("local")
      .setAppName("test")
      .set("spark.executor.plugins", pluginNames);
  }

  @Test
  public void testPluginClassDoesNotExist() {
    JavaSparkContext sc = null;
    SparkConf conf = initializeSparkConf("nonexistent.plugin");
    try {
      sc = new JavaSparkContext(conf);
    } catch (Exception e) {
      // We cannot catch ClassNotFoundException directly because the compiler does not
      // consider it to be thrown inside the try block, so match on the string form instead.
      Assert.assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
    } finally {
      if (sc != null) {
        sc.stop();
        sc = null;
      }
    }
  }

  @Test
  public void testAddPlugin() throws InterruptedException {
    JavaSparkContext sc = null;
    numSuccessfulPlugins = 0;

    // Load the sample TestExecutorPlugin, which will increment numSuccessfulPlugins
    SparkConf conf = initializeSparkConf("test.org.apache.spark.TestExecutorPlugin");

    try {
      sc = new JavaSparkContext(conf);
    } catch (Exception e) {
      Assert.fail("Failed to start SparkContext with exception " + e.toString());
    }

    // Wait a moment since plugins are loaded on a separate thread
    Thread.sleep(500);

    Assert.assertEquals(1, numSuccessfulPlugins);

    if (sc != null) {
      sc.stop();
      sc = null;
    }
  }

  @Test
  public void testAddMultiplePlugins() throws InterruptedException {
    JavaSparkContext sc = null;
    numSuccessfulPlugins = 0;

    // Load the sample TestExecutorPlugin twice
    SparkConf conf = initializeSparkConf(
      "test.org.apache.spark.TestExecutorPlugin,test.org.apache.spark.TestExecutorPlugin");

    try {
      sc = new JavaSparkContext(conf);
    } catch (Exception e) {
      Assert.fail("Failed to start SparkContext with exception " + e.toString());
    }

    // Wait a moment since plugins are loaded on a separate thread
    Thread.sleep(500);

    Assert.assertEquals(2, numSuccessfulPlugins);

    if (sc != null) {
      sc.stop();
      sc = null;
    }
  }
}
29 changes: 29 additions & 0 deletions core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test.org.apache.spark;

import org.apache.spark.ExecutorPlugin;
import org.apache.spark.ExecutorPluginSuite;

// A test-only sample plugin, used by ExecutorPluginSuite to verify that
// plugins are correctly loaded from the spark.executor.plugins conf
public class TestExecutorPlugin implements ExecutorPlugin {
  public TestExecutorPlugin() {
    ExecutorPluginSuite.numSuccessfulPlugins++;
  }
}
