diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala index 55e091bd8d08e..ec2db3efa9603 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala @@ -168,8 +168,12 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte resultHandler = resultHandler, resultFunc = () => ()) - // The man thread will wait until 0-th partition is available, + // The main thread will wait until 0-th partition is available, // then send it to client and wait for the next partition. + // Different from the implementation of [[Dataset#collectAsArrowToPython]], it sends + // the arrow batches in the main thread to avoid the DAGScheduler thread being blocked for + // tasks not related to scheduling. This is particularly important if there are + // multiple users or clients running code at the same time. var currentPartitionId = 0 while (currentPartitionId < numPartitions) { val partition = signal.synchronized {