[SPARK-25095][PySpark] Python support for BarrierTaskContext #22085
Conversation
| logError(message)
| throw new SparkException(message)
| } else {
|   logDebug(s"Started GatewayServer to port BarrierTaskContext on port $boundPort.")
When isBarrier is false, I think we don't need to show this?
| # initialize global state
| taskContext = TaskContext._getOrCreate()
| isBarrier = read_bool(infile)
Add a comment indicating that the following 3 inputs are only for barrier tasks?
python/pyspark/worker.py
Outdated
| _accumulatorRegistry.clear()
|
| if isBarrier:
|     paras = GatewayParameters(port=boundPort, auth_token=secret, auto_convert=True)
Maybe params instead of paras?
python/pyspark/taskcontext.py
Outdated
| """ | ||
|
|
||
| _taskContext = None | ||
| _javaContext = None |
_barrierContext?
| } else {
|   logDebug(s"Started GatewayServer to port BarrierTaskContext on port $boundPort.")
| }
| // Write out the TaskContextInfo
This comment should be moved too.
Test build #94650 has finished for PR 22085 at commit
mengxr left a comment
Should also add tests.
| """ | ||
| return self._localProperties.get(key, None) | ||
|
|
||
| def barrier(self): |
Create BarrierTaskContext that extends TaskContext and then move those two methods there.
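For reference, a rough sketch of what that split could look like on the Python side (the `get()` guard and the method bodies are assumptions for illustration, not code from this PR):

```python
from pyspark.taskcontext import TaskContext


class BarrierTaskContext(TaskContext):
    """Extra contextual info and tooling for tasks in a barrier stage.

    Sketch only: the real class also has to carry the port/secret of the
    JVM-side server and the logic behind barrier() and getTaskInfos().
    """

    @classmethod
    def get(cls):
        # Assumes TaskContext keeps the per-worker singleton in cls._taskContext,
        # as the existing _getOrCreate() does.
        if not isinstance(cls._taskContext, BarrierTaskContext):
            raise Exception("Not supported to call BarrierTaskContext.get() "
                            "inside a non-barrier task.")
        return cls._taskContext

    def barrier(self):
        """Wait until all tasks in the same barrier stage reach this call."""
        raise NotImplementedError("left to the socket-based implementation")

    def getTaskInfos(self):
        """Return task infos of all tasks in the same barrier stage."""
        raise NotImplementedError("left to the socket-based implementation")
```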
python/pyspark/taskcontext.py
Outdated
|     raise Exception("Not supported to call getTaskInfos() inside a non-barrier task.")
| else:
|     java_list = self._javaContext.getTaskInfos()
|     return [h for h in java_list]
Create BarrierTaskInfo class and wrap it over Java object.
| .entryPoint(context.asInstanceOf[BarrierTaskContext])
| .authToken(secret)
| .javaPort(0)
| .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
Leave a TODO here. We do not have requests from Java to Python.
| .javaPort(0)
| .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
|   secret)
| .build())
Wait wait.. are you sure you have another Java gateway for each worker? (Or did I rush to read this code?) Can you elaborate on why this is needed? We should avoid this unless it's really required or necessary.
We have to port BarrierTaskContext from the Java side to the Python side, otherwise there is no way to call BarrierTaskContext.barrier() from Python. Thus, of course, the JavaGateway is only initiated when the context is a BarrierTaskContext.
Yea, I read and understood that this is only initialised when the context is a BarrierTaskContext, but it is super weird that we start another Java gateway here. If it's a hard requirement, then I suspect a design issue. Should this be targeted to 2.4.0, @mengxr?
If this should necessarily target 2.4.0, don't be blocked by me, since it's a new feature and we could probably consider another approach later; but if we can avoid it, I would suggest avoiding it for now.
Let me try to track down the design doc and the changes about this. I think I need more time to check why it ended up like this and whether there's another way.
@HyukjinKwon Could you elaborate on your concerns? Is it because of resource usage or security?
Mainly the reasons are resource usage, the unusual access pattern via Py4J on the worker side, and the possibility of allowing JVM access within the Python worker.
It pretty much looks like overkill to launch a Java gateway just to allow calling a function, judging from #22085 (comment). This pattern sounds pretty unusual; in such cases we usually send the data manually and read it on the Python side, for instance TaskContext. Now it opens a gateway for each worker, if I am not mistaken.
I was thinking about whether we can avoid this. Can you elaborate on why this is required and necessary? I haven't had enough time to look into this, so I was thinking about taking a look this weekend.
This also opens up the possibility of JVM access from the worker side via BarrierTaskContext. For instance, I believe we could hack our way to JVM access inside UDFs.
BTW, this is a design change per https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals .. is it really required to open a gateway there?
The major issue here is that we want to make the barrier() call blocking: the task shall wait until it times out or succeeds. Do we have other ways to achieve this goal, other than the current approach here?
Not yet. So I asked to hold it for now, since another gateway here looks like the last resort, and I was wondering if we can avoid targeting 2.4.0. If this blocks you, please go ahead. Will check it later this weekend.
@mengxr, let me leave this link here again - #22085 (comment) - in case it gets missed by being folded.
Test build #94681 has finished for PR 22085 at commit
| def __init__(self):
|     """Construct a BarrierTaskContext, use get instead"""
|     pass
BTW, I would throw an exception if this method should rather be banned.
This just follows TaskContext.__init__(), shall we update both?
Ah, this is called in _getOrCreate. Sorry, I rushed to read. In this case, frankly, I think we can remove this since that's the default constructor injected by Python, or monkey patch it to disallow the initialization (like we did for ImageSchema), but I guess we don't necessarily need to be super clever about this.
I'm okay as is.
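For context, the monkey-patch alternative mentioned above could look roughly like this (a hypothetical sketch of the pattern, not what the PR ended up doing; `_disallow_init` is an illustrative name):

```python
from pyspark.taskcontext import BarrierTaskContext  # assumes the class added by this PR


def _disallow_init(self):
    # Fail loudly if user code tries to construct the context directly.
    raise RuntimeError("BarrierTaskContext should not be constructed directly; "
                       "use BarrierTaskContext.get() inside a barrier task instead.")


# Only apply this after the worker has created the singleton via _getOrCreate(),
# since _getOrCreate() itself needs a working constructor.
BarrierTaskContext.__init__ = _disallow_init
```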
python/pyspark/taskcontext.py
Outdated
| """ | ||
|
|
||
| def __init__(self, info): | ||
| self.address = info.address |
- should be info.address()
- better to rename info to jobj to make it clear this is from Java
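Combining this with the earlier suggestion to create a BarrierTaskInfo wrapper, the class could look roughly like this (a sketch; `address` is the only field visible in this diff):

```python
class BarrierTaskInfo(object):
    """Carries task information for a task in a barrier stage.

    Sketch of the wrapper suggested above.
    """

    def __init__(self, jobj):
        # `jobj` is the Java-side BarrierTaskInfo handed over via py4j,
        # so its fields are read through method calls.
        self.address = jobj.address()
```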
Test build #94738 has finished for PR 22085 at commit
@HyukjinKwon Thanks for the feedback! We will replace the py4j route with a special implementation that can only trigger "context.barrier()" in the JVM.
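Judging from the code fragments in this PR, the replacement works roughly as follows: the JVM side opens a plain server socket for a barrier task, the Python worker is handed the bound port and an auth secret, and barrier() connects, sends a single "run" command, and blocks until the JVM's BarrierTaskContext.barrier() returns and the connection is closed. A hypothetical sketch of the client side (the helper name, the auth step, and the exact wire format are assumptions):

```python
import socket
import struct


def _call_barrier(port, auth_secret, timeout=3600):
    """Hypothetical sketch: trigger BarrierTaskContext.barrier() in the JVM
    and block until it completes."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    sock.connect(("localhost", port))
    out = sock.makefile("wb", 65536)
    # auth_secret would be exchanged here to authenticate the connection
    # (see the security discussion further down in this thread).
    # Length-prefixed UTF-8 command, mirroring writeUTF/readUtf8 on the JVM side.
    cmd = "run".encode("utf-8")
    out.write(struct.pack("!i", len(cmd)) + cmd)
    out.flush()
    # Block until the JVM side has finished barrier() and closed the socket.
    sock.makefile("rb", 65536).read()
    sock.close()
```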
Thank you so much @mengxr and @jiangxb1987.
| /**
|  * Gateway to call BarrierTaskContext.barrier().
|  */
| def barrierAndServe(): Unit = {
It's not clear yet how to trigger this.
| else:
|     _load_from_socket(self._port, self._secret)
|
| def getTaskInfos(self):
This is temporarily not available.
fixed
Test build #94901 has finished for PR 22085 at commit
Test build #94942 has finished for PR 22085 at commit
Test build #94951 has finished for PR 22085 at commit
| import scala.collection.JavaConverters._
|
| import org.apache.spark._
| import org.apache.spark.{SparkException, _}
_ should include SparkException already
| return self._localProperties.get(key, None)
|
|
| def _load_from_socket(port, auth_secret):
Should document how this is different from the one in context.py.
|
| // Authentication helper used when serving method calls via socket from Python side.
| private lazy val authHelper = {
|   val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
When does SparkEnv.get return null?
| // Init a ServerSocket to accept method calls from Python side.
| val isBarrier = context.isInstanceOf[BarrierTaskContext]
| if (isBarrier) {
|   serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost")))
minor: useful to add /* port */ and /* backlog */
| sock = serverSocket.get.accept()
| sock.setSoTimeout(10000)
| val cmdString = readUtf8(sock)
| if (cmdString.equals("run")) {
If we do not expect any other command from the socket, we should throw an exception
| var sock: Socket = null
| try {
|   sock = serverSocket.get.accept()
|   sock.setSoTimeout(10000)
Should add a comment about this timeout.
|   barrierAndServe(sock)
| }
| } catch {
|   case _: SocketException =>
Is this the timeout exception? I don't see any exception that we could silently ignore.
| }
|
| def writeUTF(str: String, dataOut: DataOutputStream) {
|   val bytes = str.getBytes(StandardCharsets.UTF_8)
nit: UTF_8 or always use StandardCharsets
| @classmethod
| def _getOrCreate(cls):
|     """Internal function to get or create global BarrierTaskContext."""
|     if cls._taskContext is None:
Q: Does it handle python worker reuse?
IIUC, reusing a python worker just means we start the python worker from a daemon thread; it shall not affect the input/output files related to worker.py.
| import org.apache.spark.security.SocketAuthHelper
| import org.apache.spark.util._
|
tiny nit: I would revert this one back while addressing other comments.
Test build #95037 has finished for PR 22085 at commit
Test build #95040 has finished for PR 22085 at commit
LGTM. I'm merging this into master. We might need a minor refactor for readability. But it shouldn't block developers testing this new feature. Thanks!
| Load data from a given socket, this is a blocking method thus only return when the socket
| connection has been closed.
| This is copied from context.py, while modified the message protocol.
It would be nicer if we can deduplicate it later.
Thanks, @jiangxb1987 and @mengxr again.
| }
| // Close ServerSocket on task completion.
| serverSocket.foreach { server =>
|   context.addTaskCompletionListener(_ => server.close())
This is failing the Scala 2.12 build
[error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition,
[error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext
[error] and method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error] match argument types (org.apache.spark.TaskContext => Unit)
[error] context.addTaskCompletionListener(_ => server.close())
[error] ^
[error] one error found
[error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s]
Addressed in #22229
| sock = serverSocket.get.accept()
| // Wait for function call from python side.
| sock.setSoTimeout(10000)
| val input = new DataInputStream(sock.getInputStream())
Why isn't authentication the first thing that happens on this connection? I don't think anything bad can happen in this case, but it just makes it more likely that we leave a security hole here later on.
(I'd also like to do some refactoring of the socket setup code in python, and that can go further if we do authentication first here)
Thanks for catching this, yea I agree it would be better to move the authentication before recognising the function call.
OK, I'm doing this -- SPARK-25253, will open a PR shortly.
What changes were proposed in this pull request?
Add methods barrier() and getTaskInfos() in python TaskContext; these two methods are only allowed for barrier tasks.

How was this patch tested?

Add new tests in tests.py.
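For illustration, typical usage of the new API looks roughly like this (a sketch; rdd.barrier().mapPartitions() comes from the separate barrier execution mode RDD API, not from this PR):

```python
from pyspark import SparkContext
from pyspark.taskcontext import BarrierTaskContext


def train(iterator):
    context = BarrierTaskContext.get()
    # Block until every task in this barrier stage reaches this point.
    context.barrier()
    # Addresses of all tasks in the stage, e.g. to bootstrap an MPI-style job.
    yield [info.address for info in context.getTaskInfos()]


if __name__ == "__main__":
    sc = SparkContext(appName="barrier-demo")
    rdd = sc.parallelize(range(8), 4)
    print(rdd.barrier().mapPartitions(train).collect())
    sc.stop()
```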