[SPARK-24918][Core] Executor Plugin API #22192
This commit adds testing and moves the plugin initialization to a separate thread.
ok to test |
Please remove all of the PR template text from the description. |
Jenkins, ok to test |
Test build #95136 has finished for PR 22192 at commit
|
Test build #95138 has finished for PR 22192 at commit
|
retest this please |
(To my understanding this failure isn't caused by my change: I checked other pull requests that hit this error around the same time, and they had similar output and the same lack of specific errors.) Also, no idea if Jenkins will listen to me yet.
retest this please |
Test build #95182 has finished for PR 22192 at commit
|
* before the executor starts running any tasks. | ||
* | ||
* The specific api exposed to the end users still considered to be very unstable. We will | ||
* *hopefully* be able to keep compatability by providing default implementations for any methods |
Remove the *s around "hopefully".
* | ||
* The specific api exposed to the end users still considered to be very unstable. We will | ||
* *hopefully* be able to keep compatability by providing default implementations for any methods | ||
* added, but make no guarantees this will always be possible across all spark releases. |
Spark
} | ||
} | ||
} | ||
executorPluginThread.start |
start()
@@ -130,6 +130,16 @@ private[spark] class Executor( | |||
private val urlClassLoader = createClassLoader() | |||
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) | |||
|
|||
// One thread will handle loading all of the plugins on this executor |
Why a thread? This risks having the executor start doing things before plugins have had a chance of properly starting up.
Originally was all in one thread, changed to separate based on this discussion: #21923 (comment)
I see the merits of both arguments, can change it based on the general consensus.
@tgravescs thoughts?
I guess it does depend on what the intended use is here. If we have it in the same thread, it could block the executor or take too long, and things start timing out. It can have a more direct impact on the executor code itself, whereas a separate thread isolates it more. But like you say, if it's not here and we don't wait for it, then we could have ordering issues if certain plugins have to be initialized before other things happen. I can see both arguments as well. So perhaps the API needs an init-type function that can be called more inline, with a timeout to prevent it from taking too long, and the main part of the plugin called in a separate thread?
I suggested an init method in another comment already, and I think that should be enough if coupled with documentation saying that the method is called inline with the initialization of the executor and should avoid blocking it, doing any expensive initialization asynchronously.
We could try to be fancy and add both "init()" and "initAsync" or something to make it easier for plugin writers, but that sounds like a small gain for the extra complexity in the interface.
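As a rough illustration of the "init inline, heavy work async" contract described above, here is a minimal Java sketch. The names SketchPlugin and AsyncInitPlugin are hypothetical stand-ins and not Spark's classes; the sleep is only a placeholder for slow setup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the plugin interface under review; not Spark's class.
interface SketchPlugin {
  default void init() {}
}

// init() returns quickly and pushes slow setup onto a background thread.
class AsyncInitPlugin implements SketchPlugin {
  final AtomicBoolean initCalled = new AtomicBoolean(false);
  final CompletableFuture<String> heavySetup = new CompletableFuture<>();

  @Override
  public void init() {
    initCalled.set(true);
    // Expensive work runs off the caller's thread so startup is not blocked.
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(50); // placeholder for slow initialization (assumption)
        heavySetup.complete("ready");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        heavySetup.completeExceptionally(e);
      }
    }, "plugin-heavy-setup");
    worker.setDaemon(true);
    worker.start();
  }
}
```

With this shape the caller of init() regains control immediately, and anything that truly depends on the slow part can wait on heavySetup explicitly.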
Aside from semantics, would an init method be necessary instead of having the initialization logic be in the plugin's constructor? Since the class loader is going to call the constructor immediately, I figure having an init function would only really make a difference if we want to load the plugins right here, and then call init at a later point in the executor's creation. I can't think of any particular reason why we'd want to do that, unless there are specific executor structures that we want created prior to plugin initialization (although in that case we could also just move the plugin initialization further down).
I guess it could be in the constructor; Utils.loadExtensions already provides a SparkConf to the constructor if one accepts it, which was the only thing I could think of.
I generally dislike plugin APIs that encourage initialization in the constructor, but here, other than maybe potentially some benefit for testing, I'm not seeing a lot of differences in not having the init method after all...
That does make sense. While I did say "aside from semantics", semantics is a good reason to include it, especially since it'll be harder to get plugin writers to adopt an init function later. I'll make the other changes and make sure the tests still pass. If anyone feels strongly (or even weakly) about one way over another, let me know; I don't think there's much harm in either approach.
EDIT: one of those changes is changing back to plugin construction happening in the current thread, it sounds like that's the consensus here
return new SparkConf() | ||
.setMaster("local") | ||
.setAppName("test") | ||
.set("spark.executor.plugins", pluginNames); |
Use .set(CONSTANT, Seq(...))
Sorry, could you explain this for me? Do you mean that instead of .set("spark.executor.plugins", pluginNames) I should a) have a variable storing the config name and b) pass the plugin names to initializeSparkConf as a list, and set it as .set(EXECUTOR_PLUGIN_CONF_NAME, String.join(",", pluginNames))?
.set(EXECUTOR_PLUGINS, Seq("class1", "class2"))
Is it alright to use Seq in a Java file? I couldn't find it in any other java files in Spark that aren't in a target/ directory. I don't think the compiler is getting mad when I import it but want to make sure I'm sticking with the usual style rules
Ah, crap, this is Java... any specific reason to do this in Java? Otherwise it's probably fine the way it is.
I had the same first response. The original executor plugin code was a class in Java, so I figured I should keep my test in Java as well (manually checked beforehand, couldn't find any instances of Java stuff being primarily tested in Scala so I figured it best not to do that). Perhaps it's for the plugin-writers to get to choose their plugin language, since it's easier to extend a Java interface in Scala, than to extend a Scala trait in Java? I'll leave it as is for the time being
yeah I made the interface in java to make it easier for plugin-writers, since it didn't need scala at all. actually an empty trait also compiles to an empty interface in java with no references to scala, but I figured this would help remember that if the interface gets added to in the future.
I don't see any reason the test needs to be in java, but don't feel strongly either.
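To make the point about a Java interface with default implementations concrete, here is a small sketch. EvolvingPlugin and its shutdown() method are hypothetical and used only to illustrate a method being added later; they are not taken from this PR.

```java
// Hypothetical interface, loosely modeled on the one in this PR.
interface EvolvingPlugin {
  default void init() {}

  // Imagine this method was added in a later release (assumption for the sketch).
  // The default body keeps plugins written against the older interface compiling.
  default void shutdown() {}
}

// A plugin written against the first version: it overrides only init().
class OldStylePlugin implements EvolvingPlugin {
  int initCalls = 0;

  @Override
  public void init() {
    initCalls++;
  }
}

class DefaultMethodDemo {
  static int run() {
    OldStylePlugin plugin = new OldStylePlugin();
    plugin.init();
    plugin.shutdown(); // resolves to the no-op default
    return plugin.initCalls;
  }
}
```

This is the mechanism the interface's doc comment relies on when it says compatibility will hopefully be kept by providing default implementations for any methods added.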
|
||
// Load the sample TestExecutorPlugin twice | ||
SparkConf conf = initializeSparkConf( | ||
"test.org.apache.spark.TestExecutorPlugin,test.org.apache.spark.TestExecutorPlugin"); |
indented too far
|
||
import org.apache.spark.api.java.JavaSparkContext; | ||
|
||
import org.junit.Assert; |
I personally prefer import static org.junit.Assert.*;.
@Test | ||
public void testAddPlugin() throws InterruptedException { | ||
JavaSparkContext sc = null; | ||
numSuccessfulPlugins = 0; |
I'd do this in a @Before method.
|
||
// A test-only sample plugin, used by ExecutorPluginSuite to verify that | ||
// plugins are correctly loaded from the spark.executor.plugins conf | ||
public class TestExecutorPlugin implements ExecutorPlugin { |
Seems overkill to have this in a separate class in a completely separate package. It could even be a static class in the test suite.
…ic class in main testing file
Test build #95376 has finished for PR 22192 at commit
|
Test build #95377 has finished for PR 22192 at commit
|
I don't think this test failure was caused by my changes, it failed a ML test in python due to a worker crashing. Couldn't find anything in the logs indicating that my changes led to this issue (also I don't think I changed any functionality from my previous commit, which passed all tests). Could someone retest this please? |
@@ -130,6 +130,14 @@ private[spark] class Executor( | |||
private val urlClassLoader = createClassLoader() | |||
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) | |||
|
|||
// Load plugins in the current thread, they are expected to not block. | |||
// Heavy computation in plugin initialization should be done async. |
This comment makes more sense in the API doc, not here.
// Load plugins in the current thread, they are expected to not block. | ||
// Heavy computation in plugin initialization should be done async. | ||
Thread.currentThread().setContextClassLoader(replClassLoader) | ||
conf.get(EXECUTOR_PLUGINS).foreach { classes => |
This becomes more readable if the config has a Nil default value instead of being optional.
IMO we should do this in a different thread.
It will isolate the execution creation/instantiation from executor initialization.
Also, we should do this after Executor initialization has completed.
Utils.loadExtensions expects classes to be a sequence of strings; it gets mad at me if I add .createWithDefault(Nil) to the config.
Gets mad how? A default of Nil should still create a config constant that returns a Seq[String], right?
(I know I added the other configs that use this and they are optional instead of having a default value, but I'm kinda wondering what is the issue you're referring to.)
@vanzin If I'm understanding correctly, in the ConfigBuilder I am to change .createOptional to .createWithDefault(Nil). This gives the error:
[error] /Users/nsheth/personal_fork/spark/core/src/main/scala/org/apache/spark/executor/Executor.scala:136: type mismatch;
[error]  found   : String
[error]  required: Seq[String]
[error] Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf)
Granted I have no idea why it's able to cast a Nil to a string for this config.
@mridulm We've had some back and forth on that, open to changing it if people agree it should be one way over another. Just checking, have you looked at the earlier conversation at #22192 (comment)? There's also one on the old PR for the same topic.
As for moving plugin loading to after the executor has initialized, I see no problem with that. I don't think it should make a huge difference, provided either we change to a separate thread, or we keep it in the same thread and plugin writers respect not making the initialization blocking or computation-heavy. I'll see if there's a more natural place to move it.
It's not just changing the config. You'd need to change the call too. e.g.
Utils.loadExtensions(classOf[ExecutorPlugin], conf.get(EXECUTOR_PLUGINS), conf)
Ah, whoops, misunderstood the value of conf.get. That works just fine.
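The optional-versus-empty-default trade-off discussed in this thread is about Spark's Scala config builder, but the shape of the difference can be sketched in plain Java as an analogy. PluginConfigSketch and both method names below are hypothetical and only illustrate the call-site difference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class PluginConfigSketch {
  // Optional config: the caller has to unwrap the value before using the list.
  static List<String> loadFromOptional(Optional<List<String>> names) {
    List<String> loaded = new ArrayList<>();
    names.ifPresent(loaded::addAll);
    return loaded;
  }

  // Empty-list default: the value is always a list, so it is passed straight through.
  static List<String> loadFromDefault(List<String> names) {
    return new ArrayList<>(names);
  }
}
```

With an empty default, the "no plugins configured" case is just an empty list, so the call site needs no extra unwrapping step.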
Thread.currentThread().setContextClassLoader(replClassLoader) | ||
conf.get(EXECUTOR_PLUGINS).foreach { classes => | ||
Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf) | ||
.foreach { _.init() } |
nit: .foreach(_.init()) or .foreach { p => p.init() }
private JavaSparkContext sc; | ||
|
||
private String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; | ||
private String testPluginName = "org.apache.spark.ExecutorPluginSuite$TestExecutorPlugin"; |
TestExecutorPlugin.class.getName()?
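The suggestion works because Class.getName() on a nested class returns the binary name, with '$' between the outer and nested class, which is the same form as the hard-coded string. A minimal sketch with hypothetical class names:

```java
class PluginSuiteSketch {
  // Hypothetical nested test plugin, mirroring the shape of the one in the suite.
  static class TestPlugin {}

  static String pluginName() {
    // getName() yields the binary name, e.g. "...PluginSuiteSketch$TestPlugin".
    return TestPlugin.class.getName();
  }
}
```

Deriving the name from the class literal also means a rename or package move cannot silently leave a stale string behind.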
|
||
assertEquals(2, numSuccessfulPlugins); | ||
|
||
if (sc != null) { |
This needs to be in a finally block. Same in the other test. There's a base class in Scala that helps here, but don't remember one for Java...
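A minimal sketch of the cleanup pattern being asked for, using a hypothetical FakeContext in place of JavaSparkContext so that it runs without Spark on the classpath:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class FinallySketch {
  // Hypothetical stand-in for a context that must be stopped after each test.
  static class FakeContext {
    final AtomicBoolean stopped = new AtomicBoolean(false);

    void stop() {
      stopped.set(true);
    }
  }

  // Runs the test body and always stops the context, even if the body throws.
  static void runWithContext(FakeContext sc, Runnable body) {
    try {
      body.run();
    } finally {
      sc.stop();
    }
  }
}
```

Without the finally block, a failing assertion would skip the stop call and could leak the context into later tests.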
// Wait a moment since plugins run on separate threads | ||
Thread.sleep(500); | ||
|
||
assertEquals(1, numSuccessfulPlugins); |
After introducing an explicit start(), we can replace the sleep with a timed wait on a condition.
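One way to read "a timed wait on a condition" is a CountDownLatch that the plugin thread opens once it has started. The sketch below uses a plain thread as a stand-in for the plugin; LatchWaitSketch and waitForPlugin are hypothetical names, not part of the test suite.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {
  // Returns true if the background "plugin" signalled within the timeout.
  static boolean waitForPlugin(long workMillis, long timeoutMillis) {
    CountDownLatch initialized = new CountDownLatch(1);
    Thread pluginThread = new Thread(() -> {
      try {
        Thread.sleep(workMillis); // placeholder for plugin startup work
        initialized.countDown();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    pluginThread.setDaemon(true);
    pluginThread.start();
    try {
      // Blocks only until the latch opens, or gives up after the timeout.
      return initialized.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Compared with a fixed Thread.sleep(500), the test returns as soon as the plugin is ready and still fails deterministically if it never starts.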
I'm going to commit the fixes recommended by @vanzin soon. Then, I think the biggest remaining concern is the threads. I'm okay with adding a |
Test build #95503 has finished for PR 22192 at commit
|
*/ | ||
@DeveloperApi | ||
public interface ExecutorPlugin { | ||
default void init() {} |
javadoc for init
@@ -130,6 +130,11 @@ private[spark] class Executor( | |||
private val urlClassLoader = createClassLoader() | |||
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) | |||
|
|||
// Load executor plugins |
Might be useful to have logDebug before and after these calls, so if necessary we can see it trying to load plugins and then when it's finished. That might help find bad plugins more easily. If others think it's overkill, I'm ok leaving it out though.
@@ -130,6 +130,11 @@ private[spark] class Executor( | |||
private val urlClassLoader = createClassLoader() | |||
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) | |||
|
|||
// Load executor plugins | |||
Thread.currentThread().setContextClassLoader(replClassLoader) | |||
Utils.loadExtensions(classOf[ExecutorPlugin], conf.get(EXECUTOR_PLUGINS), conf) |
Not a big deal, but is there any reason we did this in between setting up the classloaders? Perhaps move it down a few lines, just to keep all the classloader stuff together.
ok to test |
Test build #95557 has finished for PR 22192 at commit
|
Test build #95862 has finished for PR 22192 at commit
|
Logs suddenly cut off again without any exceptions; I don't think this one is a code error either. Could someone retest this please?
retest this please |
Test build #95887 has finished for PR 22192 at commit
|
retest this please. I took a look at the failures, pretty certain it's unrelated, and I filed https://issues.apache.org/jira/browse/SPARK-25400 to increase the timeouts in one of those failures.
I feel like it should be unrelated as well. It's strange that I failed tests in the same suite twice in a row, and that no other recent build has failed that suite, but I tried running locally on a mac and a linux vm and couldn't reproduce it so...fingers crossed |
Test build #4333 has finished for PR 22192 at commit
|
retest this please. It's that old "java.lang.reflect.InvocationTargetException: null" error we've seen many times. |
Jenkins, retest this please |
Test build #95915 has finished for PR 22192 at commit
|
retest this please |
Test build #95947 has finished for PR 22192 at commit
|
1. Use ExecutorPlugin interface from apache/spark#22192 for SPARK-24918. There is no support for taskStart, onTaskFailure and onTaskCompletion, so MemoryMonitorExecutorExtension cannot support the polling for thread dumps.
2. Add support for procfs memory metrics.
Hi, just want to follow up on this and see if anyone has additional comments/questions/issues. Please feel free to ping me and I'll respond as soon as I can! |
LGTM. |
retest this please |
Test build #96212 has finished for PR 22192 at commit
|
Test build #96211 has finished for PR 22192 at commit
|
LGTM too, only some nits.
val pluginList: Seq[ExecutorPlugin] = | ||
Utils.withContextClassLoader(replClassLoader) { | ||
val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf) | ||
plugins.foreach(_.init()) |
nit: Might be good to log whether each plugin.init() succeeded.
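A rough sketch of per-plugin logging around init(), written against java.util.logging and a hypothetical Plugin interface rather than Spark's logging and Utils.loadExtensions. Continuing after a failed plugin is a choice made for this sketch only, not a statement about what the executor does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

class PluginLoaderSketch {
  private static final Logger LOG = Logger.getLogger(PluginLoaderSketch.class.getName());

  // Hypothetical plugin interface; not the real one.
  interface Plugin {
    default void init() {}
  }

  static class GoodPlugin implements Plugin {}

  static class BadPlugin implements Plugin {
    @Override
    public void init() {
      throw new IllegalStateException("init failed");
    }
  }

  // Instantiates each class by name, calls init(), logs the outcome,
  // and returns only the plugins that initialized successfully.
  static List<Plugin> loadAndInit(List<String> classNames) {
    List<Plugin> ok = new ArrayList<>();
    for (String name : classNames) {
      try {
        Plugin p = (Plugin) Class.forName(name).getDeclaredConstructor().newInstance();
        p.init();
        LOG.info("Initialized plugin " + name);
        ok.add(p);
      } catch (ReflectiveOperationException | RuntimeException e) {
        LOG.warning("Failed to initialize plugin " + name + ": " + e);
      }
    }
    return ok;
  }
}
```

Logging the class name on both paths makes it possible to tell from the executor log which plugin misbehaved.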
} | ||
|
||
@Test | ||
public void testAddMultiplePlugins() throws InterruptedException { |
super nit: shall we test whether we can load multiple different plugins?
Test build #96276 has finished for PR 22192 at commit
|
merging to master / 2.4. |
## What changes were proposed in this pull request? A continuation of squito's executor plugin task. By his request I took his code and added testing and moved the plugin initialization to a separate thread. Executor plugins now run on one separate thread, so the executor does not wait on them. Added testing. ## How was this patch tested? Added test cases that test using a sample plugin. Closes #22192 from NiharS/executorPlugin. Lead-authored-by: Nihar Sheth <niharrsheth@gmail.com> Co-authored-by: NiharS <niharrsheth@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 2f51e72) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
why do we merge a new API to branch-2.4, when RC1 is already out? |
This was discussed way before RC1 was cut. Not adding this to 2.4 would push the release of this API way too far into the future, and we want people to try it out.
* before the executor starts running any tasks. | ||
* | ||
* The specific api exposed to the end users still considered to be very unstable. We will | ||
* hopefully be able to keep compatability by providing default implementations for any methods |
Seems like DeveloperApi already implies that, though.
A continuation of squito's executor plugin task. By his request I took his code and added testing and moved the plugin initialization to a separate thread. Executor plugins now run on one separate thread, so the executor does not wait on them. Added testing. Added test cases that test using a sample plugin. Closes apache#22192 from NiharS/executorPlugin. Lead-authored-by: Nihar Sheth <niharrsheth@gmail.com> Co-authored-by: NiharS <niharrsheth@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 2f51e72) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 43c62e7)
What changes were proposed in this pull request?
A continuation of @squito's executor plugin task. By his request I took his code and added testing and moved the plugin initialization to a separate thread.
Executor plugins now run on one separate thread, so the executor does not wait on them. Added testing.
How was this patch tested?
Added test cases that test using a sample plugin.