
Conversation

@juliuszsompolski
Contributor

@juliuszsompolski juliuszsompolski commented May 25, 2023

What changes were proposed in this pull request?

  • Move the code related to execution out of the GRPC callback in SparkConnectStreamHandler and into its own classes.
  • ExecutionHolder (renamed from ExecuteHolder) launches the execution in its own thread using ExecuteThreadRunner.
  • The execution pushes responses via ExecuteResponseObserver (running in the execution thread).
  • ExecuteResponseObserver notifies ExecuteGrpcResponseSender (running in the RPC handler thread) to send the responses.
  • The actual execution code is refactored into SparkConnectPlanExecution.

This allows query interruption to be improved by making the interrupt method interrupt the execution thread. As a result, interrupt also works when no Spark jobs are running.
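
As a rough sketch of the threading pattern (hypothetical, simplified names; not the actual classes in this PR), the execution runs on a dedicated thread and interrupt targets that thread rather than only the running Spark jobs:

```scala
// Minimal sketch only, assuming a runner that owns the execution thread;
// the real ExecuteThreadRunner carries much more state and error handling.
class ExecuteThreadRunnerSketch(execute: () => Unit) {

  private val executionThread: Thread = new Thread("spark-connect-execute-sketch") {
    override def run(): Unit = execute()
  }

  // Called from the RPC handler thread: start the execution and wait for it
  // (the current behaviour; later the two could be detached).
  def startAndJoin(): Unit = {
    executionThread.start()
    executionThread.join()
  }

  // Interrupting the execution thread works even when no Spark job is
  // currently running, unlike only cancelling jobs.
  def interrupt(): Unit = executionThread.interrupt()
}
```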

The refactoring further opens up the possibility of detaching query execution from a single RPC. Right now ExecutionHolder waits for the execution thread to finish, and ExecutePlanResponseObserver forwards the responses directly to the RPC observer.

In a followup, we can design different modes of execution, e.g.:

  • ExecutePlanResponseObserver buffering the responses, so that a client which lost its connection could reconnect and ask for the stream to be retransmitted (see the sketch below).
  • ExecutionHolder returning the operationId to the client directly, with the client then requesting results in separate RPCs, giving it more control over the response stream instead of having it just pushed to it.
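
A hedged sketch of what such a buffering observer could look like (illustrative only; names and signatures are assumptions, not this PR's code):

```scala
import scala.collection.mutable

// Buffers responses so a sender (e.g. one per RPC attempt) can replay them
// from any index, for example after a client reconnects.
class BufferingResponseObserverSketch[T] {
  private val responses = mutable.ArrayBuffer.empty[T]
  private var completed = false

  def onNext(response: T): Unit = synchronized {
    responses += response
    notifyAll() // wake up any sender waiting for new responses
  }

  def onCompleted(): Unit = synchronized {
    completed = true
    notifyAll()
  }

  def isCompleted: Boolean = synchronized { completed }

  // Everything produced from the given index onward, in order.
  def responsesFrom(index: Int): Seq[T] = synchronized {
    responses.drop(index).toSeq
  }
}
```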

Why are the changes needed?

  • Improve how interrupt works
  • Refactoring that opens up possibilities of detaching query execution from a single RPC.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing Spark Connect CI covers the execution.

@juliuszsompolski juliuszsompolski changed the title [SPARK-43755] [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread May 25, 2023
@juliuszsompolski
Contributor Author

This class is moved verbatim, with no changes, out of SparkConnectStreamHandler.

@juliuszsompolski
Contributor Author

[info] *** 1 TEST FAILED ***
[error] Failed: Total 3649, Failed 1, Errors 0, Passed 3648, Ignored 10, Canceled 2
[error] Failed tests:
[error] 	org.apache.spark.storage.BlockManagerProactiveReplicationSuite

An unrelated flake.
I'll hold off on retriggering CI until review comments come in.

Contributor

@hvanhovell hvanhovell left a comment


A few nitpicks, but looks good overall.

Contributor

@grundprinzip grundprinzip left a comment


Looks good. Couple of nits that are mostly around naming and documentation. My only real comment would be to iron out what the public interface is going to be. Please make sure that all other classes are package private.

Comment on lines 72 to 76
Contributor

Is there a particular reason for the nesting? Why not create an ExecutionThread that inherits from Thread, instead of creating a thread that keeps a reference to the enclosing object and calls back into it?

@juliuszsompolski
Contributor Author

Ignore for now - ExecutePlanResponseSender is unfinished; I haven't gone through the nit comments yet.

if (lastIndex.nonEmpty) {
  throw new IllegalStateException("Stream onError can't be called after stream completed")
}
error = Some(t)
Contributor

And we were just talking about this. It's great that we cache this here for an explicit error response to pick up.

Contributor Author

Adding to what we were talking about:

  • This picks up errors of ExecutePlan, but it wouldn't be hard to extend this execution mechanism to other RPCs.
  • However, this will only pick up errors on the execution thread. Errors that happen on the GRPC thread (these would be more internal errors, like being unable to start the execution thread) would need a different mechanism.

TBH, if I were to design it tabula rasa, I would not use GRPC errors for application errors (like errors from Spark execution), but have these returned as an explicit message type, and reserve GRPC onError for server errors. That would free us from the GRPC onError size limitations that we now need workarounds for. It would also make it easier to distinguish network/framework errors from user errors, which in turn would make it easier to establish retry policies or to keep stats on user errors vs. system errors... But such a change couldn't be done in a backwards compatible way at this point. Maybe for Spark 4.0?
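
A very rough illustration of that idea (hypothetical message and function names, not an existing or proposed Spark Connect API): the application error travels as an ordinary message on the response stream, which then completes normally, while GRPC onError stays reserved for server/framework failures.

```scala
object ErrorAsMessageSketch {
  // Hypothetical event types standing in for the real ExecutePlanResponse proto.
  sealed trait ExecutePlanEventSketch
  case class ResultBatchSketch(data: Array[Byte]) extends ExecutePlanEventSketch
  case class ApplicationErrorSketch(errorClass: String, message: String)
      extends ExecutePlanEventSketch

  // On an execution failure, send the error as a regular message and complete
  // the stream, instead of calling GRPC onError with the exception details.
  def finishWithApplicationError(
      send: ExecutePlanEventSketch => Unit,
      complete: () => Unit,
      e: Throwable): Unit = {
    send(ApplicationErrorSketch(e.getClass.getName, String.valueOf(e.getMessage)))
    complete()
  }
}
```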

Contributor

The gRPC errors are not just for internal system errors. The state of the stream is undefined after an error has been thrown, and I think it's fair to close it with onError. Using the response trailers for exceptions that contain an 8MB query plan and description is not great.

My current thinking is to leverage the reattach RPC to fetch the last error and get a high-fidelity response message with all of the metadata of the error.

Contributor

@grundprinzip grundprinzip left a comment


First pass on the new files

}

def completed(): Boolean = synchronized {
  lastIndex.isDefined
}
Contributor

Why not make lastIndex a completed flag instead?

Contributor Author

ditto lastIndex != index in the responses list.

@github-actions github-actions bot added the DOCS label Jul 13, 2023
case e: Throwable =>
  logDebug(s"Exception in execute: $e")
  // Always cancel all remaining execution after error.
  executeHolder.sessionHolder.session.sparkContext.cancelJobsWithTag(executeHolder.jobTag)

Does this wait for cancellation to succeed? What happens if this throws?
I see that currently the only way is to do session.interruptAll().
Wondering if we plan to provide a per-query cancel API, and whether it would have any contract regarding cancellation.

Contributor Author

@juliuszsompolski juliuszsompolski Jul 13, 2023


This doesn't wait and doesn't throw; it just sends an async ping to the DAGScheduler to cancel whatever is left running. See SparkContext.cancelJobsWithTag -> DAGScheduler.cancelJobsWithTag.
This is "to be safe and clean everything up", like in https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L248 in the Thrift server.

I plan to add two more types of cancellation:

  • per-query (requires operation_id, to be introduced in a followup PR that also deals with query reattach)
  • per user-settable tag (very similar to SparkContext.addJobTag / cancelJobsWithTag, but on a Spark Connect SparkSession); see the sketch below
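
As a rough sketch of the tag-based flavour, assuming the existing SparkContext job-tag API (addJobTag / removeJobTag / cancelJobsWithTag); the tag value and the wiring here are made up for illustration, not the Spark Connect implementation:

```scala
import org.apache.spark.sql.SparkSession

object TagCancellationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Tag all jobs started by this operation so they can later be cancelled as a group.
    sc.addJobTag("operation-1234")
    try {
      spark.range(1000000L).count()
    } finally {
      sc.removeJobTag("operation-1234")
    }

    // Cancellation (normally issued from another thread while the tagged jobs
    // are still running): asynchronously asks the DAGScheduler to cancel any
    // jobs carrying the tag; it does not wait for them to stop.
    sc.cancelJobsWithTag("operation-1234")

    spark.stop()
  }
}
```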

@juliuszsompolski
Contributor Author

juliuszsompolski commented Jul 14, 2023

Resolved (trivial) merge conflict.
Previous CI run had 1 flaky test:

2023-07-14T00:42:34.1310770Z [info] *** 1 TEST FAILED ***
2023-07-14T00:42:34.1515215Z [error] Failed: Total 9369, Failed 1, Errors 0, Passed 9368, Ignored 27
2023-07-14T00:42:34.1681702Z [error] Failed tests:
2023-07-14T00:42:34.1682533Z [error] 	org.apache.spark.sql.execution.streaming.MicroBatchExecutionSuite

@juliuszsompolski
Contributor Author

CI before the lint change was clean except for lint:
https://github.com/juliuszsompolski/apache-spark/actions/runs/5551730507

@xuanyuanking
Member

Thanks! Merged to master.

asl3 pushed a commit to asl3/spark that referenced this pull request Jul 17, 2023
…ndler and to a different thread


Closes apache#41315 from juliuszsompolski/sc-execute-thread.

Lead-authored-by: Juliusz Sompolski <julek@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Yuanjian Li <yuanjian.li@databricks.com>
