Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
errorOnDuplicatedFieldNames = false)

var numSent = 0
var totalNumRows: Long = 0
def sendBatch(bytes: Array[Byte], count: Long): Unit = {
val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
val batch = proto.ExecutePlanResponse.ArrowBatch
Expand All @@ -120,14 +121,15 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
response.setArrowBatch(batch)
responseObserver.onNext(response.build())
numSent += 1
totalNumRows += count
}

dataframe.queryExecution.executedPlan match {
case LocalTableScanExec(_, rows) =>
executePlan.eventsManager.postFinished()
converter(rows.iterator).foreach { case (bytes, count) =>
sendBatch(bytes, count)
}
executePlan.eventsManager.postFinished(Some(totalNumRows))
case _ =>
SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
val rows = dataframe.queryExecution.executedPlan.execute()
Expand Down Expand Up @@ -162,8 +164,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
resultFunc = () => ())
// Collect errors and propagate them to the main thread.
.andThen {
case Success(_) =>
executePlan.eventsManager.postFinished()
case Success(_) => // do nothing
case Failure(throwable) =>
signal.synchronized {
error = Some(throwable)
Expand Down Expand Up @@ -200,8 +201,9 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
currentPartitionId += 1
}
ThreadUtils.awaitReady(future, Duration.Inf)
executePlan.eventsManager.postFinished(Some(totalNumRows))
} else {
executePlan.eventsManager.postFinished()
executePlan.eventsManager.postFinished(Some(totalNumRows))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2480,7 +2480,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
.putAllArgs(getSqlCommand.getArgsMap)
.addAllPosArgs(getSqlCommand.getPosArgsList)))
}
executeHolder.eventsManager.postFinished()
executeHolder.eventsManager.postFinished(Some(rows.size))
// Exactly one SQL Command Result Batch
responseObserver.onNext(
ExecutePlanResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {

private var canceled = Option.empty[Boolean]

private var producedRowCount = Option.empty[Long]

/**
* @return
* Last event posted by the Connect request
Expand All @@ -95,6 +97,13 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
*/
private[connect] def hasError: Option[Boolean] = error

/**
* @return
* Number of rows the Connect request has produced, as last reported via @link
* org.apache.spark.sql.connect.service.SparkListenerConnectOperationFinished
*/
private[connect] def getProducedRowCount: Option[Long] = producedRowCount

/**
* Post @link org.apache.spark.sql.connect.service.SparkListenerConnectOperationStarted.
*/
Expand Down Expand Up @@ -192,13 +201,23 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {

/**
* Post @link org.apache.spark.sql.connect.service.SparkListenerConnectOperationFinished.
* @param producedRowsCountOpt
* Number of rows that are returned to the user. None is expected when the operation does not
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: why not use 0 if no rows are returned?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think returning None makes it clear whether the corresponding query was expected to produce rows or not. For example, for a SELECT statement that has no result, we would put 0 here, while for a query like INSERT INTO we would put None instead of 0. This tells us more clearly whether produced rows are expected or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed with gjxdxh that it's clearer with None that no rows are expected for this operation.

* return any rows.
*/
def postFinished(): Unit = {
def postFinished(producedRowsCountOpt: Option[Long] = None): Unit = {
assertStatus(
List(ExecuteStatus.Started, ExecuteStatus.ReadyForExecution),
ExecuteStatus.Finished)
producedRowCount = producedRowsCountOpt

listenerBus
.post(SparkListenerConnectOperationFinished(jobTag, operationId, clock.getTimeMillis()))
.post(
SparkListenerConnectOperationFinished(
jobTag,
operationId,
clock.getTimeMillis(),
producedRowCount))
}

/**
Expand Down Expand Up @@ -395,13 +414,17 @@ case class SparkListenerConnectOperationFailed(
* 36 characters UUID assigned by Connect during a request.
* @param eventTime:
* The time in ms when the event was generated.
* @param producedRowCount:
* Number of rows returned to the user. None means the operation is not expected to return
* rows at all; a row-returning operation with an empty result reports Some(0) instead.
* @param extraTags:
* Additional metadata during the request.
*/
case class SparkListenerConnectOperationFinished(
jobTag: String,
operationId: String,
eventTime: Long,
// Optional row count; defaults to None so call sites that omit it keep compiling.
// NOTE(review): this parameter sits before extraTags, so any caller that passes
// extraTags positionally (as the 4th argument) would need updating — confirm none exist.
producedRowCount: Option[Long] = None,
// Defaults to an empty map when no additional metadata is supplied.
extraTags: Map[String, String] = Map.empty)
extends SparkListenerEvent

Expand Down
Loading