[SPARK-24918][Core] Executor Plugin API #22192
Closed

Changes from all commits (16 commits):
44454dd  [SPARK-24918] Executor Plugin API (NiharS)
7c86fc5  fixed java style issue and ran java linter (NiharS)
44aa634  cleaned up tests, doc fixes, removed separate testing class, put stat… (NiharS)
284beaf  fixed java style issue (NiharS)
2907c6b  added finally clauses to all tests, changed conf to be nil instead of… (NiharS)
a968b0a  added stop, logging before and after plugins, testing for stop (NiharS)
fa19ea8  fixed bug and scalastyle (NiharS)
9cd6622  style and doc fixes (NiharS)
5a2852f  doc and style fixes (NiharS)
e409466  plugin init to separate thread, javadoc and comment fixes (NiharS)
b0be9e5  fix merge issues (NiharS)
c725a53  load plugins in same thread as executor by temp changing classloader (NiharS)
10481f0  util function for ctxclassloader, isolate plugin shutdown with test (NiharS)
cf30ed5  Generalized withCtxClassLoader, comment updates, style fixes (NiharS)
447c5e5  changed util function to classloader only, fixed test style and error… (NiharS)
f853fe5  add debug msg on successful plugin load, multiple plugins in test (NiharS)
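Several of the later commits (c725a53, 10481f0, cf30ed5, 447c5e5) describe loading the plugins on the executor's own thread while temporarily swapping the thread context classloader, using a small utility that was later generalized. That utility is not shown in this diff view; the following is only a rough Java sketch of the pattern, with a hypothetical name and signature, to illustrate what "temp changing classloader" refers to.

import java.util.concurrent.Callable;

// Hypothetical helper illustrating the pattern referenced in the commit messages:
// run a block of code with the given classloader installed as the current thread's
// context classloader, and always restore the previous one afterwards.
final class ClassLoaderUtil {
  static <T> T withContextClassLoader(ClassLoader loader, Callable<T> body) throws Exception {
    Thread current = Thread.currentThread();
    ClassLoader previous = current.getContextClassLoader();
    current.setContextClassLoader(loader);
    try {
      return body.call();
    } finally {
      current.setContextClassLoader(previous);
    }
  }
}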
New file: ExecutorPlugin.java (+57 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark;

import org.apache.spark.annotation.DeveloperApi;

/**
 * A plugin which can be automatically instantiated within each Spark executor. Users can specify
 * plugins which should be created with the "spark.executor.plugins" configuration. An instance
 * of each plugin will be created for every executor, including those created by dynamic allocation,
 * before the executor starts running any tasks.
 *
 * The specific API exposed to end users is still considered to be very unstable. We will
 * hopefully be able to keep compatibility by providing default implementations for any methods
 * added, but we make no guarantees this will always be possible across all Spark releases.
 *
 * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
 * it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
 * could also interfere with task execution and make the executor fail in unexpected ways.
 */
@DeveloperApi
public interface ExecutorPlugin {
  /**
   * Initialize the executor plugin.
   *
   * <p>Each executor will, during its initialization, invoke this method on each
   * plugin provided in the spark.executor.plugins configuration.</p>
   *
   * <p>Plugins should create threads in their implementation of this method for
   * any polling, blocking, or intensive computation.</p>
   */
  default void init() {}

  /**
   * Clean up and terminate this plugin.
   *
   * <p>This function is called during the executor shutdown phase. The executor
   * will wait for the plugin to terminate before continuing its own shutdown.</p>
   */
  default void shutdown() {}
}
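For illustration, here is a minimal sketch of what a plugin built against this interface could look like, following the Javadoc guidance above: polling work runs on a daemon thread started in init(), and shutdown() stops that thread before the executor finishes shutting down. The class, package, and behavior are hypothetical and not part of this PR; such a plugin would be enabled by listing its fully qualified name in spark.executor.plugins.

package com.example.plugins;  // hypothetical package, not part of this PR

import org.apache.spark.ExecutorPlugin;

// A hypothetical plugin that periodically logs the executor's heap usage.
public class MemoryPollingPlugin implements ExecutorPlugin {
  private volatile boolean running = true;
  private Thread poller;

  @Override
  public void init() {
    // Do the periodic work on a separate daemon thread, as the Javadoc recommends,
    // so the executor's startup path is not blocked.
    poller = new Thread(() -> {
      while (running) {
        long usedBytes =
            Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        System.err.println("Executor heap in use: " + usedBytes + " bytes");
        try {
          Thread.sleep(10_000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }, "memory-polling-plugin");
    poller.setDaemon(true);
    poller.start();
  }

  @Override
  public void shutdown() {
    // The executor waits for this method to return before continuing its own
    // shutdown, so keep the cleanup quick.
    running = false;
    if (poller != null) {
      poller.interrupt();
    }
  }
}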
core/src/test/java/org/apache/spark/ExecutorPluginSuite.java (139 additions, 0 deletions)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark;

import org.apache.spark.api.java.JavaSparkContext;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;

public class ExecutorPluginSuite {
  private static final String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins";
  private static final String testBadPluginName = TestBadShutdownPlugin.class.getName();
  private static final String testPluginName = TestExecutorPlugin.class.getName();
  private static final String testSecondPluginName = TestSecondPlugin.class.getName();

  // Static value modified by testing plugins to ensure plugins loaded correctly.
  public static int numSuccessfulPlugins = 0;

  // Static value modified by testing plugins to verify plugins shut down properly.
  public static int numSuccessfulTerminations = 0;

  private JavaSparkContext sc;

  @Before
  public void setUp() {
    sc = null;
    numSuccessfulPlugins = 0;
    numSuccessfulTerminations = 0;
  }

  @After
  public void tearDown() {
    if (sc != null) {
      sc.stop();
      sc = null;
    }
  }

  private SparkConf initializeSparkConf(String pluginNames) {
    return new SparkConf()
      .setMaster("local")
      .setAppName("test")
      .set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames);
  }

  @Test
  public void testPluginClassDoesNotExist() {
    SparkConf conf = initializeSparkConf("nonexistent.plugin");
    try {
      sc = new JavaSparkContext(conf);
[Review comment] You should fail the test if an exception is not thrown here.
fail("No exception thrown for nonexistant plugin"); | ||
} catch (Exception e) { | ||
// We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown | ||
assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException")); | ||
} | ||
} | ||
|
||
@Test | ||
public void testAddPlugin() throws InterruptedException { | ||
// Load the sample TestExecutorPlugin, which will change the value of numSuccessfulPlugins | ||
SparkConf conf = initializeSparkConf(testPluginName); | ||
sc = new JavaSparkContext(conf); | ||
assertEquals(1, numSuccessfulPlugins); | ||
sc.stop(); | ||
sc = null; | ||
assertEquals(1, numSuccessfulTerminations); | ||
} | ||
|
||
@Test | ||
public void testAddMultiplePlugins() throws InterruptedException { | ||
[Review comment] super nit: shall we test whether we can load multiple different plugins?
    // Load two plugins and verify they both execute.
    SparkConf conf = initializeSparkConf(testPluginName + "," + testSecondPluginName);
    sc = new JavaSparkContext(conf);
    assertEquals(2, numSuccessfulPlugins);
    sc.stop();
    sc = null;
    assertEquals(2, numSuccessfulTerminations);
  }

  @Test
  public void testPluginShutdownWithException() {
    // Verify an exception in one plugin shutdown does not affect the others
    String pluginNames = testPluginName + "," + testBadPluginName + "," + testPluginName;
    SparkConf conf = initializeSparkConf(pluginNames);
    sc = new JavaSparkContext(conf);
    assertEquals(3, numSuccessfulPlugins);
    sc.stop();
    sc = null;
    assertEquals(2, numSuccessfulTerminations);
  }

  public static class TestExecutorPlugin implements ExecutorPlugin {
    public void init() {
      ExecutorPluginSuite.numSuccessfulPlugins++;
    }

    public void shutdown() {
      ExecutorPluginSuite.numSuccessfulTerminations++;
    }
  }

  public static class TestSecondPlugin implements ExecutorPlugin {
    public void init() {
      ExecutorPluginSuite.numSuccessfulPlugins++;
    }

    public void shutdown() {
      ExecutorPluginSuite.numSuccessfulTerminations++;
    }
  }

  public static class TestBadShutdownPlugin implements ExecutorPlugin {
    public void init() {
      ExecutorPluginSuite.numSuccessfulPlugins++;
    }

    public void shutdown() {
      throw new RuntimeException("This plugin will fail to cleanly shut down");
    }
  }
}
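To round out the user-side picture, here is a hedged sketch of how an application could register plugins through the new configuration, mirroring what initializeSparkConf does in the suite above. The plugin class names are hypothetical, and the plugin classes would also need to be available on the executor classpath (for example via the spark-submit --jars option).

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PluginEnabledApp {
  public static void main(String[] args) {
    // Comma-separated list of fully qualified plugin class names (hypothetical names here).
    // The master is set to local for a quick run; in a real deployment it would
    // normally come from spark-submit.
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("plugin-enabled-app")
        .set("spark.executor.plugins",
            "com.example.plugins.MemoryPollingPlugin,com.example.plugins.AuditPlugin");

    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
      // Each executor instantiates the listed plugins and calls init() before running any tasks.
      sc.parallelize(Arrays.asList(1, 2, 3)).count();
    } finally {
      // Executor shutdown invokes shutdown() on each plugin before completing.
      sc.stop();
    }
  }
}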
[Review comment] Seems like DeveloperApi already implies that, though.