KAFKA-12964: Collect and rename snapshot files prior to async deletion. #10896

Merged
junrao merged 6 commits into apache:trunk from gardnervickers:KAFKA-12964 on Jul 1, 2021

Conversation

@gardnervickers
Contributor

Segment and index files are currently renamed with a .deleted
suffix prior to async deletion. This serves two purposes: it lets
deletion resume after a broker failure, and it protects against
deletion of new segments during truncation (since deletion is
async).

We should do the same for snapshot files. While they are not subject
to issues around resuming deletion, thanks to the stray-snapshot
scanning performed on log initialization, we can end up with
situations where truncation queues snapshots for deletion, but new
segments with the same snapshot file name are created before the
deletion runs. Async deletion can then delete these new snapshots.

This patch introduces a two-stage snapshot deletion which first
renames the snapshot files and removes them from the
ProducerStateManager, allowing the Log to asynchronously delete them.

Credit to Kowshik Prakasam <kowshik@gmail.com> for finding this issue
and creating the test demonstrating the failure.

Co-authored-by: Kowshik Prakasam <kowshik@gmail.com>
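The rename-then-delete pattern described above can be sketched as a small standalone example. This is an illustrative sketch, not the Kafka implementation; the class, method names, and the `DELETED_SUFFIX` constant here are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of two-stage deletion: synchronously rename the snapshot with a
// ".deleted" suffix, then remove the renamed file later (asynchronously in
// the real broker; done inline here for brevity).
public class TwoStageDelete {
    static final String DELETED_SUFFIX = ".deleted"; // hypothetical constant

    // Stage 1 (synchronous): rename, so a newly created snapshot reusing the
    // same file name cannot be clobbered by the still-pending deletion.
    static Path markForDeletion(Path snapshot) throws IOException {
        Path renamed = snapshot.resolveSibling(snapshot.getFileName() + DELETED_SUFFIX);
        return Files.move(snapshot, renamed);
    }

    // Stage 2: actually delete the already-renamed file.
    static void deleteMarked(Path renamed) throws IOException {
        Files.deleteIfExists(renamed);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshots");
        Path snap = Files.createFile(dir.resolve("00000000000000000042.snapshot"));
        Path marked = markForDeletion(snap);
        System.out.println(Files.exists(snap));    // false: original name is free again
        System.out.println(Files.exists(marked));  // true: awaiting deletion
        deleteMarked(marked);
        System.out.println(Files.exists(marked));  // false
    }
}
```

Because stage 1 frees the original file name immediately, a new snapshot created at the same offset after truncation is no longer at risk from the queued deletion.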


@kowshik kowshik left a comment


@gardnervickers Thanks for the PR! LGTM. I have added a few comments.


@kowshik kowshik left a comment


@gardnervickers Thanks for the PR! LGTM. Just a few comments below.

private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
// If the file cannot be renamed, it likely means that the file was deleted already.
// This can happen due to the way we construct an intermediate producer state manager
Contributor


I'm not entirely sure I understood this comment. Looking at the LogLoader code, it doesn't appear that we use the intermediate producer state manager to issue async deletions of snapshot files.

So, is it still possible that a missing file is a valid case?

Contributor


Agree with Kowshik. It's not clear if the comment is still valid.

Contributor Author


@junrao @kowshik I accidentally included the word async here, the deletion performed by the intermediate ProducerStateManager is done synchronously.

I'm referring to the case where we go through LogLoader.recoverSegment. We construct a new "intermediate" ProducerStateManager for segment recovery which is separate from the "real" ProducerStateManager captured in LoadLogParams.

Segment recovery can use the intermediate ProducerStateManager to truncate snapshot files via ProducerStateManager.truncateAndReload in Log.rebuildProducerState. The SnapshotFile instances will be removed from the in-memory map for the "intermediate" ProducerStateManager in this case, but will remain for the "real" ProducerStateManager captured in LoadLogParams.
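The split between the two manager instances can be illustrated with a small standalone sketch (hypothetical names, not Kafka code): each instance keeps its own in-memory map of snapshot offsets, so a removal through the intermediate instance is invisible to the real one.

```java
import java.util.Map;
import java.util.TreeMap;

// Two independent in-memory views over the same snapshot offsets. Removing an
// entry via the "intermediate" view leaves the "real" view stale, mirroring
// how the intermediate ProducerStateManager can delete a snapshot file that
// the real ProducerStateManager still believes exists.
public class StaleViewDemo {
    public static void main(String[] args) {
        Map<Long, String> real = new TreeMap<>(
            Map.of(42L, "00042.snapshot", 77L, "00077.snapshot"));
        Map<Long, String> intermediate = new TreeMap<>(real); // separate copy, same contents

        intermediate.remove(42L); // intermediate PSM truncates away the snapshot

        // The real PSM still holds the entry, but the file it refers to is gone.
        System.out.println(real.containsKey(42L));          // true (stale view)
        System.out.println(intermediate.containsKey(42L));  // false
    }
}
```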

Contributor


@gardnervickers : Good point. It seems that LogLoader.recoverSegment() can both remove and add snapshots, both of which will be missing in the "real" ProducerStateManager captured in LoadLogParams. This can lead to the missing-file issue you pointed out and also potentially cause LogLoader.load() to do an unnecessary, expensive Log.rebuildProducerState().

@kowshik : I am wondering if we should let LogLoader reload the snapshots in the "real" ProducerStateManager before calling Log.rebuildProducerState() in LogLoader.load().

Contributor


@gardnervickers : @kowshik mentioned that params.producerStateManager.removeStraySnapshots(params.segments.baseOffsets.toSeq)
in LogLoader.load() actually reloads the snapshots after log recovery. So, it seems that the issue you mentioned may not be a problem?

Contributor Author


@kowshik @junrao Right, we'll still end up with a fully in-sync ProducerStateManager after LogLoader.load(..) runs.

The problem can still occur, though, because we delete snapshots using both the "intermediate" and "real" ProducerStateManager prior to removeStraySnapshots at the end of LogLoader.load.

  1. recoverSegment can delete snapshots with the intermediate ProducerStateManager.
  2. removeAndDeleteSegmentsAsync will use the "real" ProducerStateManager to schedule async deletion. It may have a stale view of the snapshots present on the filesystem if step 1 deleted snapshots, causing the rename to fail.
  3. At the end of LogLoader.load, we will removeStraySnapshots, which will fix up any discrepancies between the contents of the log dir and the "real" ProducerStateManager.

Contributor


Let's assume PSM refers to ProducerStateManager.

@junrao @gardnervickers That feels right to me, thanks for the explanation! A couple of things I wanted to ask:

  1. Should we update the comment here to say:
// Reload all snapshots into the ProducerStateManager cache; the intermediate ProducerStateManager used
// during log recovery may have created or deleted some snapshots
// without the LoadLogParams.producerStateManager instance witnessing the changes.
  2. PSM.removeStraySnapshots and its params could have a better name. Should we call it something different, like PSM.reloadEssentialSnapshots(essentialSegmentBaseOffsets: Seq[Long])?

=== SUMMARY OF CASES ===

I thought it would be useful to summarize. There are a few different cases that arise whenever PSM.removeAndMarkSnapshotForDeletion() is invoked on the "real" PSM instance. I believe all cases are handled by the current code, as explained below.

Straightforward cases:

  1. Snapshot entry is present in the real PSM instance and the snapshot file is present. This is a straightforward case where we remove the entry and rename the file.
  2. Snapshot entry is absent in the real PSM instance and the snapshot file is absent. This is also a straightforward case where we do nothing.

Corner cases:

  1. Snapshot entry is present in the real PSM instance, but the snapshot file is absent. This can happen because the intermediate PSM deleted the snapshot file. In this case, we ignore the failure of the file rename.
  2. Snapshot entry is absent in the real PSM instance, but the snapshot file is present. This can happen when the intermediate PSM takes a snapshot, but the real PSM doesn't have the entry (yet). This is handled by the call to PSM.removeStraySnapshots here, which corrects such discrepancies by loading all snapshots from disk and eliminating those that don't match the list of segment base offsets post-recovery.
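The cases above can be exercised with a small sketch. This is a hypothetical helper, not the Kafka implementation: remove the entry from the in-memory map if present, attempt the rename, and treat a failed rename of a missing file as a no-op.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of removeAndMarkSnapshotForDeletion-style handling of the cases:
// (entry present/absent in the map) x (file present/absent on disk).
public class SnapshotCases {
    final Map<Long, Path> snapshots = new HashMap<>();

    // Remove the in-memory entry if present, then try to rename the file with
    // a ".deleted" suffix. A missing file (corner case 1) is tolerated: the
    // rename failure is ignored, and the caller still gets the entry back.
    Optional<Path> removeAndMark(long offset) {
        Path snapshot = snapshots.remove(offset);
        if (snapshot == null) {
            return Optional.empty(); // straightforward case 2: nothing to do
        }
        Path renamed = snapshot.resolveSibling(snapshot.getFileName() + ".deleted");
        try {
            Files.move(snapshot, renamed); // straightforward case 1: entry + file present
        } catch (IOException e) {
            // corner case 1: entry present but file already deleted; ignore
        }
        return Optional.of(snapshot);
    }
}
```

Corner case 2 (a file on disk without a map entry) is deliberately not handled here; as the discussion notes, removeStraySnapshots cleans those up after recovery.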

Contributor


@gardnervickers : Thanks for the explanation. Makes sense.

@kowshik : The source of all the confusion is that we use the real PSM in some cases while using a temporary PSM in some other cases during recovery. The temporary PSM in recoverSegment() is used in 4 different places.

  1. In recoverLog(). This is the case where we could just pass in the real PSM.
  2. In completeSwapOperations(). We try to avoid recovering the segment here in KAFKA-12520: Ensure log loading does not truncate producer state unless required #10763.
  3. and 4. In loadSegmentFiles(). We probably need to clean up this part of the logic a bit. If we are missing an index file or the index file is corrupted, typically we can just rebuild the index without changing the PSM. If the segment is truncated while rebuilding the index, we actually want to follow the process in step 1, by just removing the rest of the segments. So we could also get rid of the temporary PSM in this case.

I am wondering if we could have a separate PR to get rid of the temporary PSM completely?

Contributor


@junrao That's a good point. Yes, we should get rid of the temporary PSM. I've created a jira tracking this improvement: https://issues.apache.org/jira/browse/KAFKA-12977. It is currently assigned to myself and I'll follow up on it.

@junrao junrao left a comment

@gardnervickers : Thanks for the PR. Just a couple of minor comments.

@junrao junrao left a comment

@gardnervickers : The PR itself looks good to me. Just one more comment on the issue that you pointed out.

@junrao junrao left a comment

@gardnervickers : One further comment on the previous issue.

@kowshik kowshik left a comment

@gardnervickers Thanks for the updated PR! Just a few more comments.

@junrao junrao left a comment

@gardnervickers : Thanks for the explanation. Makes sense. One more comment below.

@kowshik kowshik left a comment

@gardnervickers Thanks for the PR! LGTM. Just a few minor comments below.

assertTrue(offsetsWithMissingSnapshotFiles.isEmpty,
  s"Found offsets with missing producer state snapshot files: $offsetsWithMissingSnapshotFiles")
Contributor


Does it make sense to check that there are no .deleted files in the log dir at the end of this test?
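The suggested check could look roughly like the following sketch, written with plain Java NIO rather than the Kafka test utilities; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Collect any leftover ".deleted" files in a log directory; a test could then
// assert that this list is empty once pending deletions have completed.
public class DeletedFileScan {
    static List<Path> leftoverDeletedFiles(Path logDir) throws IOException {
        try (Stream<Path> files = Files.list(logDir)) {
            return files.filter(p -> p.getFileName().toString().endsWith(".deleted"))
                        .collect(Collectors.toList());
        }
    }
}
```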


@junrao junrao left a comment


@gardnervickers : Thanks for the updated PR. LGTM. Do you want to address the remaining comments from Kowshik?

@gardnervickers
Contributor Author

Yes thanks @junrao. @kowshik please let me know if you think any other changes are necessary.

@kowshik
Contributor

kowshik commented Jul 1, 2021

@gardnervickers Thanks for the updated PR! LGTM.

@junrao junrao merged commit 789fc26 into apache:trunk Jul 1, 2021
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021

KAFKA-12964: Collect and rename snapshot files prior to async deletion. (apache#10896)

Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>