[FLINK-31238] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) #24031
Conversation
for (Pair<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> entry :
        exportedCFAndMetaData) {
    ExportImportFilesMetaData cfMetaData = entry.getValue();
    // TODO: method files() doesn't exist in the RocksDB API
@mayuehappy Is it correct to remove this code? I could not find a files() method in the RocksDB API.
Thanks for rebasing the PR!
I've left some comments, PTAL.
Besides that, I think it makes sense to test the change for correctness using ITCase randomization, WDYT?
(see TestStreamEnvironment.randomizeConfiguration)
Resolved review threads (outdated unless noted otherwise):
- ...ksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java (outdated)
- ...rc/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java (outdated)
- ...nd-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java (two threads, outdated)
- ...t/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java (two threads)
- ...ava/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java (two threads, outdated)
- ...nd-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java (two threads, outdated)
@rkhachatryan Can you take one more look? I have fixed all problems that I encountered in the original PR and also addressed your comments.
Thanks for updating the PR @StefanRRichter,
LGTM in general.
Regarding randomised testing: using TestStreamEnvironment.randomizeConfiguration seems a bit problematic because the RocksDB config keys are not visible in that module.
Do I understand correctly, that adding rocksdb module as a test dependency would create a cycle?
In that case, should we just use string constants to enable this feature randomly?
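The string-constant approach suggested above can be sketched without a compile-time dependency on the rocksdb module. The following is a minimal, self-contained model of the idea (the config key name and the `Map`-based configuration are illustrative assumptions, not the PR's actual code; Flink's real `TestStreamEnvironment.randomizeConfiguration` operates on a `Configuration` object):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Sketch: randomly enable the ingest-DB restore mode via a raw string key,
// so the test module needs no dependency on the rocksdb backend module.
public class RandomizeIngestDbFlag {
    // Hypothetical key, mirroring Flink's "state.backend.*" naming convention.
    static final String USE_INGEST_DB_KEY =
            "state.backend.rocksdb.use-ingest-db-restore-mode";

    static Map<String, String> randomizeConfiguration(Map<String, String> conf, Random rnd) {
        // Only randomize the flag if the test did not configure it explicitly.
        conf.putIfAbsent(USE_INGEST_DB_KEY, Boolean.toString(rnd.nextBoolean()));
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        randomizeConfiguration(conf, new Random());
        System.out.println(conf);
    }
}
```

The trade-off of string constants is that a rename of the real config key silently breaks the randomization, so a comment pointing at the owning option class would be advisable.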
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
.setUseIngestDbRestoreMode(useIngestDB);
I see that testCorrectMergeOperatorSet sets up the backend independently of this method - we could also add setUseIngestDbRestoreMode there.
if (exportedSstFiles != null && exportedSstFiles.length > 0) {
    resultOutput
            .computeIfAbsent(stateMetaInfo, (key) -> new ArrayList<>())
            .add(cfMetaData);
}
else cfMetaData.close();
?
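The suggestion is to close metadata objects for column families that exported no SST files, since they wrap native resources. A minimal, self-contained model of that pattern (the `CfMetaData` class below is a stand-in for RocksDB's `ExportImportFilesMetaData`, which is `AutoCloseable`; names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: group non-empty column-family exports, and eagerly close the
// metadata of empty exports so native handles are not leaked.
public class ExportGrouping {
    // Stand-in for org.rocksdb.ExportImportFilesMetaData.
    static class CfMetaData implements AutoCloseable {
        final List<String> files;
        boolean closed;
        CfMetaData(List<String> files) { this.files = files; }
        @Override public void close() { closed = true; }
    }

    static Map<String, List<CfMetaData>> group(Map<String, CfMetaData> exported) {
        Map<String, List<CfMetaData>> result = new HashMap<>();
        for (Map.Entry<String, CfMetaData> e : exported.entrySet()) {
            CfMetaData cfMetaData = e.getValue();
            if (!cfMetaData.files.isEmpty()) {
                result.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(cfMetaData);
            } else {
                // Suggested fix: free native resources of exports that will
                // never be imported.
                cfMetaData.close();
            }
        }
        return result;
    }
}
```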
One more thing, we should probably update the UI and show the new flag under …
@@ -488,6 +495,8 @@ private void rescaleClipIngestDB(
    List<ColumnFamilyHandle> tmpColumnFamilyHandles =
            tmpRestoreDBInfo.columnFamilyHandles;

    // Check if the data in all SST files referenced in the handle is within the
    // proclaimed key-groups range of the handle.
    if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange(
Here we need to decide what to check: is the key in the proclaimed range, or is there overlap between the state handles being checked?
For example, the proclaimed ranges are [1,5] and [6,10], but the actual ranges are [1,7] and [8,9]. Should export be possible in this case?
It is possible to import the case in your example, but the cost of detecting it is that we would need to compare all handles against each other, which we can only do after all DBs have been opened. I'm excluding this case on purpose, because I also assume these missed opportunities will be very rare. In particular, subtasks will over time compact their data down to the proclaimed range through normal compaction activity.
The reason I say it's rare: if the actual range is just [8,9], it means that while the handle [6,10] was created by a task with range [6,10], we never wrote a key that falls into key-group 6 or 7. Because of the hash-partitioning nature, that is highly unlikely unless the task was idle the whole time.
haha, that makes sense to me
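The per-handle eligibility check discussed above can be modeled with plain inclusive key-group ranges. This is a simplified, self-contained sketch (not Flink's actual `isSstDataInKeyGroupRange`, which inspects SST key boundaries): a handle's data can be imported directly only if its actual key range lies within its proclaimed range, and cross-handle overlap is deliberately left undetected since that would require comparing all opened DBs pairwise.

```java
// Minimal model of the per-handle check: key groups are inclusive int ranges.
public class KeyGroupRangeCheck {
    record Range(int start, int end) {
        boolean contains(Range other) {
            return start <= other.start && other.end <= end;
        }
    }

    static boolean isSstDataInProclaimedRange(Range proclaimed, Range actualSstRange) {
        return proclaimed.contains(actualSstRange);
    }

    public static void main(String[] args) {
        // Actual data [1,7] spills over the proclaimed range [1,5]: not eligible,
        // even though the sibling handle's actual range [8,9] left [6,7] unused.
        System.out.println(isSstDataInProclaimedRange(new Range(1, 5), new Range(1, 7)));
        // Actual data [8,9] fits inside the proclaimed range [6,10]: eligible.
        System.out.println(isSstDataInProclaimedRange(new Range(6, 10), new Range(8, 9)));
    }
}
```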
// if we have remaining handles to restore, we will insert by copy with from temporary
// instances to base DB.
// if we have remaining unopened handles to restore, we will insert by copy via
// temporary instances to base DB.
In the old code, we called chooseTheBestStateHandleForInitial to choose the best state handle to initialize the base DB, because if we use the best handle as a temporary DB instead of the main DB, we may need to write a lot of data when copying. Can we maintain this logic in the new code?
When choosing which DB to export, we could prioritize the best handle to ensure that we will not copy it during the subsequent copying phase. Can this ensure that there is no regression compared to the old code?
Yes, I'm already working on adding that code again here.
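The heuristic being discussed can be sketched in a self-contained way: pick the state handle whose key-group range overlaps the target range the most, and open the base DB from it so its data never has to be copied. This is a simplified stand-in for Flink's `RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial` (the real method scores actual state handles, not bare ranges):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Sketch: choose the handle with the largest key-group overlap with the
// target range as the base DB, minimizing data copied during restore.
public class BestHandleChoice {
    record Range(int start, int end) {
        // Size of the inclusive intersection with another range (0 if disjoint).
        int overlap(Range other) {
            return Math.max(0, Math.min(end, other.end) - Math.max(start, other.start) + 1);
        }
    }

    static Optional<Range> chooseBestHandle(List<Range> handleRanges, Range target) {
        return handleRanges.stream()
                .max(Comparator.comparingInt(r -> r.overlap(target)));
    }
}
```

For a target of [4,12] and handles [1,5], [6,10], [11,12], the overlaps are 2, 5, and 2, so [6,10] becomes the base DB and only the other two need to be exported/copied in.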
        stateHandle);
// Use range delete to clip the temp db to the target range of the backend
RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
Another point: if these handles do not overlap, can we first import them and only then clip them with deleteRange?
In the current code we first deleteRange and then export, so during export each DB generates a new small file containing the range-deletion tombstone. If we instead did a single deleteRange after importing, we could reduce the number of small files.
Makes sense to try that. Your code was doing the delete before the export, so I was also wondering about this but didn't bother to change it. Let me try that.
I changed the code accordingly. It didn't work at first because the import code didn't add the ColumnFamilyHandles to the DbHandle. Now it seems to work :)
Broadly speaking, LGTM modulo other reviewers' open issues.
Rebased and slightly refactored version of #23169.