[SPARK-29397][core] Extend plugin interface to include the driver. #26170
Conversation
Spark 2.4 added the ability for executor plugins to be loaded into Spark (see SPARK-24918). That feature intentionally skipped the driver to keep changes small, and also because it is possible to load code into the Spark driver using listeners + configuration. But that is a bit awkward, because the listener interface does not provide hooks into a lot of Spark functionality. This change reworks the executor plugin interface to also extend to the driver.

- There's a "SparkPlugin" main interface that provides APIs to load driver and executor components.
- Custom metric support (added in SPARK-28091) can be used by plugins to register metrics both in the driver process and in executors.
- A communication channel now exists that allows the plugin's executor components to send messages to the plugin's driver component easily, using the existing Spark RPC system.

The latter was a feature intentionally left out of the original plugin design (also because it didn't include a driver component).

To avoid polluting the "org.apache.spark" namespace, I added the new interfaces to the "org.apache.spark.api" package, which seems like a better place in any case. The actual implementation is kept in an internal package.

The change includes unit tests for the new interface and features, but I've also been running a custom plugin that extends the new API in real applications.
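For orientation, here is a minimal sketch of what a plugin built on the new interface might look like. The class name is made up, the method shapes follow the API as proposed in this PR (so details could still change), and returning null to skip one side is an assumption based on the "optional components" discussion further down in the review.

```scala
import java.util.{Collections, Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Hypothetical plugin with both a driver-side and an executor-side component.
class ExampleMonitorPlugin extends SparkPlugin {

  // A single instance of the returned component is created inside the Spark driver.
  override def driverPlugin(): DriverPlugin = new DriverPlugin {
    override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
      // Driver-side setup would go here.
      Collections.emptyMap()
    }
  }

  // One instance of the returned component is created in each executor process.
  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // Executor-side setup would go here.
    }
  }
}
```

A plugin like this would then be enabled through the new configuration added by this change, e.g. `--conf spark.plugins=<fully qualified plugin class name>`, provided the jar containing it is visible to both the driver and the executors.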
Test build #112298 has finished for PR 26170 at commit
Test build #112302 has finished for PR 26170 at commit
Test build #112307 has finished for PR 26170 at commit
Test build #112322 has finished for PR 26170 at commit
I have run a quick test of this in local mode with a basic plugin. However, it does not seem to work for me when I try it on a YARN test cluster.
* Initialize the plugin.
* <p>
* This method is called early in the initialization of the Spark driver. Explicitly, it is
* called before the application is submitted to the cluster manager. This means that a lot
before it is submitted? I assume that is client mode and not yarn cluster mode for instance?
I'll change; "before the task scheduler is initialized".
* @param sc The SparkContext loading the plugin.
* @param pluginContext Additional plugin-specific information about the Spark application where
*                      the plugin is running.
* @return A map containing configuration data for the executor-side component of the plugin.
It's not clear to me what is returned. Do these need to be Spark confs? I'm guessing not, but it would be good to clarify.
Can you suggest something? It already says this exact map is provided to the executor plugin's init method. I don't know how I can be clearer.
Perhaps just say the configuration keys are user-defined. Are there any other formatting restrictions? Like whether they shouldn't use "spark." or special characters, etc.
I feel like trying to explain more just makes it more confusing.
You return a map. The map magically appears as an argument to the executor side's init method, with the exact contents you returned. Simple. Whatever you can put in that map will show up on the other side.
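To make that concrete, here is a hedged sketch of the handoff; the key names are invented for the example, and they are plain, plugin-defined map entries, not Spark configuration properties.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext}

class HandoffDriverComponent extends DriverPlugin {
  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
    // Arbitrary, plugin-defined keys; these are not Spark confs.
    Map("endpoint" -> "collector.example.com:9999", "sample.rate" -> "0.1").asJava
  }
}

class HandoffExecutorComponent extends ExecutorPlugin {
  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
    // The exact map returned by the driver component above is what arrives here.
    val endpoint = extraConf.get("endpoint")
    val sampleRate = extraConf.get("sample.rate").toDouble
  }
}
```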
* Plugins can be loaded by adding the plugin's class name to the appropriate Spark configuration.
* Check the Spark configuration documentation for details.
* <p>
* Plugins have two components: a driver-side component, of which a single instance is created
Perhaps say "two optional components", since I think you can run one without the other.
@@ -539,6 +541,9 @@ class SparkContext(config: SparkConf) extends Logging {
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

    // Initialize any plugins before the app is initialized in the cluster manager.
Similar to the comment above, this comment is a bit confusing to me; perhaps we can clarify it.
private[spark] val PLUGINS =
  ConfigBuilder("spark.plugins")
    .withPrepended(STATIC_PLUGINS_LIST, separator = ",")
It's too bad we don't have documentation on withPrepended, as it was not clear what it did. My initial thought was that it had something to do with the actual config name. After investigating more and reading the code I figured it out, but I guess that is a separate issue.
It seems to me we have a couple of these: this one named .static., and the Java options with defaultJavaOptions. It might be nice to keep a consistent naming theme if we are going to start supporting cluster-level values alongside user-level ones, where they aren't overrides but prepends.
I'll change it to something, but I've always disliked the "default" name, since "default" implies it can be overridden, and that's not the goal here.
That's fine, I agree about the "default" name; I was just thinking it would be nice if we could come up with a standard naming scheme.
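Since withPrepended is undocumented, here is a tiny standalone sketch (not Spark code, names invented) of its semantics as discussed here: the value of the "prepended" key, when set, is joined in front of the user-supplied value with the given separator, instead of overriding it.

```scala
// Minimal model of the prepend behavior for a comma-separated list config.
object PrependedConfDemo {
  def effectiveValue(prepended: Option[String],
                     user: Option[String],
                     sep: String = ","): Option[String] =
    (prepended, user) match {
      case (Some(p), Some(u)) => Some(s"$p$sep$u")
      case (Some(p), None)    => Some(p)
      case (None, u)          => u
    }

  def main(args: Array[String]): Unit = {
    // A cluster admin sets a static plugin list, the user adds their own;
    // neither value overrides the other.
    println(effectiveValue(Some("com.example.ClusterPlugin"), Some("com.example.UserPlugin")))
    // => Some(com.example.ClusterPlugin,com.example.UserPlugin)
  }
}
```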
      }
    }
  }

remove extra newline
object PluginContainer {

  val EXTRA_CONF_PREFIX = "spark.plugins.__conf__."
Instead of "conf", should we use something like ".internal."? I assume these aren't meant for users to set directly.
I've always used the underscores to mean "internal" since I started programming in C... but sure.
* Plugins can use Spark's RPC system to send messages from executors to the driver (but not
* the other way around, currently). Messages sent by the executor component of the plugin will
* be delivered to this method, and the returned value will be sent back to the executor as
* the reply, if the executor has requested one.
How does this function know whether the executor requested a reply? I assume it's up to the plugin to infer that from the message type?
It's up to the plugin code. I'm trying to avoid exposing two methods to handle RPC messages.
ok
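For readers following along, a hedged sketch of how a plugin might use this channel. It assumes the PluginContext send/ask style methods introduced by this change (a fire-and-forget send and a blocking ask); the message values and class names are invented for the example.

```scala
import java.util.{Map => JMap}

import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext}

// Executor side: sends messages to the driver component through the plugin context.
class ReportingExecutorComponent extends ExecutorPlugin {
  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
    ctx.send("started")                       // fire-and-forget; no reply expected
    val answer = ctx.ask("current-settings")  // blocks until the driver's receive() returns
  }
}

// Driver side: a single receive() handles everything; whether a reply is meaningful is a
// protocol decision made by the plugin itself, e.g. based on the message type.
class ReportingDriverComponent extends DriverPlugin {
  override def receive(message: AnyRef): AnyRef = message match {
    case "started"          => null            // nothing useful to send back
    case "current-settings" => "sampling=off"  // returned value becomes the reply to ask()
    case other => throw new IllegalArgumentException(s"unknown message: $other")
  }
}
```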
override def shutdown(): Unit = {
  driverPlugins.foreach { case (name, plugin) =>
    try {
      plugin.shutdown()
add debug message like executors have
Weird, I have that message locally. Not sure how it's not here.
Err looked in the wrong spot.
Can you be a little more descriptive about what that means? I've been testing on k8s and it was working fine.
I added a non-local unit test which passed without any modifications to the code; I also tried YARN client mode with a dummy plugin, and all seems fine. I don't see why anything would change in YARN cluster mode.
Test build #112551 has finished for PR 26170 at commit
object PluginContainer {

  val EXTRA_CONF_PREFIX = "spark.plugins.__internal_conf__."
Sorry, I was thinking of just spark.plugins.internal.conf. We have the internal() option on the config builder, so I figured it kind of matched. I don't have a super strong opinion on this as long as we try to keep it consistent. I know we use __xxx__ for various internal things (files, directories), but I didn't think we had any for configs. Thoughts?
The internal() thing does not modify the config names. Nor does it have any other effect aside from being informational (except for SQL configs, which are hidden from the "set -v" output).
So does it really matter what this name is? It's internal, it's not supposed to be set or read by anything other than internal Spark code, and people who end up seeing them should just ignore them.
I get that internal() doesn't mean anything for the config name; maybe a bad comparison. It's more about keeping naming consistent. To me it's a lot more obvious that a config is internal to Spark if it has .internal. in its name, and users should ignore those; that is why I suggested it. If it's .internal. I could also programmatically grep for all internal configs fairly easily. Not sure why I would want to do this, other than maybe to hide them from users.
All the other Spark configs follow the x.y.z format, with the last component optionally camel case, so why not keep that consistent instead of breaking the convention with the __something__ format? I know our internal configs currently have no special marker in their names, which personally I don't like either, as it's not obvious they're meant to be internal. The only benefit of that is you can easily make one non-internal without changing its name.
Sure, if you want that, I don't care about the name. I'm just pointing out that there isn't a pattern to follow here.
Test build #112555 has finished for PR 26170 at commit
I have further investigated the issues I see when running on YARN. What I am trying to do is add plugins packaged in a jar, using --jars to ship them to the executors. When running on a YARN cluster, the executors are not able to find the plugin class in my custom jar.
Ah, I see that. I'll do the same for the new code.
Test build #112613 has finished for PR 26170 at commit
I have just tested the latest updates and it works OK now, thanks.
That's an issue with every cluster manager backend in Spark, except for YARN. Distribution of the plugins is up to the user / admin / etc. in those cases. In the case of k8s you could use custom docker images. Not sure if the recent code added to stage dependencies in a shared file system could help.
ctx.registerMetrics()

logInfo(s"Initialized executor component for plugin $name.")
Some(p.getClass().getName() -> executorPlugin)
With the current implementation of executor.plugins we use plugin.getClass().getSimpleName() instead of getName(). The advantage of getSimpleName is that it is more compact and does not contain "." characters, so it is easier to process when handling metrics data. Also, when using getName, we will have long names that are repeated for every emitted value.
This was intentional. The full name has a much higher chance of being unique. I don't really see the advantages you mention; the dots don't make it any more complicated to process, and it's easy to get just the "simple name" if you want to, while the other way around is impossible.
Good point.
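A tiny, self-contained illustration of the trade-off being discussed, using an arbitrary JDK class in place of a plugin class:

```scala
object NameDemo extends App {
  val cls = classOf[java.util.concurrent.ConcurrentHashMap[_, _]]

  println(cls.getName())        // java.util.concurrent.ConcurrentHashMap -- globally unique
  println(cls.getSimpleName())  // ConcurrentHashMap -- compact, but may clash across packages

  // Deriving the simple name from the full name is easy; the reverse is impossible.
  println(cls.getName().split('.').last)
}
```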
}
ctx.registerMetrics()
logInfo(s"Initialized driver component for plugin $name.")
Some(p.getClass().getName() -> driverPlugin)
See comment about getName vs. getSimpleName below.
Just to clarify, I was thinking about registering the plugin source in the driver somewhere "near" to what was done in #23838
Test build #112626 has finished for PR 26170 at commit
def registerMetrics(): Unit = {
  if (!registry.getMetrics().isEmpty()) {
    val src = new PluginMetricsSource(pluginName, registry)
I have been experimenting with adding a prefix to pluginName, something like "Plugin." + pluginName.
In our current setup this would make it easier to use plugin metrics with a Graphite endpoint stored in InfluxDB. We currently do this with InfluxDB templates, which take the first entry in the measurement field list (separated by dots) as the sourceName/namespace value (DAGScheduler, BlockManager, JVMCPU, executor, etc.); for example: https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Dashboard/influxdb.conf_GRAPHITE
Another possible (mild?) advantage of adding a prefix to the source name, ahead of the class name, is that it would prevent plugin names from clashing with existing metrics namespaces.
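For context, a hedged sketch of how a plugin would end up in these namespaces in the first place; it assumes the plugin context exposes the Dropwizard MetricRegistry via a metricRegistry() accessor, and the class and metric names are invented.

```scala
import java.util.{Map => JMap}

import com.codahale.metrics.Counter
import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}

// Executor component that registers its own metric with the plugin's registry.
class CountingExecutorComponent extends ExecutorPlugin {
  private val tasksSeen = new Counter()

  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
    // Metrics registered during init() are picked up by the plugin's metrics source.
    ctx.metricRegistry().register("tasksSeen", tasksSeen)
  }
}
```

With the prefix proposed above, such a counter would surface under something like plugin.<plugin class name>.tasksSeen in the configured metrics sinks.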
docs/monitoring.md
- Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
  configured using the Spark executor plugin infrastructure.
  See also the configuration parameter `spark.executor.plugins`
- namespace=<Plugin Class Name>
I think it could make sense to add a similar entry about plugin metrics to the driver component instance metrics list, like what was done for namespace=JVMCPU, for example.
Please add `&lt;` to escape the "<" character (this issue actually comes from SPARK-20891).
Test build #112784 has started for PR 26170 at commit
docs/monitoring.md
- Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
  configured using the Spark executor plugin infrastructure.
  See also the configuration parameter `spark.executor.plugins`
- namespace=plugin.<Plugin Class Name>
I believe escape characters are needed for this, something like: namespace=plugin.\<Plugin Class Name>
Test build #112798 has finished for PR 26170 at commit
LGTM. Thanks @vanzin for all the work.
Changes look good.
Just curious whether you tested with just an executor plugin? I guess you have another JIRA to deal with the old API.
assert(err.getMessage().contains("unknown message"))

// It should be possible for the driver plugin to send a message to itself, even if that doesn't
// make a whole lot of sense. It at least allows the same context class to be used on both
This answers my question about this. I wasn't sure it made much sense, but it seems OK for reuse. I was wondering if you had a specific use case in mind, but it sounds like not.
I haven't explicitly, no, but also don't see what would not work. Trying to send messages to the driver would not work (and in the case of
Test build #112857 has finished for PR 26170 at commit
Test build #112858 has finished for PR 26170 at commit
@@ -165,6 +166,11 @@ private[spark] class Executor(
    }
  }

  // Plugins need to load using a class loader that includes the executor's user classpath
  private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
I am not sure that plugins should be loaded at this stage when running in local mode; maybe only the driver side of the plugin is sufficient in local mode?
Metrics source registration at this stage, when executed in local mode, will not get the application id. Other metrics handled in Executor.scala are not registered when running in local mode.
The current implementation of executor plugins sends an "isLocal" boolean via the pluginContext to handle this case in the plugin logic.
You can check if you're running in local mode by looking at the spark.master value. I intentionally did not add that to the API since it would be redundant.
I'm also not especially worried about local mode. It's mostly for debugging. If something doesn't work 100% as intended for plugins, I'm totally fine with it.
Indeed, that should be fine.
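To make the suggestion concrete, a plugin could detect local mode itself from the configuration; this sketch assumes PluginContext gives access to the application's SparkConf, and simply inspects the standard spark.master value. The class name is invented.

```scala
import java.util.{Map => JMap}

import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}

// Skip executor-side setup when running in local mode, by inspecting spark.master.
class LocalAwareExecutorComponent extends ExecutorPlugin {
  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
    val isLocal = ctx.conf().get("spark.master", "").startsWith("local")
    if (!isLocal) {
      // ... set up executor-only instrumentation here ...
    }
  }
}
```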
retest this please
Test build #113221 has finished for PR 26170 at commit
No more comments, merging to master.
@vanzin, BTW why do we have the same API in two places: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java ? Is it for compatibility? If so, I think we should remove (https://github.com/apache/spark/pull/22192/files#r343418878) or deprecate the other.
See the JIRA - it has a separate JIRA to remove the old one:
See #26390
Ah, thanks guys.