[SPARK-48931][SS] Reduce Cloud Store List API cost for state store maintenance task #47393
Conversation
    .doc("The minimum number of stale versions to delete when maintenance is invoked.")
    .version("2.1.1")
    .intConf
    .createWithDefault(30)
As an HDFS user where I care more about number of files than number of list calls, this would have been a little surprising and confusing to be the new default behavior when I upgraded. Should this default to 1? That way by default you at least don't waste time looking for files to delete if you haven't even written a new version since the last time you checked, but still respects minBatchesToRetain as a tighter bound
This only applies to the RocksDB state store.
You can change minBatchesToRetain to keep the checkpoint at the same size.
Actually, we can consider a smaller default for minBatchesToRetain.
Yeah I'm not really sure why the default minBatchesToRetain is so high, we set it to 3 in our jobs hah. I would likely set this new config to 1 if that's not the default, I'm just not sure how many other rocksdb state users would be impacted by effectively maintaining more state store versions by default.
Yes, setting min retained versions to 100 is to be super conservative :)
There is concern about increasing storage cost because of the new change. WDYT? @HeartSaVioR
minBatchesToRetain is a guaranteed lower bound on the number of retained batches, but it has always been a one-sided guarantee. This config leverages the other side, where we do not guarantee to retain no more than that, so that we can trade off the number of files against API cost.
I think the default value suggested here (30) is in line with the default value of minBatchesToRetain (100). If users don't mind retaining 100 batches, they also wouldn't mind retaining 130-140 batches to reduce the overhead and cost. Someone who really cares about minimizing the number of files should already have changed minBatchesToRetain, and they can change this config as well.
(Btw, I agree that the default of 100 for minBatchesToRetain hadn't been making sense, but with the state data source reader it at least has one use case now. I don't have a strong opinion on whether the default value has to be kept to support this use case though.)
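A back-of-the-envelope sketch of the trade-off described above. The ratio name and its value of 0.3 are assumptions chosen only to reproduce the proposed default of 30; the actual config wiring lives in SQLConf/StateStoreConf.

```scala
// Hypothetical numbers for the one-sided retention guarantee discussed above.
val minBatchesToRetain = 100            // default lower bound of retained batches
val ratioExtraVersionsAllowed = 0.3     // assumed ratio yielding the proposed default
val minVersionsToDelete =
  (ratioExtraVersionsAllowed * minBatchesToRetain).toInt

// Stale versions accumulate until at least minVersionsToDelete exist before a
// list+delete fires, so the checkpoint fluctuates between ~100 and ~130 versions.
println(minVersionsToDelete)
```

With these defaults a user already tolerating 100 retained batches sees at most ~30 extra versions between cleanups, in exchange for roughly 30x fewer list calls.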
But if we consider this an important behavioral change, we could set the default value to be effectively a no-op and document how to configure this instead.
I agree the default of 30 is in line with the default of 100 batches to retain. I guess the state reader provides some use case for maintaining so many batches; I don't understand what previous use case would have made sense for storing that many batches, since you would have to do some manual checkpoint surgery to try to roll back.
I don't have a strong preference either way, just not sure how many other people would be in my boat or how unique my use case is (large aggregations and deduping with batches every few hours). Either way it would be good to document this, in case others are surprised by an average increase in state store size.
  def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
    // If minVersionsToDelete <= 0, we call list every time maintenance is invoked
    // This is the original behaviour without list api call optimization
    if (minVersionsToDelete > 0) {
Can we move this part to a helper function? deleteOldVersions is already too long a function.
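A minimal sketch of the extraction suggested above, assuming the helper gates whether maintenance should pay for a list call. The helper name, parameters, and the staleness estimate are illustrative; only deleteOldVersions and minVersionsToDelete come from the actual diff.

```scala
// Hypothetical helper pulled out of deleteOldVersions, per the review comment.
// All names except deleteOldVersions/minVersionsToDelete are assumptions.
def shouldDeleteOldVersions(
    minVersionsToDelete: Long,
    minSeenVersion: Long,
    maxSeenVersion: Long,
    numVersionsToRetain: Int): Boolean = {
  // minVersionsToDelete <= 0 preserves the original behaviour:
  // list the checkpoint directory on every maintenance invocation.
  if (minVersionsToDelete <= 0) {
    true
  } else {
    // Only pay for a list call once the estimated number of stale
    // versions reaches the configured threshold.
    val estimatedStaleVersions =
      maxSeenVersion - minSeenVersion + 1 - numVersionsToRetain
    estimatedStaleVersions >= minVersionsToDelete
  }
}
```

deleteOldVersions would then early-return unless this predicate holds, keeping the long function focused on the actual listing and deletion.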
siying left a comment:
The new option looks good to me. I'll defer to others to review the implementation.
Can you also update the PR description about the new conf?
bogao007 left a comment:
LGTM
HeartSaVioR left a comment:
+1
Thanks! Merging to master.
@riyaverm-db
@HeartSaVioR Updated the migration doc here. #47507
Closes apache#47393 from riyaverm-db/reduce-cloud-store-list-api-cost-in-maintenance.
Authored-by: Riya Verma <riya.verma@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…e store maintenance task

What changes were proposed in this pull request?
Updating migration doc for #47393

Why are the changes needed?
Better visibility of the change.

Does this PR introduce any user-facing change?
No

How was this patch tested?
N/A

Was this patch authored or co-authored using generative AI tooling?
No

Closes #47507 from riyaverm-db/update-migration-doc.
Authored-by: Riya Verma <riya.verma@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
Currently, during the state store maintenance process, we find which old version files of the RocksDB state store to delete by listing all existing snapshotted version files in the checkpoint directory every 1 minute by default. The frequent list calls in the cloud can result in high costs. To address this concern and reduce the cost associated with state store maintenance, we should aim to minimize the frequency of listing object stores inside the maintenance task. To minimize the frequency, we will try to accumulate versions to delete and only call list when the number of versions to delete reaches a configured threshold.
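The accumulate-then-list idea above can be sketched as follows. This is an illustration only, not the actual maintenance code: the class and method names are assumptions, and the staleness estimate simply uses the tracked min/max version bounds described in the PR.

```scala
// Illustrative sketch: track the min/max version believed to be in the
// checkpoint directory, and skip the expensive cloud list call until
// enough stale versions have accumulated.
class VersionTracker(minVersionsToDelete: Long) {
  private var minSeenVersion = Long.MaxValue
  private var maxSeenVersion = Long.MinValue

  // Called whenever a version is committed or observed.
  def recordVersion(v: Long): Unit = {
    minSeenVersion = math.min(minSeenVersion, v)
    maxSeenVersion = math.max(maxSeenVersion, v)
  }

  // True when maintenance should actually list and delete.
  def needsCleanup(numVersionsToRetain: Int): Boolean = {
    if (minVersionsToDelete <= 0) return true        // original behaviour
    if (maxSeenVersion < minSeenVersion) return false // nothing recorded yet
    (maxSeenVersion - minSeenVersion + 1) - numVersionsToRetain >= minVersionsToDelete
  }
}
```

With a threshold of 30 and 100 versions to retain, maintenance stays a no-op until roughly 30 stale versions pile up, then a single list call batches the deletions.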
The changes include:
1. Adding a new configuration variable ratioExtraSpaceAllowedInCheckpoint in SQLConf. This determines the ratio of extra version files we want to retain in the checkpoint directory compared to the number of versions to retain for rollbacks (minBatchesToRetain).
2. Using this config and minBatchesToRetain, set the minVersionsToDelete config inside StateStoreConf to minVersionsToDelete = ratioExtraVersionsAllowedInCheckpoint * minBatchesToRetain.
3. Using minSeenVersion and maxSeenVersion variables in RocksDBFileManager to estimate the min/max version present in the directory and control deletion frequency. This is done by ensuring the number of stale versions to delete is at least minVersionsToDelete.
Why are the changes needed?
Currently, maintenance operations like snapshotting, purging, deletion, and file management are done asynchronously for each data partition. We want to shift away from periodic deletion and instead rely on the estimated number of files in the checkpoint directory, reducing list calls and introducing batch deletion.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit tests.
Was this patch authored or co-authored using generative AI tooling?
No.