From 44454dd586e35bdf16492c4a8969494bd3b7f8f5 Mon Sep 17 00:00:00 2001
From: Nihar Sheth
Date: Mon, 20 Aug 2018 10:53:37 -0700
Subject: [PATCH 01/15] [SPARK-24918] Executor Plugin API

This commit adds the ExecutorPlugin API, performs the plugin initialization
on a separate thread in each executor, and adds tests for the new
configuration.

---
 .../java/org/apache/spark/ExecutorPlugin.java |  38 +++++++
 .../org/apache/spark/executor/Executor.scala  |  10 ++
 .../spark/internal/config/package.scala       |  11 ++
 .../org/apache/spark/ExecutorPluginSuite.java | 105 ++++++++++++++++++
 .../org/apache/spark/TestExecutorPlugin.java  |  29 +++++
 5 files changed, 193 insertions(+)
 create mode 100644 core/src/main/java/org/apache/spark/ExecutorPlugin.java
 create mode 100644 core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
 create mode 100644 core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java

diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
new file mode 100644
index 0000000000000..7e99ef844d5bc
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * A plugin which can be automatically instantiated within each Spark executor. Users can specify
+ * plugins which should be created with the "spark.executor.plugins" configuration. An instance
+ * of each plugin will be created for every executor, including those created by dynamic allocation,
+ * before the executor starts running any tasks.
+ *
+ * The specific API exposed to end users is still considered to be very unstable. We will
+ * *hopefully* be able to keep compatibility by providing default implementations for any methods
+ * added, but make no guarantees this will always be possible across all spark releases.
+ *
+ * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
+ * it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
+ * could also interfere with task execution and make the executor fail in unexpected ways.
+ */ +@DeveloperApi +public interface ExecutorPlugin { +} diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b1856ff0f3247..3efbbecb817fa 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -130,6 +130,16 @@ private[spark] class Executor( private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) + // One thread will handle loading all of the plugins on this executor + val executorPluginThread = new Thread { + override def run: Unit = { + conf.get(EXECUTOR_PLUGINS).foreach { classes => + Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf) + } + } + } + executorPluginThread.start + // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) // SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index a8aa6914ffdae..e491d607a6f65 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -604,4 +604,15 @@ package object config { .intConf .checkValue(v => v > 0, "The max failures should be a positive value.") .createWithDefault(40) + + private[spark] val EXECUTOR_PLUGINS = + ConfigBuilder("spark.executor.plugins") + .internal() + .doc("Comma-separated list of class names for \"plugins\" implementing " + + "org.apache.spark.ExecutorPlugin. Plugins have the same privileges as any task " + + "in a spark executor. They can also interfere with task execution and fail in " + + "unexpected ways. So be sure to only use this for trusted plugins.") + .stringConf + .toSequence + .createOptional } diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java new file mode 100644 index 0000000000000..65992a15ee75f --- /dev/null +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.SparkConf; + +import org.junit.Assert; +import org.junit.Test; + +// Tests loading plugins into executors +public class ExecutorPluginSuite { + // Static value modified by testing plugin to ensure plugin loaded correctly. 
+ public static int numSuccessfulPlugins = 0; + + private SparkConf initializeSparkConf(String pluginNames) { + return new SparkConf() + .setMaster("local") + .setAppName("test") + .set("spark.executor.plugins", pluginNames); + } + + @Test + public void testPluginClassDoesNotExist() { + JavaSparkContext sc = null; + SparkConf conf = initializeSparkConf("nonexistant.plugin"); + try { + sc = new JavaSparkContext(conf); + } catch (Exception e) { + // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown + Assert.assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException")); + } finally { + if (sc != null) { + sc.stop(); + sc = null; + } + } + } + + @Test + public void testAddPlugin() throws InterruptedException { + JavaSparkContext sc = null; + numSuccessfulPlugins = 0; + + // Load the sample TestExecutorPlugin, which will change the value of pluginExecutionSuccessful + SparkConf conf = initializeSparkConf("test.org.apache.spark.TestExecutorPlugin"); + + try { + sc = new JavaSparkContext(conf); + } catch (Exception e) { + Assert.fail("Failed to start SparkContext with exception " + e.toString()); + } + + // Wait a moment since plugins run on separate threads + Thread.sleep(500); + + Assert.assertEquals(1, numSuccessfulPlugins); + + if (sc != null) { + sc.stop(); + sc = null; + } + } + + @Test + public void testAddMultiplePlugins() throws InterruptedException { + JavaSparkContext sc = null; + numSuccessfulPlugins = 0; + + // Load the sample TestExecutorPlugin twice + SparkConf conf = initializeSparkConf( + "test.org.apache.spark.TestExecutorPlugin,test.org.apache.spark.TestExecutorPlugin"); + + try { + sc = new JavaSparkContext(conf); + } catch (Exception e) { + Assert.fail("Failed to start SparkContext with exception " + e.toString()); + } + + // Wait a moment since plugins run on a separate thread + Thread.sleep(500); + + Assert.assertEquals(2, numSuccessfulPlugins); + + if (sc != null) { + sc.stop(); + sc = null; + } + } +} diff --git a/core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java b/core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java new file mode 100644 index 0000000000000..b7e9305b491ab --- /dev/null +++ b/core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package test.org.apache.spark;
+
+import org.apache.spark.ExecutorPlugin;
+import org.apache.spark.ExecutorPluginSuite;
+
+// A test-only sample plugin, used by ExecutorPluginSuite to verify that
+// plugins are correctly loaded from the spark.executor.plugins conf
+public class TestExecutorPlugin implements ExecutorPlugin {
+  public TestExecutorPlugin() {
+    ExecutorPluginSuite.numSuccessfulPlugins++;
+  }
+}

From 7c86fc54c36954f1345eccc066873f7f90832657 Mon Sep 17 00:00:00 2001
From: Nihar Sheth
Date: Wed, 22 Aug 2018 22:09:33 -0700
Subject: [PATCH 02/15] fixed java style issue and ran java linter

---
 core/src/test/java/org/apache/spark/ExecutorPluginSuite.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
index 65992a15ee75f..aed3f6b914c8e 100644
--- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
+++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
@@ -18,7 +18,6 @@
 package org.apache.spark;
 
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.SparkConf;
 
 import org.junit.Assert;
 import org.junit.Test;

From 44aa63442f7f2eb33d67fa1dedb64c9374ba0de0 Mon Sep 17 00:00:00 2001
From: Nihar Sheth
Date: Fri, 24 Aug 2018 17:17:20 -0700
Subject: [PATCH 03/15] cleaned up tests, doc fixes, removed separate testing
 class, put static class in main testing file

---
 .../java/org/apache/spark/ExecutorPlugin.java |  5 +-
 .../org/apache/spark/executor/Executor.scala  | 14 +++---
 .../spark/internal/config/package.scala       |  1 -
 .../org/apache/spark/ExecutorPluginSuite.java | 46 ++++++++++++-------
 .../org/apache/spark/TestExecutorPlugin.java  | 29 ------------
 5 files changed, 38 insertions(+), 57 deletions(-)
 delete mode 100644 core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java

diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
index 7e99ef844d5bc..bd93d519bb66c 100644
--- a/core/src/main/java/org/apache/spark/ExecutorPlugin.java
+++ b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
@@ -26,8 +26,8 @@
  * before the executor starts running any tasks.
  *
  * The specific API exposed to end users is still considered to be very unstable. We will
- * *hopefully* be able to keep compatibility by providing default implementations for any methods
- * added, but make no guarantees this will always be possible across all spark releases.
+ * hopefully be able to keep compatibility by providing default implementations for any methods
+ * added, but make no guarantees this will always be possible across all Spark releases.
  *
  * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
  * it uses. A plugin acquires the same privileges as the user running the task.
A bad plugin @@ -35,4 +35,5 @@ */ @DeveloperApi public interface ExecutorPlugin { + default void init() {} } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 3efbbecb817fa..cee5179c2e3d8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -130,15 +130,13 @@ private[spark] class Executor( private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) - // One thread will handle loading all of the plugins on this executor - val executorPluginThread = new Thread { - override def run: Unit = { - conf.get(EXECUTOR_PLUGINS).foreach { classes => - Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf) - } - } + // Load plugins in the current thread, they are expected to not block. + // Heavy computation in plugin initialization should be done async. + Thread.currentThread().setContextClassLoader(replClassLoader) + conf.get(EXECUTOR_PLUGINS).foreach { classes => + Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf) + .foreach { _.init() } } - executorPluginThread.start // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e491d607a6f65..bd3b08de24a6c 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -607,7 +607,6 @@ package object config { private[spark] val EXECUTOR_PLUGINS = ConfigBuilder("spark.executor.plugins") - .internal() .doc("Comma-separated list of class names for \"plugins\" implementing " + "org.apache.spark.ExecutorPlugin. Plugins have the same privileges as any task " + "in a spark executor. They can also interfere with task execution and fail in " + diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index aed3f6b914c8e..4d0e07ba25679 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -19,30 +19,43 @@ import org.apache.spark.api.java.JavaSparkContext; -import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import scala.collection.Seq; + +import static org.junit.Assert.*; + // Tests loading plugins into executors public class ExecutorPluginSuite { // Static value modified by testing plugin to ensure plugin loaded correctly. 
public static int numSuccessfulPlugins = 0; + private JavaSparkContext sc; + + private String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; + private String testPluginName = "org.apache.spark.ExecutorPluginSuite$TestExecutorPlugin"; + + @Before + public void setUp() { + sc = null; + numSuccessfulPlugins = 0; + } private SparkConf initializeSparkConf(String pluginNames) { return new SparkConf() .setMaster("local") .setAppName("test") - .set("spark.executor.plugins", pluginNames); + .set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames); } @Test public void testPluginClassDoesNotExist() { - JavaSparkContext sc = null; SparkConf conf = initializeSparkConf("nonexistant.plugin"); try { sc = new JavaSparkContext(conf); } catch (Exception e) { // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown - Assert.assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException")); + assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException")); } finally { if (sc != null) { sc.stop(); @@ -53,22 +66,19 @@ public void testPluginClassDoesNotExist() { @Test public void testAddPlugin() throws InterruptedException { - JavaSparkContext sc = null; - numSuccessfulPlugins = 0; - // Load the sample TestExecutorPlugin, which will change the value of pluginExecutionSuccessful - SparkConf conf = initializeSparkConf("test.org.apache.spark.TestExecutorPlugin"); + SparkConf conf = initializeSparkConf(testPluginName); try { sc = new JavaSparkContext(conf); } catch (Exception e) { - Assert.fail("Failed to start SparkContext with exception " + e.toString()); + fail("Failed to start SparkContext with exception " + e.toString()); } // Wait a moment since plugins run on separate threads Thread.sleep(500); - Assert.assertEquals(1, numSuccessfulPlugins); + assertEquals(1, numSuccessfulPlugins); if (sc != null) { sc.stop(); @@ -78,27 +88,29 @@ public void testAddPlugin() throws InterruptedException { @Test public void testAddMultiplePlugins() throws InterruptedException { - JavaSparkContext sc = null; - numSuccessfulPlugins = 0; - // Load the sample TestExecutorPlugin twice - SparkConf conf = initializeSparkConf( - "test.org.apache.spark.TestExecutorPlugin,test.org.apache.spark.TestExecutorPlugin"); + SparkConf conf = initializeSparkConf(testPluginName + "," + testPluginName); try { sc = new JavaSparkContext(conf); } catch (Exception e) { - Assert.fail("Failed to start SparkContext with exception " + e.toString()); + fail("Failed to start SparkContext with exception " + e.toString()); } // Wait a moment since plugins run on a separate thread Thread.sleep(500); - Assert.assertEquals(2, numSuccessfulPlugins); + assertEquals(2, numSuccessfulPlugins); if (sc != null) { sc.stop(); sc = null; } } + + public static class TestExecutorPlugin implements ExecutorPlugin { + public void init() { + ExecutorPluginSuite.numSuccessfulPlugins++; + } + } } diff --git a/core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java b/core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java deleted file mode 100644 index b7e9305b491ab..0000000000000 --- a/core/src/test/java/test/org/apache/spark/TestExecutorPlugin.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark; - -import org.apache.spark.ExecutorPlugin; -import org.apache.spark.ExecutorPluginSuite; - -// A test-only sample plugin, used by ExecutorPluginSuite to verify that -// plugins are correctly loaded from the spark.executor.plugins conf -public class TestExecutorPlugin implements ExecutorPlugin { - public TestExecutorPlugin() { - ExecutorPluginSuite.numSuccessfulPlugins++; - } -} From 284beafa03010ccdacf6f905914fb91f340d09b6 Mon Sep 17 00:00:00 2001 From: Nihar Sheth Date: Tue, 28 Aug 2018 11:49:25 -0700 Subject: [PATCH 04/15] fixed java style issue --- core/src/test/java/org/apache/spark/ExecutorPluginSuite.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index 4d0e07ba25679..535a981403eeb 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -22,8 +22,6 @@ import org.junit.Before; import org.junit.Test; -import scala.collection.Seq; - import static org.junit.Assert.*; // Tests loading plugins into executors From 2907c6b62495f8d25c0016883202239634685fec Mon Sep 17 00:00:00 2001 From: Nihar Sheth Date: Wed, 29 Aug 2018 14:38:50 -0700 Subject: [PATCH 05/15] added finally clauses to all tests, changed conf to be nil instead of optional --- .../org/apache/spark/executor/Executor.scala | 9 ++--- .../spark/internal/config/package.scala | 2 +- .../org/apache/spark/ExecutorPluginSuite.java | 36 ++++++++----------- 3 files changed, 18 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index cee5179c2e3d8..cca14c385f2a7 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -130,13 +130,10 @@ private[spark] class Executor( private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) - // Load plugins in the current thread, they are expected to not block. - // Heavy computation in plugin initialization should be done async. 
+ // Load executor plugins Thread.currentThread().setContextClassLoader(replClassLoader) - conf.get(EXECUTOR_PLUGINS).foreach { classes => - Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf) - .foreach { _.init() } - } + Utils.loadExtensions(classOf[ExecutorPlugin], conf.get(EXECUTOR_PLUGINS), conf) + .foreach(_.init()) // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index bd3b08de24a6c..2c17143dd9375 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -613,5 +613,5 @@ package object config { "unexpected ways. So be sure to only use this for trusted plugins.") .stringConf .toSequence - .createOptional + .createWithDefault(Nil) } diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index 535a981403eeb..3bd27ebf645fa 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -31,7 +31,7 @@ public class ExecutorPluginSuite { private JavaSparkContext sc; private String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; - private String testPluginName = "org.apache.spark.ExecutorPluginSuite$TestExecutorPlugin"; + private String testPluginName = TestExecutorPlugin.class.getName(); @Before public void setUp() { @@ -64,23 +64,19 @@ public void testPluginClassDoesNotExist() { @Test public void testAddPlugin() throws InterruptedException { - // Load the sample TestExecutorPlugin, which will change the value of pluginExecutionSuccessful + // Load the sample TestExecutorPlugin, which will change the value of numSuccessfulPlugins SparkConf conf = initializeSparkConf(testPluginName); try { sc = new JavaSparkContext(conf); + assertEquals(1, numSuccessfulPlugins); } catch (Exception e) { fail("Failed to start SparkContext with exception " + e.toString()); - } - - // Wait a moment since plugins run on separate threads - Thread.sleep(500); - - assertEquals(1, numSuccessfulPlugins); - - if (sc != null) { - sc.stop(); - sc = null; + } finally { + if (sc != null) { + sc.stop(); + sc = null; + } } } @@ -91,18 +87,14 @@ public void testAddMultiplePlugins() throws InterruptedException { try { sc = new JavaSparkContext(conf); + assertEquals(2, numSuccessfulPlugins); } catch (Exception e) { fail("Failed to start SparkContext with exception " + e.toString()); - } - - // Wait a moment since plugins run on a separate thread - Thread.sleep(500); - - assertEquals(2, numSuccessfulPlugins); - - if (sc != null) { - sc.stop(); - sc = null; + } finally { + if (sc != null) { + sc.stop(); + sc = null; + } } } From a968b0ad23b4c3f1a21a4153416515f5025d4fe9 Mon Sep 17 00:00:00 2001 From: Nihar Sheth Date: Fri, 31 Aug 2018 10:36:25 -0700 Subject: [PATCH 06/15] added stop, logging before and after plugins, testing for stop --- .../java/org/apache/spark/ExecutorPlugin.java | 11 +++++++++++ .../org/apache/spark/executor/Executor.scala | 19 ++++++++++++++----- .../org/apache/spark/ExecutorPluginSuite.java | 8 ++++++++ 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java index bd93d519bb66c..5da25480437fc 100644 --- 
a/core/src/main/java/org/apache/spark/ExecutorPlugin.java +++ b/core/src/main/java/org/apache/spark/ExecutorPlugin.java @@ -35,5 +35,16 @@ */ @DeveloperApi public interface ExecutorPlugin { + + /** + * Initialization method that will be called during executor startup, in the same thread as + * the executor. Plugins should override this method to add in their initialization logic. + */ default void init() {} + + /** + * Stop method, to be called when the executor is shutting down. Plugins should clean up + * their resources and prepare to terminate. + */ + default void stop() {} } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index cca14c385f2a7..10919edeef42a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -130,17 +130,25 @@ private[spark] class Executor( private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) - // Load executor plugins - Thread.currentThread().setContextClassLoader(replClassLoader) - Utils.loadExtensions(classOf[ExecutorPlugin], conf.get(EXECUTOR_PLUGINS), conf) - .foreach(_.init()) - // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) // SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too. env.serializerManager.setDefaultClassLoader(replClassLoader) + private val pluginList = conf.get(EXECUTOR_PLUGINS) + if (pluginList != Nil) { + logDebug(s"Loading the following plugins: ${pluginList.mkString(", ")}") + + // Load executor plugins + Thread.currentThread().setContextClassLoader(replClassLoader) + private val executorPlugins = + Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) + executorPlugins.foreach(_.init()) + + logDebug("Finished loading plugins") + } + // Max size of direct result. If task result is bigger than this, we use the block manager // to send the result back. private val maxDirectResultSize = Math.min( @@ -223,6 +231,7 @@ private[spark] class Executor( env.metricsSystem.report() heartbeater.shutdown() heartbeater.awaitTermination(10, TimeUnit.SECONDS) + executorPlugins.foreach(_.stop()) threadPool.shutdown() if (!isLocal) { env.stop() diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index 3bd27ebf645fa..377de45b39f62 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -28,6 +28,8 @@ public class ExecutorPluginSuite { // Static value modified by testing plugin to ensure plugin loaded correctly. public static int numSuccessfulPlugins = 0; + // Static value modified by testing plugin to verify plugins shut down properly. 
+ public static int numSuccessfulTerminations = 0; private JavaSparkContext sc; private String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; @@ -37,6 +39,7 @@ public class ExecutorPluginSuite { public void setUp() { sc = null; numSuccessfulPlugins = 0; + numSuccessfulTerminations = 0; } private SparkConf initializeSparkConf(String pluginNames) { @@ -76,6 +79,7 @@ public void testAddPlugin() throws InterruptedException { if (sc != null) { sc.stop(); sc = null; + assertEquals(1, numSuccessfulTerminations); } } } @@ -94,6 +98,7 @@ public void testAddMultiplePlugins() throws InterruptedException { if (sc != null) { sc.stop(); sc = null; + assertEquals(2, numSuccessfulTerminations); } } } @@ -102,5 +107,8 @@ public static class TestExecutorPlugin implements ExecutorPlugin { public void init() { ExecutorPluginSuite.numSuccessfulPlugins++; } + public void stop() { + ExecutorPluginSuite.numSuccessfulTerminations++; + } } } From fa19ea880f4fc7eafb736e372479d4df2bffb74b Mon Sep 17 00:00:00 2001 From: Nihar Sheth Date: Fri, 31 Aug 2018 10:48:48 -0700 Subject: [PATCH 07/15] fixed bug and scalastyle --- .../org/apache/spark/executor/Executor.scala | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 10919edeef42a..a3c5bc134d9db 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -137,17 +137,13 @@ private[spark] class Executor( env.serializerManager.setDefaultClassLoader(replClassLoader) private val pluginList = conf.get(EXECUTOR_PLUGINS) - if (pluginList != Nil) { - logDebug(s"Loading the following plugins: ${pluginList.mkString(", ")}") - - // Load executor plugins - Thread.currentThread().setContextClassLoader(replClassLoader) - private val executorPlugins = - Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) - executorPlugins.foreach(_.init()) - - logDebug("Finished loading plugins") - } + if (pluginList != Nil) logDebug(s"Loading the following plugins: ${pluginList.mkString(", ")}") + // Load executor plugins + Thread.currentThread().setContextClassLoader(replClassLoader) + private val executorPlugins = + Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) + executorPlugins.foreach(_.init()) + if (pluginList != Nil) logDebug("Finished loading plugins") // Max size of direct result. If task result is bigger than this, we use the block manager // to send the result back. 
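[Editor's note: at this point in the series the plugin API consists of the default
init() and stop() hooks added in patch 06. To make that contract concrete, here is a
minimal sketch of a user-side plugin; the class name, the polling logic, and the
polling interval are illustrative assumptions, not part of any patch. Per the
interface javadoc, polling or blocking work belongs on a plugin-owned thread:

    import org.apache.spark.ExecutorPlugin;

    // Hypothetical plugin that samples executor heap usage on its own daemon thread.
    public class MemoryPollingPlugin implements ExecutorPlugin {
      private volatile boolean running = true;
      private Thread poller;

      @Override
      public void init() {
        poller = new Thread(() -> {
          while (running) {
            long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
            System.err.println("executor heap in use: " + used + " bytes");
            try {
              Thread.sleep(10_000);  // hypothetical polling interval
            } catch (InterruptedException e) {
              return;  // stop() interrupts this thread to end the loop promptly
            }
          }
        }, "memory-polling-plugin");
        poller.setDaemon(true);
        poller.start();
      }

      @Override
      public void stop() {
        running = false;       // signal the loop to exit
        poller.interrupt();    // wake the thread if it is sleeping
      }
    }

Registering the class through spark.executor.plugins would run init() during executor
startup and stop() during executor shutdown, as wired up in the patches above.]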
From 9cd662292d63d4280be1c7924bcb297d9f8feeb0 Mon Sep 17 00:00:00 2001 From: Nihar Sheth Date: Fri, 31 Aug 2018 15:33:46 -0700 Subject: [PATCH 08/15] style and doc fixes --- .../main/scala/org/apache/spark/executor/Executor.scala | 8 ++++++-- .../scala/org/apache/spark/internal/config/package.scala | 2 +- .../test/java/org/apache/spark/ExecutorPluginSuite.java | 7 ++++--- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a3c5bc134d9db..95987a6bfc2c7 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -137,13 +137,17 @@ private[spark] class Executor( env.serializerManager.setDefaultClassLoader(replClassLoader) private val pluginList = conf.get(EXECUTOR_PLUGINS) - if (pluginList != Nil) logDebug(s"Loading the following plugins: ${pluginList.mkString(", ")}") + if (pluginList.nonEmpty) { + logDebug(s"Loading the following plugins: ${pluginList.mkString(", ")}") + } // Load executor plugins Thread.currentThread().setContextClassLoader(replClassLoader) private val executorPlugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) executorPlugins.foreach(_.init()) - if (pluginList != Nil) logDebug("Finished loading plugins") + if (pluginList.nonEmpty) { + logDebug("Finished loading plugins") + } // Max size of direct result. If task result is bigger than this, we use the block manager // to send the result back. diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2c17143dd9375..abdeaf909db08 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -609,7 +609,7 @@ package object config { ConfigBuilder("spark.executor.plugins") .doc("Comma-separated list of class names for \"plugins\" implementing " + "org.apache.spark.ExecutorPlugin. Plugins have the same privileges as any task " + - "in a spark executor. They can also interfere with task execution and fail in " + + "in a Spark executor. They can also interfere with task execution and fail in " + "unexpected ways. So be sure to only use this for trusted plugins.") .stringConf .toSequence diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index 377de45b39f62..9cfb18549930e 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -26,14 +26,15 @@ // Tests loading plugins into executors public class ExecutorPluginSuite { + private final static String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; + private final static String testPluginName = TestExecutorPlugin.class.getName(); + // Static value modified by testing plugin to ensure plugin loaded correctly. public static int numSuccessfulPlugins = 0; // Static value modified by testing plugin to verify plugins shut down properly. 
public static int numSuccessfulTerminations = 0; - private JavaSparkContext sc; - private String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; - private String testPluginName = TestExecutorPlugin.class.getName(); + private JavaSparkContext sc; @Before public void setUp() { From 5a2852fb587278ab6a81c94de4226306a2e0da70 Mon Sep 17 00:00:00 2001 From: NiharS Date: Sat, 1 Sep 2018 18:05:39 -0700 Subject: [PATCH 09/15] doc and style fixes --- .../main/scala/org/apache/spark/executor/Executor.scala | 8 ++++++-- .../scala/org/apache/spark/internal/config/package.scala | 2 +- .../test/java/org/apache/spark/ExecutorPluginSuite.java | 8 +++++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a3c5bc134d9db..95987a6bfc2c7 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -137,13 +137,17 @@ private[spark] class Executor( env.serializerManager.setDefaultClassLoader(replClassLoader) private val pluginList = conf.get(EXECUTOR_PLUGINS) - if (pluginList != Nil) logDebug(s"Loading the following plugins: ${pluginList.mkString(", ")}") + if (pluginList.nonEmpty) { + logDebug(s"Loading the following plugins: ${pluginList.mkString(", ")}") + } // Load executor plugins Thread.currentThread().setContextClassLoader(replClassLoader) private val executorPlugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) executorPlugins.foreach(_.init()) - if (pluginList != Nil) logDebug("Finished loading plugins") + if (pluginList.nonEmpty) { + logDebug("Finished loading plugins") + } // Max size of direct result. If task result is bigger than this, we use the block manager // to send the result back. diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2c17143dd9375..abdeaf909db08 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -609,7 +609,7 @@ package object config { ConfigBuilder("spark.executor.plugins") .doc("Comma-separated list of class names for \"plugins\" implementing " + "org.apache.spark.ExecutorPlugin. Plugins have the same privileges as any task " + - "in a spark executor. They can also interfere with task execution and fail in " + + "in a Spark executor. They can also interfere with task execution and fail in " + "unexpected ways. So be sure to only use this for trusted plugins.") .stringConf .toSequence diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index 377de45b39f62..073bc8e5c2455 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -26,14 +26,16 @@ // Tests loading plugins into executors public class ExecutorPluginSuite { + private static final String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; + private static final String testPluginName = TestExecutorPlugin.class.getName(); + // Static value modified by testing plugin to ensure plugin loaded correctly. public static int numSuccessfulPlugins = 0; + // Static value modified by testing plugin to verify plugins shut down properly. 
   public static int numSuccessfulTerminations = 0;
 
-  private JavaSparkContext sc;
-  private String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins";
-  private String testPluginName = TestExecutorPlugin.class.getName();
+  private JavaSparkContext sc;
 
   @Before
   public void setUp() {

From e409466c4a5ddae96c7e022b01985c2c0df36053 Mon Sep 17 00:00:00 2001
From: Nihar Sheth
Date: Thu, 6 Sep 2018 11:49:23 -0700
Subject: [PATCH 10/15] plugin init to separate thread, javadoc and comment
 fixes

---
 .../java/org/apache/spark/ExecutorPlugin.java | 19 ++++++++----
 .../org/apache/spark/executor/Executor.scala  | 29 ++++++++++++++-----
 .../org/apache/spark/ExecutorPluginSuite.java | 10 ++++++-
 3 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
index 5da25480437fc..d276a5768b960 100644
--- a/core/src/main/java/org/apache/spark/ExecutorPlugin.java
+++ b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
@@ -37,14 +37,23 @@ public interface ExecutorPlugin {
 
   /**
-   * Initialization method that will be called during executor startup, in the same thread as
-   * the executor. Plugins should override this method to add in their initialization logic.
+   * Initialize the executor plugin.
+   *
+   * <p>Each executor will, during its initialization, invoke this method on each
+   * plugin provided in the spark.executor.plugins configuration. These invocations
+   * will occur asynchronously from the executor initialization, but synchronously
+   * with other plugin initialization.</p>
+   *
+   * <p>Plugins should create threads in their implementation of this method for
+   * any polling, blocking, or intensive computation.</p>
*/ default void init() {} /** - * Stop method, to be called when the executor is shutting down. Plugins should clean up - * their resources and prepare to terminate. + * Clean up and terminate this plugin. + * + *

This function is called during the executor shutdown phase. The executor + * will wait for the plugin to terminate before continuing its own shutdown.

*/ - default void stop() {} + default void shutdown() {} } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 95987a6bfc2c7..c2e79c21dadba 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -138,15 +138,27 @@ private[spark] class Executor( private val pluginList = conf.get(EXECUTOR_PLUGINS) if (pluginList.nonEmpty) { - logDebug(s"Loading the following plugins: ${pluginList.mkString(", ")}") + logDebug(s"Initializing the following plugins: ${pluginList.mkString(", ")}") } - // Load executor plugins - Thread.currentThread().setContextClassLoader(replClassLoader) - private val executorPlugins = - Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) - executorPlugins.foreach(_.init()) + val executorPluginThread = new Thread { + var plugins: Seq[ExecutorPlugin] = Nil + + override def run(): Unit = { + plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) + plugins.foreach(_.init()) + } + + override def interrupt(): Unit = { + plugins.foreach(_.shutdown()) + super.interrupt() + } + } + + executorPluginThread.setContextClassLoader(replClassLoader) + executorPluginThread.start() + if (pluginList.nonEmpty) { - logDebug("Finished loading plugins") + logDebug("Finished initializing plugins") } // Max size of direct result. If task result is bigger than this, we use the block manager @@ -231,7 +243,8 @@ private[spark] class Executor( env.metricsSystem.report() heartbeater.shutdown() heartbeater.awaitTermination(10, TimeUnit.SECONDS) - executorPlugins.foreach(_.stop()) + executorPluginThread.interrupt() + executorPluginThread.join() threadPool.shutdown() if (!isLocal) { env.stop() diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index 9cfb18549930e..a9230145ab64f 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -73,6 +73,10 @@ public void testAddPlugin() throws InterruptedException { try { sc = new JavaSparkContext(conf); + + // Sleep briefly because plugins initialize on a separate thread + Thread.sleep(500); + assertEquals(1, numSuccessfulPlugins); } catch (Exception e) { fail("Failed to start SparkContext with exception " + e.toString()); @@ -92,6 +96,10 @@ public void testAddMultiplePlugins() throws InterruptedException { try { sc = new JavaSparkContext(conf); + + // Sleep briefly because plugins initialize on a separate thread + Thread.sleep(500); + assertEquals(2, numSuccessfulPlugins); } catch (Exception e) { fail("Failed to start SparkContext with exception " + e.toString()); @@ -108,7 +116,7 @@ public static class TestExecutorPlugin implements ExecutorPlugin { public void init() { ExecutorPluginSuite.numSuccessfulPlugins++; } - public void stop() { + public void shutdown() { ExecutorPluginSuite.numSuccessfulTerminations++; } } From c725a536c45bc73f40cd92c782d3f4975c298517 Mon Sep 17 00:00:00 2001 From: Nihar Sheth Date: Fri, 7 Sep 2018 11:16:34 -0700 Subject: [PATCH 11/15] load plugins in same thread as executor by temp changing classloader --- .../org/apache/spark/executor/Executor.scala | 32 ++++++++----------- .../org/apache/spark/ExecutorPluginSuite.java | 12 ++----- 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 7f26cff51d4f1..9741398e424c0 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -141,22 +141,13 @@ private[spark] class Executor( logDebug(s"Initializing the following plugins: ${pluginList.mkString(", ")}") } - val executorPluginThread = new Thread { - var plugins: Seq[ExecutorPlugin] = Nil - - override def run(): Unit = { - plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) - plugins.foreach(_.init()) - } - - override def interrupt(): Unit = { - plugins.foreach(_.shutdown()) - super.interrupt() - } - } - - executorPluginThread.setContextClassLoader(replClassLoader) - executorPluginThread.start() + // Before loading the executor plugins, we need to temporarily change the class loader + // to one that can discover and load jars passed into Spark using --jars + private val oldContextClassLoader = Utils.getContextOrSparkClassLoader + Thread.currentThread.setContextClassLoader(replClassLoader) + private val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) + plugins.foreach(_.init()) + Thread.currentThread.setContextClassLoader(oldContextClassLoader) if (pluginList.nonEmpty) { logDebug("Finished initializing plugins") @@ -244,9 +235,14 @@ private[spark] class Executor( env.metricsSystem.report() heartbeater.shutdown() heartbeater.awaitTermination(10, TimeUnit.SECONDS) - executorPluginThread.interrupt() - executorPluginThread.join() threadPool.shutdown() + + // Notify plugins that executor is shutting down so they can terminate cleanly + // oldContextClassLoader is redefined in case the class loader changed during execution + val oldContextClassLoader = Utils.getContextOrSparkClassLoader + Thread.currentThread.setContextClassLoader(replClassLoader) + plugins.foreach(_.shutdown()) + Thread.currentThread.setContextClassLoader(oldContextClassLoader) if (!isLocal) { env.stop() } diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index 02b60aeba6339..b0f1e93287fcc 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -26,8 +26,8 @@ // Tests loading plugins into executors public class ExecutorPluginSuite { - private final static String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; - private final static String testPluginName = TestExecutorPlugin.class.getName(); + private static final String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; + private static final String testPluginName = TestExecutorPlugin.class.getName(); // Static value modified by testing plugin to ensure plugin loaded correctly. 
public static int numSuccessfulPlugins = 0; @@ -74,10 +74,6 @@ public void testAddPlugin() throws InterruptedException { try { sc = new JavaSparkContext(conf); - - // Sleep briefly because plugins initialize on a separate thread - Thread.sleep(500); - assertEquals(1, numSuccessfulPlugins); } catch (Exception e) { fail("Failed to start SparkContext with exception " + e.toString()); @@ -97,10 +93,6 @@ public void testAddMultiplePlugins() throws InterruptedException { try { sc = new JavaSparkContext(conf); - - // Sleep briefly because plugins initialize on a separate thread - Thread.sleep(500); - assertEquals(2, numSuccessfulPlugins); } catch (Exception e) { fail("Failed to start SparkContext with exception " + e.toString()); From 10481f043639745eb4db24636febcbe732be9921 Mon Sep 17 00:00:00 2001 From: Nihar Sheth Date: Fri, 7 Sep 2018 14:40:57 -0700 Subject: [PATCH 12/15] util function for ctxclassloader, isolate plugin shutdown with test --- .../org/apache/spark/executor/Executor.scala | 46 +++++++------ .../scala/org/apache/spark/util/Utils.scala | 10 +++ .../org/apache/spark/ExecutorPluginSuite.java | 68 ++++++++++--------- 3 files changed, 74 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 9741398e424c0..d912cc0788d4f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -136,21 +136,24 @@ private[spark] class Executor( // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too. env.serializerManager.setDefaultClassLoader(replClassLoader) - private val pluginList = conf.get(EXECUTOR_PLUGINS) - if (pluginList.nonEmpty) { - logDebug(s"Initializing the following plugins: ${pluginList.mkString(", ")}") - } - - // Before loading the executor plugins, we need to temporarily change the class loader - // to one that can discover and load jars passed into Spark using --jars - private val oldContextClassLoader = Utils.getContextOrSparkClassLoader - Thread.currentThread.setContextClassLoader(replClassLoader) - private val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf) - plugins.foreach(_.init()) - Thread.currentThread.setContextClassLoader(oldContextClassLoader) + private val plugins: Seq[ExecutorPlugin] = { + val pluginNames = conf.get(EXECUTOR_PLUGINS) + if (pluginNames.nonEmpty) { + logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}") + + // Before loading the executor plugins, we need to temporarily change the class loader + // to one that can discover and load jars passed into Spark using --jars + var pluginList: Seq[ExecutorPlugin] = Nil + Utils.withContextClassLoader(replClassLoader) { + pluginList = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf) + pluginList.foreach(_.init()) + } - if (pluginList.nonEmpty) { - logDebug("Finished initializing plugins") + logDebug("Finished initializing plugins") + pluginList + } else { + Nil + } } // Max size of direct result. 
If task result is bigger than this, we use the block manager @@ -238,11 +241,16 @@ private[spark] class Executor( threadPool.shutdown() // Notify plugins that executor is shutting down so they can terminate cleanly - // oldContextClassLoader is redefined in case the class loader changed during execution - val oldContextClassLoader = Utils.getContextOrSparkClassLoader - Thread.currentThread.setContextClassLoader(replClassLoader) - plugins.foreach(_.shutdown()) - Thread.currentThread.setContextClassLoader(oldContextClassLoader) + Utils.withContextClassLoader(replClassLoader) { + plugins.foreach(plugin => { + try { + plugin.shutdown() + } catch { + case e: Exception => + logWarning(s"Plugin ${plugin.getClass} failed to shutdown: ${e.getMessage()}") + } + }) + } if (!isLocal) { env.stop() } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7ec707d94ed87..783e82ae9b511 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -240,6 +240,16 @@ private[spark] object Utils extends Logging { // scalastyle:on classforname } + /** + * Run a segment of code using a different context class loader in the current thread + */ + def withContextClassLoader(ctxClassLoader: ClassLoader)(fn: => Unit): Unit = { + val oldClassLoader = getContextOrSparkClassLoader + Thread.currentThread().setContextClassLoader(ctxClassLoader) + fn + Thread.currentThread().setContextClassLoader(oldClassLoader) + } + /** * Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]] */ diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java index b0f1e93287fcc..ca33f4bbae30f 100644 --- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java +++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java @@ -19,14 +19,15 @@ import org.apache.spark.api.java.JavaSparkContext; +import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.*; -// Tests loading plugins into executors public class ExecutorPluginSuite { private static final String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; + private static final String testBadPluginName = TestBadShutdownPlugin.class.getName(); private static final String testPluginName = TestExecutorPlugin.class.getName(); // Static value modified by testing plugin to ensure plugin loaded correctly. 
@@ -44,6 +45,14 @@ public void setUp() { numSuccessfulTerminations = 0; } + @After + public void tearDown() { + if (sc != null) { + sc.stop(); + sc = null; + } + } + private SparkConf initializeSparkConf(String pluginNames) { return new SparkConf() .setMaster("local") @@ -59,11 +68,6 @@ public void testPluginClassDoesNotExist() { } catch (Exception e) { // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException")); - } finally { - if (sc != null) { - sc.stop(); - sc = null; - } } } @@ -71,38 +75,34 @@ public void testPluginClassDoesNotExist() { public void testAddPlugin() throws InterruptedException { // Load the sample TestExecutorPlugin, which will change the value of numSuccessfulPlugins SparkConf conf = initializeSparkConf(testPluginName); - - try { - sc = new JavaSparkContext(conf); - assertEquals(1, numSuccessfulPlugins); - } catch (Exception e) { - fail("Failed to start SparkContext with exception " + e.toString()); - } finally { - if (sc != null) { - sc.stop(); - sc = null; - assertEquals(1, numSuccessfulTerminations); - } - } + sc = new JavaSparkContext(conf); + assertEquals(1, numSuccessfulPlugins); + sc.stop(); + sc = null; + assertEquals(1, numSuccessfulTerminations); } @Test public void testAddMultiplePlugins() throws InterruptedException { // Load the sample TestExecutorPlugin twice SparkConf conf = initializeSparkConf(testPluginName + "," + testPluginName); + sc = new JavaSparkContext(conf); + assertEquals(2, numSuccessfulPlugins); + sc.stop(); + sc = null; + assertEquals(2, numSuccessfulTerminations); + } - try { - sc = new JavaSparkContext(conf); - assertEquals(2, numSuccessfulPlugins); - } catch (Exception e) { - fail("Failed to start SparkContext with exception " + e.toString()); - } finally { - if (sc != null) { - sc.stop(); - sc = null; - assertEquals(2, numSuccessfulTerminations); - } - } + @Test + public void testPluginShutdownWithException() { + // Verify an exception in one plugin shutdown does not affect the others + String pluginNames = testPluginName + "," + testBadPluginName + "," + testPluginName; + SparkConf conf = initializeSparkConf(pluginNames); + sc = new JavaSparkContext(conf); + assertEquals(2, numSuccessfulPlugins); + sc.stop(); + sc = null; + assertEquals(2, numSuccessfulTerminations); } public static class TestExecutorPlugin implements ExecutorPlugin { @@ -113,4 +113,10 @@ public void shutdown() { ExecutorPluginSuite.numSuccessfulTerminations++; } } + + public static class TestBadShutdownPlugin implements ExecutorPlugin { + public void shutdown() { + throw new RuntimeException("This plugin will fail to cleanly shut down"); + } + } } From cf30ed52c0aefc1eda8efffc11e73cafc93a032d Mon Sep 17 00:00:00 2001 From: Nihar Sheth Date: Fri, 7 Sep 2018 17:26:10 -0700 Subject: [PATCH 13/15] Generalized withCtxClassLoader, comment updates, style fixes --- .../java/org/apache/spark/ExecutorPlugin.java | 4 +-- .../org/apache/spark/executor/Executor.scala | 25 +++++++++++-------- .../scala/org/apache/spark/util/Utils.scala | 11 +++++--- .../org/apache/spark/ExecutorPluginSuite.java | 9 ++++++- 4 files changed, 30 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java index d276a5768b960..ec0b57f1a2819 100644 --- a/core/src/main/java/org/apache/spark/ExecutorPlugin.java +++ b/core/src/main/java/org/apache/spark/ExecutorPlugin.java @@ -40,9 +40,7 @@ public 
interface ExecutorPlugin {
    * Initialize the executor plugin.
    *
    * <p>Each executor will, during its initialization, invoke this method on each
-   * plugin provided in the spark.executor.plugins configuration. These invocations
-   * will occur asynchronously from the executor initialization, but synchronously
-   * with other plugin initialization.</p>
+   * plugin provided in the spark.executor.plugins configuration.</p>
    *
    * <p>Plugins should create threads in their implementation of this method for
    * any polling, blocking, or intensive computation.</p>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d912cc0788d4f..58df724fdbe26 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -136,18 +136,18 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)

-  private val plugins: Seq[ExecutorPlugin] = {
+  private val executorPlugins: Seq[ExecutorPlugin] = {
     val pluginNames = conf.get(EXECUTOR_PLUGINS)
     if (pluginNames.nonEmpty) {
       logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")

-      // Before loading the executor plugins, we need to temporarily change the class loader
-      // to one that can discover and load jars passed into Spark using --jars
-      var pluginList: Seq[ExecutorPlugin] = Nil
-      Utils.withContextClassLoader(replClassLoader) {
-        pluginList = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
-        pluginList.foreach(_.init())
-      }
+      // Plugins need to load using a class loader that includes the executor's user classpath
+      val pluginList: Seq[ExecutorPlugin] =
+        Utils.withContextClassLoader(replClassLoader) {
+          val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
+          plugins.foreach(_.init())
+          plugins
+        }

       logDebug("Finished initializing plugins")
       pluginList
@@ -242,14 +242,17 @@ private[spark] class Executor(

       // Notify plugins that executor is shutting down so they can terminate cleanly
       Utils.withContextClassLoader(replClassLoader) {
-        plugins.foreach(plugin => {
+        executorPlugins.foreach { plugin =>
           try {
             plugin.shutdown()
           } catch {
             case e: Exception =>
-              logWarning(s"Plugin ${plugin.getClass} failed to shutdown: ${e.getMessage()}")
+              logWarning(
+                s"""Plugin ${plugin.getClass().getCanonicalName()} failed to shutdown:
+                   |${e.toString}
+                   |${e.getStackTrace().mkString("\n")}""".stripMargin)
           }
-        })
+        }
       }
       if (!isLocal) {
         env.stop()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 783e82ae9b511..703ea1af84750 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -243,11 +243,14 @@ private[spark] object Utils extends Logging {
   /**
    * Run a segment of code using a different context class loader in the current thread
    */
-  def withContextClassLoader(ctxClassLoader: ClassLoader)(fn: => Unit): Unit = {
+  def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
     val oldClassLoader = getContextOrSparkClassLoader
-    Thread.currentThread().setContextClassLoader(ctxClassLoader)
-    fn
-    Thread.currentThread().setContextClassLoader(oldClassLoader)
+    try {
+      Thread.currentThread().setContextClassLoader(ctxClassLoader)
+      fn
+    } finally {
+      Thread.currentThread().setContextClassLoader(oldClassLoader)
+    }
   }

   /**
diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
index ca33f4bbae30f..72de0ed157577 100644
--- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
+++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
@@ -68,7 +68,9 @@ public void testPluginClassDoesNotExist() {
     } catch (Exception e) {
       // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown
       assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
+      return;
     }
+    fail("No exception thrown for nonexistant plugin");
   }

   @Test
@@ -99,7 +101,7 @@ public void testPluginShutdownWithException() {
     String pluginNames = testPluginName + "," + testBadPluginName + "," + testPluginName;
     SparkConf conf = initializeSparkConf(pluginNames);
     sc = new JavaSparkContext(conf);
-    assertEquals(2, numSuccessfulPlugins);
+    assertEquals(3, numSuccessfulPlugins);
     sc.stop();
     sc = null;
     assertEquals(2, numSuccessfulTerminations);
@@ -109,12 +111,17 @@ public static class TestExecutorPlugin implements ExecutorPlugin {
     public void init() {
       ExecutorPluginSuite.numSuccessfulPlugins++;
     }
+
     public void shutdown() {
       ExecutorPluginSuite.numSuccessfulTerminations++;
     }
   }

   public static class TestBadShutdownPlugin implements ExecutorPlugin {
+    public void init() {
+      ExecutorPluginSuite.numSuccessfulPlugins++;
+    }
+
     public void shutdown() {
       throw new RuntimeException("This plugin will fail to cleanly shut down");
     }

From 447c5e5974ca2a176026e63518a7a6cf29b78008 Mon Sep 17 00:00:00 2001
From: Nihar Sheth
Date: Sun, 9 Sep 2018 23:47:53 -0700
Subject: [PATCH 14/15] changed util function to classloader only, fixed test
 style and error msg

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 5 +----
 core/src/main/scala/org/apache/spark/util/Utils.scala        | 2 +-
 core/src/test/java/org/apache/spark/ExecutorPluginSuite.java | 3 +--
 3 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 58df724fdbe26..f2ce4f360a376 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -247,10 +247,7 @@ private[spark] class Executor(
             plugin.shutdown()
           } catch {
             case e: Exception =>
-              logWarning(
-                s"""Plugin ${plugin.getClass().getCanonicalName()} failed to shutdown:
-                   |${e.toString}
-                   |${e.getStackTrace().mkString("\n")}""".stripMargin)
+              logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 703ea1af84750..f6062d2732f98 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -244,7 +244,7 @@ private[spark] object Utils extends Logging {
    * Run a segment of code using a different context class loader in the current thread
    */
   def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
-    val oldClassLoader = getContextOrSparkClassLoader
+    val oldClassLoader = Thread.currentThread().getContextClassLoader()
     try {
       Thread.currentThread().setContextClassLoader(ctxClassLoader)
       fn
diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
index 72de0ed157577..791b47ed52dba 100644
--- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
+++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
@@ -65,12 +65,11 @@ public void testPluginClassDoesNotExist() {
     SparkConf conf = initializeSparkConf("nonexistant.plugin");
     try {
       sc = new JavaSparkContext(conf);
+      fail("No exception thrown for nonexistant plugin");
     } catch (Exception e) {
       // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown
       assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
-      return;
     }
-    fail("No exception thrown for nonexistant plugin");
   }

   @Test

From f853fe5fd17a8bbed367ba93c3cddcb7e0e535f3 Mon Sep 17 00:00:00 2001
From: Nihar Sheth
Date: Wed, 19 Sep 2018 11:18:57 -0700
Subject: [PATCH 15/15] add debug msg on successful plugin load, multiple
 plugins in test

---
 .../org/apache/spark/executor/Executor.scala  |  5 ++++-
 .../org/apache/spark/ExecutorPluginSuite.java | 19 +++++++++++++++----
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f2ce4f360a376..e7dd07b875652 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -145,7 +145,10 @@ private[spark] class Executor(
       val pluginList: Seq[ExecutorPlugin] =
         Utils.withContextClassLoader(replClassLoader) {
           val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
-          plugins.foreach(_.init())
+          plugins.foreach { plugin =>
+            plugin.init()
+            logDebug(s"Successfully loaded plugin " + plugin.getClass().getCanonicalName())
+          }
           plugins
         }

diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
index 791b47ed52dba..686eb28010c6a 100644
--- a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
+++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
@@ -29,11 +29,12 @@ public class ExecutorPluginSuite {
   private static final String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins";
   private static final String testBadPluginName = TestBadShutdownPlugin.class.getName();
   private static final String testPluginName = TestExecutorPlugin.class.getName();
+  private static final String testSecondPluginName = TestSecondPlugin.class.getName();

-  // Static value modified by testing plugin to ensure plugin loaded correctly.
+  // Static value modified by testing plugins to ensure plugins loaded correctly.
   public static int numSuccessfulPlugins = 0;

-  // Static value modified by testing plugin to verify plugins shut down properly.
+  // Static value modified by testing plugins to verify plugins shut down properly.
   public static int numSuccessfulTerminations = 0;

   private JavaSparkContext sc;
@@ -85,8 +86,8 @@ public void testAddPlugin() throws InterruptedException {

   @Test
   public void testAddMultiplePlugins() throws InterruptedException {
-    // Load the sample TestExecutorPlugin twice
-    SparkConf conf = initializeSparkConf(testPluginName + "," + testPluginName);
+    // Load two plugins and verify they both execute.
+    SparkConf conf = initializeSparkConf(testPluginName + "," + testSecondPluginName);
     sc = new JavaSparkContext(conf);
     assertEquals(2, numSuccessfulPlugins);
     sc.stop();
@@ -116,6 +117,16 @@ public void shutdown() {
     }
   }

+  public static class TestSecondPlugin implements ExecutorPlugin {
+    public void init() {
+      ExecutorPluginSuite.numSuccessfulPlugins++;
+    }
+
+    public void shutdown() {
+      ExecutorPluginSuite.numSuccessfulTerminations++;
+    }
+  }
+
   public static class TestBadShutdownPlugin implements ExecutorPlugin {
     public void init() {
       ExecutorPluginSuite.numSuccessfulPlugins++;
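
A note on the classloader handling that patches 13 and 14 keep refining: withContextClassLoader is a loan pattern. It swaps the context class loader in, runs the body, and restores the previous loader in a finally block, returning the body's result now that the function is generic in T. A standalone Java sketch of the same pattern follows; the class and method names here are illustrative, not Spark API:

import java.util.function.Supplier;

public final class WithContextClassLoader {
  private WithContextClassLoader() {}

  // Run body with the given context class loader, restoring the previous one
  // afterwards. The finally block runs whether body returns or throws, so a
  // failing plugin init()/shutdown() cannot leave the thread's loader swapped.
  public static <T> T run(ClassLoader loader, Supplier<T> body) {
    Thread current = Thread.currentThread();
    ClassLoader previous = current.getContextClassLoader();
    try {
      current.setContextClassLoader(loader);
      return body.get();
    } finally {
      current.setContextClassLoader(previous);
    }
  }
}

Patch 14's switch from getContextOrSparkClassLoader to Thread.currentThread().getContextClassLoader() matters for the restore step: the finally block should put back exactly what was on the thread before, not a substitute.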