[SPARK-29397][core] Extend plugin interface to include the driver. #26170
Changes from 4 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.api.plugin; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
import org.apache.spark.SparkContext; | ||
import org.apache.spark.annotation.DeveloperApi; | ||
|
||
/** | ||
* :: DeveloperApi :: | ||
* Driver component of a {@link SparkPlugin}. | ||
* | ||
* @since 3.0.0 | ||
*/ | ||
@DeveloperApi | ||
public interface DriverPlugin { | ||
|
||
/** | ||
* Initialize the plugin. | ||
* <p> | ||
* This method is called early in the initialization of the Spark driver. Specifically, it is | ||
* called before the application is submitted to the cluster manager. This means that many | ||
* Spark subsystems may not yet have been initialized. This call also blocks driver | ||
* initialization. | ||
* <p> | ||
* It's recommended that plugins be careful about what operations are performed in this call, | ||
* preferably performing expensive operations in a separate thread, or postponing them until | ||
* the application has fully started. | ||
* | ||
* @param sc The SparkContext loading the plugin. | ||
* @param pluginContext Additional plugin-specific context about the Spark application where | ||
* the plugin is running. | ||
* @return A map containing configuration data for the executor-side component of the plugin. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. its not clear to me what is returned? do these need to be spark confs? I'm guessing not but would be good to clarify. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you suggest something? It already says this exact map is provided to the executor plugin's init method. I don't know how can I be clearer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps just say the configuration keys are user defined. Are there any other formatting restrictions? like they shouldn't use spark. or special characters, etc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like trying to explain more just makes it more confusing. You return a map. The map magically appears as an argument to the executor side's init method, with the exact contents you returned. Simple. Whatever you can put in that map will show up on the other side. |
||
* This map will be provided to the {@link ExecutorPlugin}'s initialization method. | ||
*/ | ||
default Map<String, String> init(SparkContext sc, PluginContext pluginContext) { | ||
return Collections.emptyMap(); | ||
} | ||
|
||
/** | ||
* RPC message handler. | ||
* <p> | ||
* Plugins can use Spark's RPC system to send messages from executors to the driver (but not | ||
* the other way around, currently). Messages sent by the executor component of the plugin will | ||
* be delivered to this method, and the returned value will be sent back to the executor as | ||
* the reply, if the executor has requested one. | ||
Review thread:
Reviewer: How does this function know whether the executor requested a reply? I assume it's up to the plugin to infer that from the message type?
Author: It's up to the plugin code. I'm trying to avoid exposing two methods to handle RPC messages.
Reviewer: OK. |
||
* <p> | ||
* Any exception thrown will be sent back to the executor as an error, if it is expecting | ||
* a reply. If a reply is not expected, a log message will be written to the driver log. | ||
* <p> | ||
* The implementation of this handler should be thread-safe. | ||
* <p> | ||
* Note that all plugins share RPC dispatch threads, and this method is called synchronously, so | ||
* performing expensive operations in this handler may affect the operation of other active | ||
* plugins. Internal Spark endpoints are not directly affected, though, since they use different | ||
* threads. | ||
* <p> | ||
* Spark guarantees that the driver component will be ready to receive messages through this | ||
* handler when executors are started. | ||
* | ||
* @param message The incoming message. | ||
* @return Value to be returned to the caller. Ignored if the caller does not expect a reply. | ||
*/ | ||
default Object receive(Object message) throws Exception { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Informs the plugin that the Spark application is shutting down. | ||
* <p> | ||
* This method is called during the driver shutdown phase. It is recommended that plugins | ||
* not use any Spark functions (e.g. sending RPC messages) during this call. | ||
*/ | ||
default void shutdown() {} | ||
|
||
} |
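
For reference (not part of this change), here is a minimal sketch of a driver-side component implementing the interface above. The class name and configuration key are illustrative; the returned map is handed verbatim to the executor component's init method, and receive() handles messages sent by executors.

```java
// Illustrative only: ExampleDriverPlugin and the "example.buffer.size" key are made up.
import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.PluginContext;

public class ExampleDriverPlugin implements DriverPlugin {

  @Override
  public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
    // Keep this cheap: it blocks driver initialization.
    // The returned map is passed, as-is, to ExecutorPlugin.init() on each executor.
    return Collections.singletonMap("example.buffer.size", "4096");
  }

  @Override
  public Object receive(Object message) throws Exception {
    // Handles messages sent from executors via PluginContext.send() or ask().
    // Must be thread-safe; the return value is only used when the executor expects a reply.
    if (message instanceof String) {
      return "ack: " + message;
    }
    throw new IllegalArgumentException("Unexpected message: " + message);
  }

  @Override
  public void shutdown() {
    // Called during driver shutdown; avoid using Spark functionality (e.g. RPC) here.
  }
}
```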
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.api.plugin; | ||
|
||
import java.util.Map; | ||
|
||
import org.apache.spark.annotation.DeveloperApi; | ||
|
||
/** | ||
* :: DeveloperApi :: | ||
* Executor component of a {@link SparkPlugin}. | ||
* | ||
* @since 3.0.0 | ||
*/ | ||
@DeveloperApi | ||
public interface ExecutorPlugin { | ||
|
||
/** | ||
* Initialize the executor plugin. | ||
* <p> | ||
* When a Spark plugin provides an executor plugin, this method will be called during the | ||
* initialization of the executor process. It will block executor initialization until it | ||
* returns. | ||
* | ||
* @param ctx Context information for the executor where the plugin is running. | ||
* @param extraConf Extra configuration provided by the driver component during its | ||
* initialization. | ||
*/ | ||
default void init(PluginContext ctx, Map<String, String> extraConf) {} | ||
|
||
/** | ||
* Clean up and terminate this plugin. | ||
* <p> | ||
* This method is called during the executor shutdown phase, and blocks executor shutdown. | ||
*/ | ||
default void shutdown() {} | ||
|
||
} |
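
Again not part of the diff: a sketch of the matching executor-side component, assuming the interface above. The configuration key mirrors the hypothetical one returned by the driver plugin sketch earlier.

```java
// Illustrative only: the class name and "example.buffer.size" key are made up.
import java.util.Map;

import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;

public class ExampleExecutorPlugin implements ExecutorPlugin {

  private int bufferSize;

  @Override
  public void init(PluginContext ctx, Map<String, String> extraConf) {
    // Blocks executor startup, so keep it fast. extraConf is exactly the map returned by
    // the driver plugin's init(); it is not merged into the executor's SparkConf.
    bufferSize = Integer.parseInt(extraConf.getOrDefault("example.buffer.size", "1024"));
  }

  @Override
  public void shutdown() {
    // Release resources acquired in init(); this blocks executor shutdown.
  }
}
```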
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.api.plugin; | ||
|
||
import java.io.IOException; | ||
|
||
import com.codahale.metrics.MetricRegistry; | ||
|
||
import org.apache.spark.SparkConf; | ||
import org.apache.spark.annotation.DeveloperApi; | ||
|
||
/** | ||
* :: DeveloperApi :: | ||
* Context information and operations for plugins loaded by Spark. | ||
* <p> | ||
* An instance of this class is provided to plugins in their initialization method. It is safe | ||
* for plugins to keep a reference to the instance for later use (for example, to send messages | ||
* to the plugin's driver component). | ||
* <p> | ||
* Context instances are plugin-specific, so metrics and messages are tied to each plugin. It is | ||
* not possible for a plugin to directly interact with other plugins. | ||
* | ||
* @since 3.0.0 | ||
*/ | ||
@DeveloperApi | ||
public interface PluginContext { | ||
|
||
/** | ||
* Registry in which to register metrics published by the plugin associated with this context. | ||
* <p> | ||
* Plugins should register all needed metrics in their initialization callback, otherwise | ||
* Spark's metrics system may not properly report them. It's safe for plugins to access this | ||
* registry later to interface with the registered metrics, but adding or removing metrics | ||
* after initialization may not have the desired effect. | ||
* <p> | ||
* If the plugin does not register any metrics during its initialization call, a metrics | ||
* source for the plugin will not be generated. | ||
*/ | ||
MetricRegistry metricRegistry(); | ||
|
||
/** Configuration of the Spark application. */ | ||
SparkConf conf(); | ||
|
||
/** Executor ID of the process. On the driver, this will identify the driver. */ | ||
String executorID(); | ||
|
||
/** The host name which is being used by the Spark process for communication. */ | ||
String hostname(); | ||
|
||
/** | ||
* Send a message to the plugin's driver-side component. | ||
* <p> | ||
* This method sends a message to the driver-side component of the plugin, without expecting | ||
* a reply. It returns as soon as the message is enqueued for sending. | ||
* <p> | ||
* The message must be serializable. | ||
* | ||
* @param message Message to be sent. | ||
*/ | ||
void send(Object message) throws IOException; | ||
|
||
/** | ||
* Send an RPC to the plugin's driver-side component. | ||
* <p> | ||
* This method sends a message to the driver-side component of the plugin, and blocks until a | ||
* reply arrives, or the configured RPC ask timeout (<code>spark.rpc.askTimeout</code>) elapses. | ||
* <p> | ||
* If the driver replies with an error, an exception with the corresponding error will be thrown. | ||
* <p> | ||
* The message must be serializable. | ||
* | ||
* @param message Message to be sent. | ||
* @return The reply from the driver-side component. | ||
*/ | ||
Object ask(Object message) throws Exception; | ||
|
||
} |
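
A hedged sketch (not from this PR) of how an executor component might use the PluginContext above: register a metric during init(), then use send() for fire-and-forget messages and ask() for blocking RPCs to the driver-side component. All names and message contents are illustrative.

```java
// Illustrative only: class name, metric name, and message contents are made up.
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.Gauge;

import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;

public class MetricsAndRpcPlugin implements ExecutorPlugin {

  private final AtomicLong eventCount = new AtomicLong();
  private PluginContext ctx;

  @Override
  public void init(PluginContext ctx, Map<String, String> extraConf) {
    this.ctx = ctx;
    // Register metrics here; adding them after initialization may not be picked up
    // by Spark's metrics system.
    ctx.metricRegistry().register("eventCount", (Gauge<Long>) eventCount::get);
  }

  public Object reportEvent() throws Exception {
    eventCount.incrementAndGet();
    ctx.send("event on " + ctx.executorID());        // fire-and-forget, no reply expected
    return ctx.ask("ping from " + ctx.hostname());   // blocks until the driver replies or times out
  }
}
```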
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.api.plugin; | ||
|
||
import org.apache.spark.annotation.DeveloperApi; | ||
|
||
/** | ||
* :: DeveloperApi :: | ||
* A plugin that can be dynamically loaded into a Spark application. | ||
* <p> | ||
* Plugins can be loaded by adding the plugin's class name to the appropriate Spark configuration. | ||
* Check the Spark configuration documentation for details. | ||
* <p> | ||
* Plugins have two components: a driver-side component, of which a single instance is created | ||
* per application, inside the Spark driver; and an executor-side component, of which one instance | ||
* is created in each executor that is started by Spark. Details of each component can be found | ||
* in the documentation for {@link DriverPlugin} and {@link ExecutorPlugin}. | ||

Review thread:
Reviewer: Perhaps say "two optional components", since I think you can run one without the other. |
* | ||
* @since 3.0.0 | ||
*/ | ||
@DeveloperApi | ||
public interface SparkPlugin { | ||
|
||
/** | ||
* Return the plugin's driver-side component. | ||
* | ||
* @return The driver-side component, or null if one is not needed. | ||
*/ | ||
DriverPlugin driverPlugin(); | ||
|
||
/** | ||
* Return the plugin's executor-side component. | ||
* | ||
* @return The executor-side component, or null if one is not needed. | ||
*/ | ||
ExecutorPlugin executorPlugin(); | ||
|
||
} |
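
For completeness, a sketch of a top-level SparkPlugin that ties the two hypothetical components from the earlier sketches together; either method may return null if that side is not needed.

```java
// Illustrative only: wires up the hypothetical components from the sketches above.
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.SparkPlugin;

public class ExamplePlugin implements SparkPlugin {

  @Override
  public DriverPlugin driverPlugin() {
    return new ExampleDriverPlugin();
  }

  @Override
  public ExecutorPlugin executorPlugin() {
    return new ExampleExecutorPlugin();
  }
}
```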
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging | |
import org.apache.spark.internal.config._ | ||
import org.apache.spark.internal.config.Tests._ | ||
import org.apache.spark.internal.config.UI._ | ||
import org.apache.spark.internal.plugin.PluginContainer | ||
import org.apache.spark.io.CompressionCodec | ||
import org.apache.spark.metrics.source.JVMCPUSource | ||
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} | ||
|
@@ -220,6 +221,7 @@ class SparkContext(config: SparkConf) extends Logging { | |
private var _heartbeater: Heartbeater = _ | ||
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _ | ||
private var _shuffleDriverComponents: ShuffleDriverComponents = _ | ||
private var _plugins: Option[PluginContainer] = None | ||
|
||
/* ------------------------------------------------------------------------------------- * | ||
| Accessors and public fields. These provide access to the internal state of the | | ||
|
@@ -539,6 +541,9 @@ class SparkContext(config: SparkConf) extends Logging { | |
_heartbeatReceiver = env.rpcEnv.setupEndpoint( | ||
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)) | ||
|
||
// Initialize any plugins before the app is initialized in the cluster manager. | ||
Review thread:
Reviewer: Similar to the comment above, this comment is a bit confusing to me; perhaps we can clarify it. |
||
_plugins = PluginContainer(this) | ||
Review thread:
Reviewer: There are obvious advantages to initializing the driver plugin at this early stage; however, this is not an ideal point for registering metrics (for those plugins that want to do so), as the metrics source should ideally be registered with _env.metricsSystem, which is only started at a later point, after the task scheduler has been started. As it is now, driver plugin metrics do not get the application ID, so they are difficult to consume.
Author: I can take another look, but I'm 99% sure that when I tried [...]. Maybe it's a bug in that particular sink, but I didn't investigate that far.
Author: Also, if you really want to, you can register metrics later. Just install a listener and wait for the "application start" event. (You just need at least a dummy metric registered here, or your source won't be initialized. But that can be fixed easily.)
Author: I'll just move the conversation back here because GitHub doesn't have top-level threads, and that's annoying. That has the problem I ran into: that's done after [...]. And the thing is, I can't just move the metrics registration to that spot. I'd have to move all plugin initialization to that spot, otherwise there's no place where the plugin can initialize the metrics before the registration. And I can't move the plugin initialization to that spot, because then I can't send the plugin data to executors via config. I could use RPCs for that, but it would slow down executor startup unnecessarily. So, ignoring [...], I can change the code to allow for this late initialization, so you don't have to add a dummy metric in the plugin's init.
Author: OK, here's what happens: if you add a metric to the registry after you register it as a source, those metrics don't show up (at least not in [...]). I'll add a new method to the driver plugins for explicitly registering metrics; that way initialization can happen early, and metrics initialization later. It diverges a bit from the executor API, but I don't see a much better alternative. To be able to register plugin RPC endpoints before executors are up, initialization needs to happen early. (That also avoids a warning from the metrics system in the output when you register a source before an app ID is known.)
Reviewer: I have just tested it and it works for me. Thanks for the work and explanation. |
||
|
||
// Create and start the scheduler | ||
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) | ||
_schedulerBackend = sched | ||
|
@@ -1976,6 +1981,9 @@ class SparkContext(config: SparkConf) extends Logging { | |
_listenerBusStarted = false | ||
} | ||
} | ||
Utils.tryLogNonFatalError { | ||
_plugins.foreach(_.shutdown()) | ||
} | ||
Utils.tryLogNonFatalError { | ||
_eventLogger.foreach(_.stop()) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1159,6 +1159,17 @@ package object config { | |
s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].") | ||
.createWithDefault(1024 * 1024) | ||
|
||
private[spark] val STATIC_PLUGINS_LIST = "spark.plugins.static" | ||
|
||
private[spark] val PLUGINS = | ||
ConfigBuilder("spark.plugins") | ||
.withPrepended(STATIC_PLUGINS_LIST, separator = ",") | ||
Review thread:
Reviewer: It's too bad we don't have documentation on withPrepended, as it was not clear what it did. My initial thought was that it had something to do with the actual config name; after investigating more and reading the code I figured it out, but I guess that is a separate issue. It seems to me we have a couple of these: this one named ".static", and the Java options with "defaultJavaOptions". It might be nice to keep a consistent naming theme if we are going to start supporting cluster-level values alongside user-level ones, where they aren't overrides, they are prepends.
Author: I'll change it to something, but I've always disliked the "default" name, since "default" implies it can be overridden, and that's not the goal here.
Reviewer: That's fine, and I agree about the "default" name; I was just thinking it would be nice if we could come up with a standard naming theme. |
||
.doc("Comma-separated list of class names implementing " + | ||
"org.apache.spark.api.plugin.SparkPlugin to load into the application.") | ||
.stringConf | ||
.toSequence | ||
.createWithDefault(Nil) | ||
|
||
private[spark] val EXECUTOR_PLUGINS = | ||
ConfigBuilder("spark.executor.plugins") | ||
.doc("Comma-separated list of class names for \"plugins\" implementing " + | ||
|
Review thread:
Reviewer: "Before it is submitted"? I assume that is client mode and not YARN cluster mode, for instance?
Author: I'll change it to "before the task scheduler is initialized".
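
For context (not part of the diff), here is a sketch of how an application might enable a plugin through the new spark.plugins configuration added above; the plugin class name is illustrative.

```java
// Illustrative only: "com.example.ExamplePlugin" is a made-up plugin class name.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PluginDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("plugin-demo")
        // Comma-separated list of SparkPlugin implementations to load.
        .set("spark.plugins", "com.example.ExamplePlugin");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // The plugin's driver component is initialized during SparkContext startup; the
    // executor component is initialized on each executor as it starts.
    sc.stop();
  }
}
```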