[SPARK-42566][SS] RocksDB StateStore lock acquisition should happen after getting input iterator from inputRDD #40162
The current behavior of the `compute` method in both `StateStoreRDD` and `ReadStateStoreRDD` is: we first get the state store instance and then get the input iterator for the `inputRDD`. For the RocksDB state store, the running task acquires and holds the lock for this instance. If there are network issues, the retried or speculative task will fail to acquire the lock and may eventually abort the job. For example, when we shrink the executors, an alive executor will try to fetch data from the killed ones, because it doesn't know the target location (prefetched from the driver) is dead until it tries to fetch data. The query might hang for a long time, as the executor will retry `spark.shuffle.io.maxRetries=3` times and, for each retry, wait for `spark.shuffle.io.connectionTimeout` (default value is 120s) before timing out. In total, the task could hang for about 6 minutes, and the retried or speculative tasks won't be able to acquire the lock during this period. Making lock acquisition happen after retrieving the input iterator should avoid this situation.
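The "about 6 minutes" figure is the retry count multiplied by the per-retry timeout. The sketch below reproduces that back-of-the-envelope arithmetic; `RetryEstimate` is a hypothetical name, the values are the defaults quoted above, and, like the estimate in the description, it ignores the wait between retries (`spark.shuffle.io.retryWait`).

```scala
// Rough worst-case hang estimate from the PR description (a sketch, not Spark code).
// Assumes default settings and ignores spark.shuffle.io.retryWait between retries.
object RetryEstimate {
  val maxRetries = 3             // spark.shuffle.io.maxRetries (default 3)
  val connectionTimeoutSec = 120 // spark.shuffle.io.connectionTimeout (falls back to spark.network.timeout, 120s)

  // Each retry may block for a full connection timeout before giving up.
  val worstCaseSec: Int = maxRetries * connectionTimeoutSec

  def summary: String = s"~${worstCaseSec}s (~${worstCaseSec / 60} min)"
}
```

`RetryEstimate.summary` evaluates to `"~360s (~6 min)"`, which is the window during which the stuck task keeps the state store lock under the old ordering.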
What changes were proposed in this pull request?
Make lock acquisition happen after retrieving the input iterator in the `compute` method of both `StateStoreRDD` and `ReadStateStoreRDD`.
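To make the reordering concrete, here is a toy, self-contained Scala sketch that does not use Spark. `ToyStateStore`, `ComputeOrdering`, `computeLockFirst`, `computeInputFirst` and `lockHeldDuringFetch` are hypothetical names; the single-owner lock is a simplified stand-in for the RocksDB state store instance lock, and `getInput` stands in for obtaining the input iterator from the `inputRDD` (the step that may block on a shuffle fetch).

```scala
// Toy model of the per-instance state store lock (hypothetical, not Spark's API):
// at most one task attempt may hold a given store at a time.
final class ToyStateStore {
  private var owner: Option[Long] = None

  // Fails fast if another attempt holds the lock, mimicking a retried or
  // speculative task that cannot get the instance.
  def acquire(attemptId: Long): Unit = synchronized {
    owner match {
      case Some(other) if other != attemptId =>
        throw new IllegalStateException(s"state store locked by attempt $other")
      case _ =>
        owner = Some(attemptId)
    }
  }

  def release(attemptId: Long): Unit = synchronized {
    if (owner.contains(attemptId)) owner = None
  }

  def isHeld: Boolean = synchronized { owner.isDefined }
}

object ComputeOrdering {
  // Old ordering: lock the store, then build the input iterator.
  // A slow fetch inside getInput keeps the lock held for the whole wait.
  def computeLockFirst(store: ToyStateStore, attemptId: Long,
                       getInput: () => Iterator[Int]): Iterator[Int] = {
    store.acquire(attemptId)
    getInput()
  }

  // New ordering: build the input iterator first, then lock the store.
  def computeInputFirst(store: ToyStateStore, attemptId: Long,
                        getInput: () => Iterator[Int]): Iterator[Int] = {
    val input = getInput()
    store.acquire(attemptId)
    input
  }

  // Runs one attempt and reports whether the store lock was already held
  // while the (simulated) input iterator was being created.
  def lockHeldDuringFetch(lockFirst: Boolean): Boolean = {
    val store = new ToyStateStore
    var heldDuringFetch = false
    val getInput = () => {
      heldDuringFetch = store.isHeld
      Iterator(1, 2, 3)
    }
    val out =
      if (lockFirst) computeLockFirst(store, 1L, getInput)
      else computeInputFirst(store, 1L, getInput)
    out.foreach(_ => ()) // consume the input, as a state update would
    store.release(1L)
    heldDuringFetch
  }
}
```

With the old ordering, `lockHeldDuringFetch(lockFirst = true)` returns `true`: while the fetch is stuck, any other attempt calling `acquire` with a different ID fails. With the new ordering it returns `false`, so a stuck fetch no longer pins the lock.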
Why are the changes needed?
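To avoid job failures caused by retried or speculative tasks being unable to acquire the RocksDB state store lock while the original task is stuck fetching shuffle data during a network issue.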
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing unit tests should be sufficient.