KAFKA-12520: Ensure log loading does not truncate producer state unless required #10763
junrao merged 26 commits into apache:trunk
Conversation
    // Check whether swap index files exist: if not, the cleaned files must exist due to the
    // existence of swap log file. Therefore, we rename the cleaned files to swap files and continue.
    var recoverable = true
    val swapOffsetIndexFile = Log.offsetIndexFile(swapFile.getParentFile, baseOffset, Log.SwapFileSuffix)
Is it possible to write something like the following in Scala?
    Vector(Log.offsetIndexFile, Log.timeIndexFile, Log.transactionIndexFile).foreach { fn =>
      val swapIndexFile = fn(swapFile.getParentFile, baseOffset, Log.SwapFileSuffix)
      if (!swapIndexFile.exists()) {
        // ...
      }
      // other things
    }
You could perhaps define a method like this:
    def maybeCompleteInterruptedSwap(fn: (File, Long, String) => File): Boolean = {
      val swapIndexFile = fn(swapFile.getParentFile, baseOffset, Log.SwapFileSuffix)
      if (!swapIndexFile.exists()) {
        val cleanedIndexFile = fn(swapFile.getParentFile, baseOffset, Log.CleanedFileSuffix)
        if (cleanedIndexFile.exists()) {
          cleanedIndexFile.renameTo(swapIndexFile)
          true
        } else {
          false
        }
      } else {
        true
      }
    }
and then invoke it as
    var recoverable = maybeCompleteInterruptedSwap(Log.offsetIndexFile)
    if (recoverable)
      recoverable = maybeCompleteInterruptedSwap(Log.timeIndexFile)
    if (recoverable)
      recoverable = maybeCompleteInterruptedSwap(Log.transactionIndexFile)
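To make the suggested helper concrete, here is a rough, self-contained sketch of the same idea. The `indexFile` helper, the suffix constants, and the index-type names are simplified stand-ins for Kafka's `Log.offsetIndexFile`/`timeIndexFile`/`transactionIndexFile`, not the actual API:

```scala
import java.io.File

object SwapRenameSketch {
  // Hypothetical suffix constants mirroring Log.SwapFileSuffix / Log.CleanedFileSuffix.
  val SwapSuffix = ".swap"
  val CleanedSuffix = ".cleaned"

  // Builds an index file name like "00000000000000000000.index.swap".
  def indexFile(dir: File, baseOffset: Long, indexType: String, suffix: String): File =
    new File(dir, f"$baseOffset%020d.$indexType$suffix")

  // If the .swap index is missing, try to complete the interrupted swap by
  // renaming the corresponding .cleaned index; false means this index cannot
  // be completed by renaming and the segment needs full recovery.
  def maybeCompleteInterruptedSwap(dir: File, baseOffset: Long, indexType: String): Boolean = {
    val swapIndex = indexFile(dir, baseOffset, indexType, SwapSuffix)
    if (swapIndex.exists()) true
    else {
      val cleanedIndex = indexFile(dir, baseOffset, indexType, CleanedSuffix)
      cleanedIndex.exists() && cleanedIndex.renameTo(swapIndex)
    }
  }

  // A segment is recoverable by renaming only if every index can be completed.
  def recoverable(dir: File, baseOffset: Long): Boolean =
    Seq("index", "timeindex", "txnindex").forall(maybeCompleteInterruptedSwap(dir, baseOffset, _))
}
```

Folding the per-index check with `forall` also short-circuits on the first failure, which matches the chained `if (recoverable)` invocation above.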
      }
    }
    info(s"${params.logIdentifier}Found log file ${swapFile.getPath} from interrupted swap operation, which is not recoverable from ${Log.CleanedFileSuffix} files, repairing.")
    recoverSegment(swapSegment, params)
The main thing we want to avoid is running this recovery logic for scenarios where the rename operation was interrupted, as it rebuilds the producer state from scratch. Could we make this recovery conditional on whether we have all the relevant log files and indices?
    time = params.time,
    fileSuffix = Log.SwapFileSuffix)
    info(s"${params.logIdentifier}Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
    if (recoverable) {
Could you elaborate a bit on what this block of code is doing?
The whole logic is that, if the segment.swap file exists, then all index files should exist as .cleaned or .swap files. We find them and rename them to .swap [before this block of code], then do a sanity check and rename all the .swap files to non-suffixed log files [within this block of code].
This could fix the issue caused by the compaction as we discussed before.
For all other cases, I think it is in an inconsistent state and we will have to do the original recovery.
Does this make sense to you?
    swapSegment.sanityCheck(true)
    info(s"Found log file ${swapFile.getPath} from interrupted swap operation, which is recoverable from ${Log.CleanedFileSuffix} files.")
    swapSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
    return
return might be wrong. Didn't realize it was within a for loop. Maybe continue instead.
Would be good to avoid return or continue and instead make the call to recoverSegment conditional so that the code is easier to read.
    // Check whether swap index files exist: if not, the cleaned files must exist due to the
    // existence of swap log file. Therefore, we rename the cleaned files to swap files and continue.
    val recoverable = maybeCompleteInterruptedSwap(Log.offsetIndexFile) &&
recoverable sounds a bit incorrect in this context, given that we actually end up calling recoverSegment when recoverable == false.
Perhaps we could call this something like needsRecovery which is set to true if we do not find the relevant index files or when the sanity check fails.
    @Test
    def testRecoveryRebuildsIndices(): Unit = {
Would be good to enumerate different cases that we could run into during recovery and ensure we have coverage for them, e.g.:
- All .swap files are present. We should validate that producer state is not rebuilt in this case.
- Some .swap and some .clean files are present.
- All .clean files are present.
- One of the index files was not found when completing the swap operation, which triggers a full recovery and rebuild of producer state.
@dhruvilshah3 addressed your comments. Will work on the tests later today. Please take a look.
@dhruvilshah3 @junrao This PR is ready for review. Please take a look. The above function tests all the possible cases, thus we don't need additional tests. One thing I am not sure about is how to test whether a certain recovery goes down the renaming path or the recovery path. The current test cases only validate that the results are correct. If you have any ideas, please let me know.
Added two tests: they should cover all the cases around file renaming during compaction.
    })

    completeSwapOperations(swapFiles, params)
    // Do the actual recovery for toRecoverSwapFiles, as discussed above.
Hmm, I am not sure why we need this step. We have processed all .swap files before and no new .swap files should be introduced if we get to here.
You are right, given that we don't need to do sanity checks here. Removed this step.
     *
     * @param params The parameters for the log being loaded from disk
     * @return Set of .swap files that are valid to be swapped in as segment files
     * @return Set of .swap files that are valid to be swapped in as segment files and index files
The PR description says "as a result, if at least one .swap file exists for a segment, all other files for the segment must exist as .cleaned files or .swap files. Therefore, we rename the .cleaned files to .swap files, then make them normal segment files.". Are we implementing the renaming of .clean files to .swap files?
No, we are not renaming .cleaned files to .swap files due to KAFKA-6264. I forgot to update the description of the PR. Just updated it: please see the updated one.
    val swapFiles = removeTempFilesAndCollectSwapFiles(params)

    // Now do a second pass and load all the log and index files.
    // The remaining valid swap files must come from compaction operation. We can simply rename them
It seems that those swap files could be the result of segment split too?
Are you concerned about the logic or the comment? If the comment only, I fixed it.
    time = params.time,
    fileSuffix = Log.SwapFileSuffix)
    try {
      segment.sanityCheck(false)
It doesn't seem we need this since we call segment.sanityCheck() on all segments later in loadSegmentFiles().
@junrao Thanks for the review. Addressed your comments.
    toRenameSwapFiles += f
    info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.")
    minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
    maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset)
This is an existing problem. Calculating the end offset that a segment covers can be tricky. The problem is that in compaction, we remove records in the .clean and .swap files. So, the offset of the last record in a segment doesn't tell us the true end offset of the original segment.
One possibility is to use the base offset of the next segment if present.
How can we get the next segment before finishing the recovery process?
I could be wrong, but I think if it is compaction, the last record will never be removed. The reason is that compaction always removes earlier records of each key, and the last record will never be an earlier one.
Split should be similar.
> I could be wrong, but I think if it is compaction, the last record will never be removed. The reason is that compaction always removes earlier records of each key, and the last record will never be an earlier one. Split should be similar.

It's true that we generally don't remove the last record during compaction. However, during a round of cleaning, we clean segments in groups and each group generates a single .clean file. The group is formed to make sure that offsets stay within a 2 billion offset gap and the .clean file won't exceed 2GB in size. If multiple groups are formed, it's possible that a group that's not the last doesn't preserve the last record.

> How can we get the next segment before finishing the recovery process?

We could potentially scan all .log files and sort them in offset order.
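For illustration, scanning the .log files and sorting them by base offset to derive each segment's exclusive upper bound from the next segment's base offset could look roughly like this. The `offsetFromFile` parser here is a simplified stand-in, not Kafka's actual helper:

```scala
import java.io.File

object SegmentOrderSketch {
  // Simplified stand-in: parse the base offset from a segment file name
  // like "00000000000000000042.log".
  def offsetFromFile(file: File): Long =
    file.getName.takeWhile(_ != '.').toLong

  // Sort the base offsets of all .log files; each segment's true upper bound
  // is the next segment's base offset (exclusive), or None for the last one.
  def segmentBounds(logFiles: Seq[File]): Seq[(Long, Option[Long])] = {
    val offsets = logFiles.map(offsetFromFile).sorted
    offsets.zipWithIndex.map { case (base, i) => (base, offsets.lift(i + 1)) }
  }
}
```

This avoids relying on the offset of the last surviving record in a compacted segment, which cleaning may have removed.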
segment.offsetIndex.lastOffset doesn't give the exact last offset in a segment since the index is sparse. We need to use segment.nextOffset().
    }

    // Second pass: delete segments that are between minSwapFileOffset and maxSwapFileOffset. As
    // discussed above, these segments were compacted but haven't been renamed to .delete before
The swap files can also be created during splitting.
    })

    completeSwapOperations(swapFiles, params)
    // Fourth pass: rename remaining index swap files. They must be left due to a broker crash when
Hmm, not sure why we still have swap files at this point. We have renamed all existing swap files and no new swap files are created.
We have renamed all .log.swap files and their corresponding index swap files. If there is a single .index.swap file, it is not renamed previously in the recovery process. A single .index.swap file could happen if it crashed in the middle of this line: kafka/core/src/main/scala/kafka/log/Log.scala, line 2381 (commit bd668e9).
Should we rename all .swap files in https://github.com/apache/kafka/pull/10763/files#diff-54b3df71b1e0697a211d23a9018a91aef773fca0b9cbd1abafbdca6c79664930R138 no matter whether the file is in toRenameSwapFiles or not? I am not sure if we have to put the retryOnOffsetOverflow call between renaming files in toRenameSwapFiles and renaming the remaining .swap files.
I guess we don't need to put the retryOnOffsetOverflow call in between. Removed the toRenameSwapFiles variable and combined the renaming in 76c197c.
    try {
      if (!file.getName.endsWith(SwapFileSuffix)) {
        val offset = offsetFromFile(file)
        if (offset >= minSwapFileOffset && offset <= maxSwapFileOffset) {
If we use segment.nextOffset() to calculate maxSwapFileOffset, it's exclusive.
    // Now do a second pass and load all the log and index files.
    // The remaining valid swap files must come from compaction or segment split operation. We can
    // simply rename them to regular segment files. But, before renaming, we should figure out which
    // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
"which segments are compacted": .swap files are also generated from splitting.
        deleteIndicesIfExist(baseFile)
        swapFiles += file
      }
      swapFiles += file
It's possible that during renaming, we have only renamed the .log file to .swap, but not the corresponding index files. Should we find those .clean files with the same offset and rename them to .swap?
Due to KAFKA-6264, if there are any .cleaned files (no matter whether they are .index.cleaned or .log.cleaned), we delete all .cleaned files and all .swap files that have larger/equal base offsets. Basically, this reverts ongoing compaction/split operations. Therefore, we don't have any additional .index.cleaned files.
Is that fair?
Thanks for the explanation. Makes sense.
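The KAFKA-6264 revert rule discussed above can be sketched as a pure function over file names. This is a simplified model, not the actual Log code; the offset parsing is an assumption about the file-name layout:

```scala
import java.io.File

object CleanedRevertSketch {
  // Hypothetical suffix constants mirroring Log.CleanedFileSuffix / Log.SwapFileSuffix.
  val CleanedSuffix = ".cleaned"
  val SwapSuffix = ".swap"

  // Simplified stand-in: parse the base offset from a name like
  // "00000000000000000010.log.cleaned".
  def offsetFromFile(file: File): Long =
    file.getName.takeWhile(_ != '.').toLong

  // Per KAFKA-6264: if any .cleaned file exists, drop every .cleaned file and
  // every .swap file whose base offset is >= the smallest .cleaned offset,
  // which reverts the interrupted compaction/split. Returns the files to delete.
  def filesToDelete(files: Seq[File]): Seq[File] = {
    val cleaned = files.filter(_.getName.endsWith(CleanedSuffix))
    if (cleaned.isEmpty) Seq.empty
    else {
      val minCleanedOffset = cleaned.map(offsetFromFile).min
      cleaned ++ files.filter(f =>
        f.getName.endsWith(SwapSuffix) && offsetFromFile(f) >= minCleanedOffset)
    }
  }
}
```

Because the revert removes every .cleaned file, no stray .index.cleaned file can survive into the later renaming passes, which is the point being made above.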
@junrao Thanks for the review. I have addressed the comments. Please take a look.
    // 5) Simulate recovery after a subset of swap files are renamed to regular files and old segment files are renamed
    // to .deleted. Clean operation is resumed during recovery.
    log.logSegments.head.timeIndex.file.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", Log.SwapFileSuffix)))
    // .changeFileSuffixes("", Log.SwapFileSuffix)
My bad. Forgot to delete this line.
…ss required (apache#10763)

When we find a .swap file on startup, we typically want to rename and replace it as .log, .index, .timeindex, etc. as a way to complete any ongoing replace operations. These swap files are usually known to have been flushed to disk before the replace operation begins. One flaw in the current logic is that we recover these swap files on startup and, as part of that, end up truncating the producer state and rebuilding it from scratch. This is unneeded, as the replace operation does not mutate the producer state by itself; it is only meant to replace the .log file along with corresponding indices. Because of this unneeded producer state rebuild operation, we have seen multi-hour startup times for clusters that have large compacted topics. This patch fixes the issue.

With ext4 ordered mode, the metadata operations are ordered, no matter whether it is a clean or unclean shutdown. As a result, we rework the recovery workflow as follows.

1. If there are any .cleaned files, we delete all .swap files with higher/equal offsets due to KAFKA-6264. We also delete the .cleaned files. If there is no .cleaned file, do nothing for this step.
2. If there are any .log.swap files left after step 1, they, together with their index files, must be renamed from .cleaned and are complete (renaming from .cleaned to .swap is in reverse offset order). We rename these .log.swap files and their corresponding index files to regular files, while deleting the original files from compaction or segment split if they haven't been deleted.
3. Do log splitting for legacy log segments with offset overflow (KAFKA-6264).
4. If there are any other index swap files left, they must come from partial renaming from .swap files to regular files. We can simply rename them to regular files.

credit: some code is copied from @dhruvilshah3 's PR: apache#10388

Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
When we find a .swap file on startup, we typically want to rename and replace it as .log, .index, .timeindex, etc. as a way to complete any ongoing replace operations. These swap files are usually known to have been flushed to disk before the replace operation begins.
One flaw in the current logic is that we recover these swap files on startup and, as part of that, end up truncating the producer state and rebuilding it from scratch. This is unneeded, as the replace operation does not mutate the producer state by itself. It is only meant to replace the .log file along with corresponding indices. Because of this unneeded producer state rebuild operation, we have seen multi-hour startup times for clusters that have large compacted topics.
This patch fixes the issue. With ext4 ordered mode, the metadata operations are ordered, no matter whether it is a clean or unclean shutdown. As a result, we rework the recovery workflow as follows.
1. If there are any .cleaned files, we delete all .swap files with higher/equal offsets due to KAFKA-6264. We also delete the .cleaned files. If there is no .cleaned file, do nothing for this step.
2. If there are any .log.swap files left after step 1, they, together with their index files, must be renamed from .cleaned and are complete (renaming from .cleaned to .swap is in reverse offset order). We rename these .log.swap files and their corresponding index files to regular files, while deleting the original files from compaction or segment split if they haven't been deleted.
3. Do log splitting for legacy log segments with offset overflow (KAFKA-6264).
4. If there are any other index swap files left, they must come from partial renaming from .swap files to regular files. We can simply rename them to regular files.

credit: some code is copied from @dhruvilshah3 's PR: #10388
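As a rough, name-level simulation of the first two steps of this workflow (steps 3 and 4 are omitted; the file-name parsing is a simplified assumption, not the actual Log code):

```scala
object RecoverySketch {
  // A hypothetical pure model over file names of steps 1 and 2 of the
  // reworked recovery workflow.
  def recoverFileNames(files: Set[String]): Set[String] = {
    def offset(n: String): Long = n.takeWhile(_ != '.').toLong
    // Step 1: any .cleaned file reverts in-flight work at >= its min offset
    // (KAFKA-6264): drop all .cleaned files and all .swap files at or above it.
    val cleaned = files.filter(_.endsWith(".cleaned"))
    val afterStep1 =
      if (cleaned.isEmpty) files
      else {
        val minCleaned = cleaned.map(offset).min
        files -- cleaned -- files.filter(f => f.endsWith(".swap") && offset(f) >= minCleaned)
      }
    // Step 2: the remaining .swap files are complete; rename them to regular
    // files by stripping the suffix.
    afterStep1.map(_.stripSuffix(".swap"))
  }
}
```

With a .cleaned file present, the interrupted operation is reverted and only the old regular files survive; without one, the .swap files are simply promoted to regular files.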
Committer Checklist (excluded from commit message)