-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-49899][PYTHON][SS] Support deleteIfExists for TransformWithStateInPandas #48373
Conversation
Hi , I want to contribute to the project and can help out. Please let me know what to do! Thanks! |
try: | ||
yield result | ||
finally: | ||
statefulProcessor.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we set handle state to CLOSE
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just realized an issue that this is actually being called after processing each grouping key instead of finishing processing all keys for a microbatch. I'll need to revisit this to see if there's a good way to handle this (I cannot think about a good way to detect if the current key is the last key to process
right now), if it's not a quick fix, we can probably exclude it for now and have a followup PR fixing it. cc @HeartSaVioR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could try injecting a dummy row at the end of the iterator in writeNextInputToArrowStream
indicating all the keys have been processed, but I'll need to do some experiments first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't feel like current interface would give you such information - we'll probably need to have another control message to send the signal from JVM to Python (UDF). I agree this may take time, but probably need to mark it as a blocker so that we address before the release.
@@ -106,6 +106,30 @@ case class TransformWithStateInPandasExec( | |||
List.empty | |||
} | |||
|
|||
// operator specific metrics | |||
override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we have some simple tests around custom metrics to ensure this works for python?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added in python test to verify.
@@ -1890,6 +1890,14 @@ def process(): | |||
try: | |||
serializer.dump_stream(out_iter, outfile) | |||
finally: | |||
# Sending a signal to TransformWithState UDF to perform proper cleanup steps. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HeartSaVioR @jingz-db I made a change for properly calling close()
and other cleanup steps, could you help take a look and see if this change makes sense? The change is mainly in this file and group_ops.py
. I've verified with both manual test and exiting unit test to confirm this change works as expected. I'll fix the merge conflict issue later, just wanted to get some early feedbacks on this specific change, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks OK to me; I assume you've confirmed that the close method is called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, confirmed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for making this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Also I moved the proto generated py files under a new directory for better project management purpose in my last PR. Run with protoc --proto_path=sql/core/src/main/protobuf/org/apache/spark/sql/execution/streaming --python_out=python/pyspark/sql/streaming/proto --pyi_out=python/pyspark/sql/streaming/proto sql/core/src/main/protobuf/org/apache/spark/sql/execution/streaming/StateMessage.proto
to generate the py files under the new directory to resolve the merge conflicts.
I'll give it a try, thanks @jingz-db! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass. Looks great in overall. One major comment about separating metrics change out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Friendly reminder: we can remove this now.
@@ -154,6 +178,7 @@ case class TransformWithStateInPandasExec( | |||
// by the upstream (consumer) operators in addition to the processing in this operator. | |||
allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs) | |||
commitTimeMs += timeTakenMs { | |||
processorHandle.doTtlCleanup() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a bugfix for existing bug? If then please file a new JIRA ticket and submit a new PR. Let's not mix up with different things.
@@ -106,6 +106,30 @@ case class TransformWithStateInPandasExec( | |||
List.empty | |||
} | |||
|
|||
// operator specific metrics |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, shall we move the change for metrics (and test) out to separate JIRA ticket and corresponding PR?
@@ -194,6 +195,10 @@ def test_transform_with_state_in_pandas_query_restarts(self): | |||
q.awaitTermination(10) | |||
self.assertTrue(q.exception() is None) | |||
|
|||
# Verify custom metrics. We created 2 value states in this test case and deleted 1 of them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, let's move this out.
@HeartSaVioR Addressed your comment, could you help take another look? Thanks! |
Create a new ticket https://issues.apache.org/jira/browse/SPARK-50270 to track the metrics change |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Thanks! Merging to master. |
What changes were proposed in this pull request?
close()
support for StatefulProcessor.Why are the changes needed?
Add parity to TransformWithStateInPandas for functionalities we support in TransformWithState
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
New unit test.
Was this patch authored or co-authored using generative AI tooling?
No