
[SPARK-54585][SS] Fix State Store rollback when thread is in interrupted state #53313

Closed
dylanwong250 wants to merge 4 commits into apache:master from dylanwong250:SPARK-54585
dylanwong250 (Contributor) commented Dec 3, 2025

What changes were proposed in this pull request?

  1. Modifies ChecksumCancellableFSDataOutputStream.cancel() to cancel both the main stream and checksum stream synchronously instead of using Futures with awaitResult.

  2. Moves changelogWriter.foreach(_.abort()) and changelogWriter = None into a try/finally block within RocksDB.rollback(), so that changelogWriter is always set to None on rollback.

Why are the changes needed?

For fix 1:

When cancel() is called while the thread is in an interrupted state (e.g., during task cancellation), the previous implementation would fail. The code submitted Futures to cancel each stream, then called awaitResult() to wait for completion. However, awaitResult() checks the thread's interrupt flag and throws InterruptedException immediately if the thread is interrupted.
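The failure mode described for fix 1 can be reproduced outside Spark. The following is a minimal sketch, not the PR's code: it uses scala.concurrent.Await.result as a stand-in for awaitResult(), and FakeStream and cancelBothSynchronously are hypothetical names invented for illustration. The try/finally ordering of the two cancel calls is also an assumption.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object InterruptedAwaitSketch {

  /** Returns true if waiting on a pending future fails with InterruptedException. */
  def awaitFailsWhenInterrupted(): Boolean = {
    // A future that is never completed, so the wait really has to block.
    val pending = Promise[Unit]().future
    // Simulate task cancellation: the interrupt flag is set before the wait starts.
    Thread.currentThread().interrupt()
    try {
      Await.result(pending, 1.second)
      false
    } catch {
      case _: InterruptedException => true  // thrown without waiting for the timeout
      case _: TimeoutException => false
    } finally {
      Thread.interrupted() // clear the flag so the caller is unaffected
    }
  }

  /** Hypothetical stand-in for a cancellable output stream (not a Spark class). */
  final class FakeStream {
    var cancelled = false
    def cancel(): Unit = { cancelled = true }
  }

  /** Cancels both streams on the calling thread, with no Future and no blocking wait. */
  def cancelBothSynchronously(main: FakeStream, checksum: FakeStream): Unit = {
    // Assumption: the checksum stream is cancelled even if the main cancel throws.
    try main.cancel() finally checksum.cancel()
  }
}
```

Because the synchronous path never blocks on a Future, the interrupt flag does not get in the way: calling InterruptedAwaitSketch.cancelBothSynchronously on an interrupted thread still cancels both fake streams.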

For fix 2:

Consider the case where abort() is called on RocksDBStateStoreProvider. This calls rollback() on the RocksDB instance, which in turn calls changelogWriter.foreach(_.abort()) and then sets changelogWriter = None.

However, if changelogWriter.abort() throws an exception, the finally block inside abort() still sets backingFileStream and compressedStream to null. The exception then propagates out of rollback(), and we never reach the line that sets changelogWriter = None.

This leaves the RocksDB instance in an inconsistent state:

  • changelogWriter = Some(changelogWriterWeAttemptedToAbort)
  • changelogWriterWeAttemptedToAbort.backingFileStream = null
  • changelogWriterWeAttemptedToAbort.compressedStream = null

Now consider calling RocksDB.load() again. This calls replayChangelog(), which calls put(), which calls changelogWriter.put(). At this point, the assertion assert(compressedStream != null) fails, causing an exception while loading the StateStore.
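The sequence above can be sketched with a small model. This is an illustration under stated assumptions, not the actual RocksDB or changelog writer code: FakeChangelogWriter, FakeDb, rollbackUnsafe, and rollbackSafe are hypothetical names, and only the fields mentioned in the description are modeled.

```scala
object RollbackSketch {

  /** Hypothetical stand-in for the changelog writer (not Spark's class). */
  final class FakeChangelogWriter(failOnAbort: Boolean) {
    private var compressedStream: AnyRef = new Object

    def abort(): Unit = {
      try {
        if (failOnAbort) throw new IllegalStateException("cancel failed")
      } finally {
        // The stream reference is cleared even when abort() fails.
        compressedStream = null
      }
    }

    // Mirrors the assertion quoted in the description.
    def put(): Unit = assert(compressedStream != null)
  }

  /** Hypothetical stand-in for the RocksDB instance that owns the writer. */
  final class FakeDb(writer: FakeChangelogWriter) {
    var changelogWriter: Option[FakeChangelogWriter] = Some(writer)

    // Shape of the old code: the reset is skipped when abort() throws.
    def rollbackUnsafe(): Unit = {
      changelogWriter.foreach(_.abort())
      changelogWriter = None
    }

    // Shape of the fix: the reset runs whether or not abort() throws.
    def rollbackSafe(): Unit = {
      try {
        changelogWriter.foreach(_.abort())
      } finally {
        changelogWriter = None
      }
    }
  }
}
```

With a writer whose abort() fails, rollbackUnsafe() leaves changelogWriter defined and pointing at a writer whose stream is already null, so a later put() trips the assertion. rollbackSafe() still propagates the exception but leaves changelogWriter empty, so a subsequent load is not handed the dead writer.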

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added test "SPARK-54585: Interrupted task calling rollback does not throw an exception", which simulates a thread that is already in the interrupted state when it begins a rollback.

Was this patch authored or co-authored using generative AI tooling?

No

micheal-o (Contributor) left a comment

Minor nits. Thanks

dylanwong250 (Contributor, Author) commented

@micheal-o I made some changes to fix the root cause of the InterruptedException. It occurs when the executor interrupts the task thread by setting its interrupt flag: awaitResult() checks this flag before waiting and throws InterruptedException immediately if it is set.

PTAL. Thanks

dylanwong250 changed the title from "[SPARK-54585][SS] Always set changelogWriter to None on rollback" to "[SPARK-54585][SS] Fix State Store rollback when thread is in interrupted state" on Dec 9, 2025
anishshri-db (Contributor) left a comment

lgtm pending green CI
