-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-48849][SS]Create OperatorStateMetadataV2 for the TransformWithStateExec operator #47445
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…StateExec operator
...n/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
Outdated
Show resolved
Hide resolved
...scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
Show resolved
Hide resolved
...scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
Outdated
Show resolved
Hide resolved
...scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
Outdated
Show resolved
Hide resolved
...n/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
Show resolved
Hide resolved
...n/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
Outdated
Show resolved
Hide resolved
...re/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
Outdated
Show resolved
Hide resolved
...re/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
Outdated
Show resolved
Hide resolved
|
@HeartSaVioR PTAL, thanks! |
HeartSaVioR
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass. Looks good in general.
...scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
Show resolved
Hide resolved
...scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
Outdated
Show resolved
Hide resolved
...scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
Show resolved
Hide resolved
...n/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
Show resolved
Hide resolved
| * @param extraOptions - any extra options to be passed for StateStoreConf creation | ||
| * @param storeName - optional state store name | ||
| * @param schemaFilePath - optional schema file path | ||
| * @param oldSchemaFilePath - optional path to the old schema file. If not provided, will default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it valid for schema version 3 to have None? Otherwise let's also mention here as same as below; Needed for schema version 3.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we don't expect there to be a schema file if we are in the first run of a new query, right?
...re/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
Show resolved
Hide resolved
...re/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
Outdated
Show resolved
Hide resolved
...re/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
Outdated
Show resolved
Hide resolved
...re/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
Outdated
Show resolved
Hide resolved
...scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
Show resolved
Hide resolved
|
@HeartSaVioR addressed feedback, please take another look when you get a chance |
HeartSaVioR
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
Thanks! Merging to master. |
…hStateExec operator ### What changes were proposed in this pull request? Introducing the OperatorStateMetadataV2 format that integrates with the TransformWithStateExec operator. This is used to keep information about the TWS operator, will be used to enforce invariants in between query runs. Each OperatorStateMetadataV2 has a pointer to the StateSchemaV3 file for the corresponding operator. Will introduce purging in this PR: apache#47286 ### Why are the changes needed? This is needed for State Metadata integration with the TransformWithState operator. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Added unit tests to StateStoreSuite and TransformWithStateSuite ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#47445 from ericm-db/metadata-v2. Authored-by: Eric Marnadi <eric.marnadi@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
Introducing the OperatorStateMetadataV2 format that integrates with the TransformWithStateExec operator. This is used to keep information about the TWS operator, will be used to enforce invariants in between query runs. Each OperatorStateMetadataV2 has a pointer to the StateSchemaV3 file for the corresponding operator.
Will introduce purging in this PR: #47286
Why are the changes needed?
This is needed for State Metadata integration with the TransformWithState operator.
Does this PR introduce any user-facing change?
How was this patch tested?
Added unit tests to StateStoreSuite and TransformWithStateSuite
Was this patch authored or co-authored using generative AI tooling?
No