@ericm-db
Contributor

@ericm-db ericm-db commented Apr 16, 2025

What changes were proposed in this pull request?

When state variable operations are called within the init() function of a StatefulProcessor, a NullPointerException is thrown, failing the query (even though this is user error).
Throw an error with a message that better describes the issue.
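
For illustration only, here is a minimal, self-contained sketch of the idea, not the code in this PR: before initialization the handle hands out a placeholder state object, and every operation on it fails fast with a descriptive error instead of dereferencing a null store. All names below (`ValueStateLike`, `InvalidHandleStateError`, `InvalidHandleValueStateSketch`, `messageFor`) are hypothetical stand-ins and do not depend on Spark. The message format is copied from the "After this change" log.

```scala
object InvalidHandleSketch {
  // Hypothetical stand-in for a state variable interface (not Spark's ValueState).
  trait ValueStateLike[S] {
    def exists(): Boolean
    def get(): S
    def update(newState: S): Unit
    def clear(): Unit
  }

  // Hypothetical error type carrying the operation name and the handle state.
  final class InvalidHandleStateError(val operation: String, val handleState: String)
    extends RuntimeException(
      s"Failed to perform stateful processor operation=$operation " +
        s"with invalid handle state=$handleState")

  // Placeholder handed out while the handle is assumed to be in PRE_INIT:
  // every operation throws an error that names the offending call.
  final class InvalidHandleValueStateSketch[S](stateName: String)
    extends ValueStateLike[S] {

    private def fail(op: String): Nothing =
      throw new InvalidHandleStateError(s"$stateName.$op", "PRE_INIT")

    override def exists(): Boolean = fail("exists")
    override def get(): S = fail("get")
    override def update(newState: S): Unit = fail("update")
    override def clear(): Unit = fail("clear")
  }

  // Returns the error message produced by calling exists() on the placeholder.
  def messageFor(stateName: String): String =
    try {
      new InvalidHandleValueStateSketch[Int](stateName).exists()
      "no error"
    } catch {
      case e: InvalidHandleStateError => e.getMessage
    }
}
```

Under these assumptions, `InvalidHandleSketch.messageFor("listState")` yields a message naming `listState.exists` and `PRE_INIT`, which is the kind of information the NullPointerException did not give the user.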

Why are the changes needed?

The NullPointerException doesn't describe the issue to the user, even though this is user error. This will let them know that the function they are calling within init() is disallowed.

Does this PR introduce any user-facing change?

Before this change:

12:20:32.646 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 9a666389-5fc4-4c4c-b621-5730f9e9bd03, runId = d3535902-1c7a-442a-b52c-701d2511e56c] terminated with error
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:631)

After this change:

11:52:20.303 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 81b2e2ff-ca37-4d40-a533-2848de6984b5, runId = 29bbdc65-f198-440f-b2ba-20700159f839] terminated with error
org.apache.spark.sql.execution.streaming.state.StatefulProcessorCannotPerformOperationWithInvalidHandleState: [STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE] Failed to perform stateful processor operation=listState.exists with invalid handle state=PRE_INIT. SQLSTATE: 42802
	at org.apache.spark.sql.execution.streaming.state.StateStoreErrors$.cannotPerformOperationWithInvalidHandleState(StateStoreErrors.scala:123)

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@ericm-db ericm-db changed the title Throwing classified error when disallowed functions are called during StatefulProcessor.init() [SPARK-51822] Throwing classified error when disallowed functions are called during StatefulProcessor.init() Apr 16, 2025
@ericm-db ericm-db requested a review from anishshri-db April 16, 2025 20:55
@ericm-db
Contributor Author

@HeartSaVioR Can you PTAL when you get a chance?

}
}

private[sql] class DriverSideValueState[S](override val stateName: String)
Member

IMO the DriverSide naming is confusing: based on the name, I would expect some functional driver-side state, while the actual implementation is not functional.

Contributor

@ericm-db
Shall we rename it to represent InvalidHandle, since you represented this as InvalidHandle in the error method name? E.g. rename the trait to InvalidHandleState and so on.

Contributor Author

Okay sure, sounds good.

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1 pending CI

@HeartSaVioR HeartSaVioR changed the title [SPARK-51822] Throwing classified error when disallowed functions are called during StatefulProcessor.init() [SPARK-51822]][SS] Throwing classified error when disallowed functions are called during StatefulProcessor.init() Apr 16, 2025
Contributor

@HeartSaVioR HeartSaVioR left a comment

+1 for the change of rename - pending CI.

@vrozov
Member

vrozov commented Apr 17, 2025

LGTM with nit (IMO delegation will be a better option here).

@HeartSaVioR HeartSaVioR changed the title [SPARK-51822]][SS] Throwing classified error when disallowed functions are called during StatefulProcessor.init() [SPARK-51822][SS] Throwing classified error when disallowed functions are called during StatefulProcessor.init() Apr 17, 2025
@HeartSaVioR
Contributor

@ericm-db Looks like the CI failure is relevant.

@ericm-db
Contributor Author

ericm-db commented Apr 17, 2025

The crux of the issue is that in a test we do the following:

  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    _mapState = getHandle
      .getMapState("mapState", Encoders.STRING, Encoders.scalaInt, ttlConfig)
      .asInstanceOf[MapStateImplWithTTL[String, Int]]
  }

where we fetch MapState and cast it to MapStateImplWithTTL. MapStateImplWithTTL is not a private class, so this is technically allowed behavior, but since InvalidHandleMapState does not extend the Impl class, the cast throws an error.
This error happens for all state variable types. The test does this cast because some of its checks use MapStateImplWithTTL methods that are package-private.
Should we allow this behavior? cc @anishshri-db @HeartSaVioR
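
To make the failure mode concrete, here is a small self-contained sketch, assuming hypothetical stand-in types (`MapStateLike`, `MapStateImplSketch`, `InvalidHandleMapStateSketch`) rather than the real Spark classes. It shows that a placeholder implementing only the public interface cannot be downcast to the Impl class, which is the shape of the problem described above.

```scala
object CastFailureSketch {
  // Hypothetical public interface, standing in for a state variable API.
  trait MapStateLike[K, V] {
    def containsKey(key: K): Boolean
  }

  // Hypothetical concrete implementation with an extra method that tests rely on.
  final class MapStateImplSketch[K, V] extends MapStateLike[K, V] {
    private val data = scala.collection.mutable.Map.empty[K, V]
    override def containsKey(key: K): Boolean = data.contains(key)
    def internalSize: Int = data.size
  }

  // Hypothetical placeholder: implements the interface only, not the Impl class.
  final class InvalidHandleMapStateSketch[K, V] extends MapStateLike[K, V] {
    override def containsKey(key: K): Boolean =
      throw new IllegalStateException("operation not allowed before initialization")
  }

  // Mimics the test's init(): downcast whatever was returned to the Impl class.
  // Returns false when the downcast fails with a ClassCastException.
  def castSucceeds(state: MapStateLike[String, Int]): Boolean =
    try {
      val impl = state.asInstanceOf[MapStateImplSketch[String, Int]]
      impl.internalSize >= 0
    } catch {
      case _: ClassCastException => false
    }
}
```

In this sketch, `castSucceeds` returns true for a `MapStateImplSketch` and false for an `InvalidHandleMapStateSketch`, even though both satisfy the declared interface.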

@HeartSaVioR
Contributor

One way is to do a pattern match and fail "on demand" if the type is not the expected one: when the variable is accessed in the test, it is NOT from the driver side of init.
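
A rough sketch of that suggestion, assuming hypothetical stand-in types and a hypothetical `ProcessorSketch` class rather than the actual test code: init() stores whatever it receives without casting, and the Impl type is checked only at the point where an Impl-specific method is needed.

```scala
object OnDemandCastSketch {
  // Hypothetical stand-ins for the public interface, the Impl class,
  // and the placeholder returned before initialization.
  trait MapStateLike[K, V]

  final class MapStateImplSketch[K, V] extends MapStateLike[K, V] {
    def internalSize: Int = 0
  }

  final class InvalidHandleMapStateSketch[K, V] extends MapStateLike[K, V]

  final class ProcessorSketch {
    private var mapState: MapStateLike[String, Int] = null

    // init() keeps the declared interface type and performs no downcast,
    // so it works regardless of which implementation it is given.
    def init(state: MapStateLike[String, Int]): Unit = {
      mapState = state
    }

    // The type is checked only when the Impl-specific method is needed.
    def internalSize: Int = mapState match {
      case impl: MapStateImplSketch[String, Int] => impl.internalSize
      case other =>
        throw new IllegalStateException(
          s"Unexpected state type: ${other.getClass.getName}")
    }
  }
}
```

With this shape, init() succeeds with either implementation, and only a later call to `internalSize` on the placeholder fails, with a message naming the unexpected type.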

@ericm-db
Contributor Author

@HeartSaVioR this change should be good to merge

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1

@HeartSaVioR
Contributor

Thanks! Merging to master.
