KAFKA-12543: Change RawSnapshotReader ownership model #10431
Conversation
Force-pushed from c7a0a5c to c7567f2
@hachikuji @dengziming @mumrah This PR is ready for review. Thanks!
dengziming left a comment
Left some minor questions.
mumrah left a comment
Thanks for the patch @jsancio! A few questions and comments inline
```scala
latestSnapshotId().asScala match {
  case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
      (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {
```
Should we grab the snapshots lock for this whole match expression like we do in deleteBeforeSnapshot? Is a race possible between this block and deleteBeforeSnapshot?
Synchronizing on snapshots is only needed when accessing that object. In deleteBeforeSnapshot the lock is grabbed because the match expression accesses snapshots in one of its case branches.
In this method I think it is safe to only grab the log lock where we currently do.
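To make that discipline concrete, here is a minimal sketch, assuming snapshots is the mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] field quoted later in this review; the helper names are hypothetical and the bodies simplified:

```scala
// Any read of `snapshots` takes its monitor...
private def oldestSnapshotId(): Option[OffsetAndEpoch] =
  snapshots synchronized {
    snapshots.headOption.map { case (snapshotId, _) => snapshotId }
  }

// ...and so does any mutation, which is why deleteBeforeSnapshot grabs the
// lock: its match expression touches `snapshots` in one of its branches.
private def forgetBefore(logStartSnapshotId: OffsetAndEpoch): Unit =
  snapshots synchronized {
    val forgotten = snapshots.keys.takeWhile(_.offset < logStartSnapshotId.offset).toSeq
    snapshots --= forgotten
  }
```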
junrao left a comment
@jsancio : Thanks for the PR. Just a few comments below.
```scala
// This object needs to be thread-safe because it is used by the snapshotting thread to notify the
// polling thread when snapshots are created.
snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
```
Is the above comment still accurate since snapshots is no longer thread safe?
No. I updated the comment. I'll push a commit tomorrow after a few other changes.
```java
try {
    Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
} catch (IOException e) {
    log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e);
```
Should we just fail the controller on IOException?
@mumrah suggested converting all of the IOExceptions to UncheckedIOException. Kafka doesn't have a precedent for doing that, but maybe we should going forward. I filed https://issues.apache.org/jira/browse/KAFKA-12773, but I'll change it here to re-throw instead of logging this message.
By changing it to UncheckedIOException, this will unwind the stack for the polling thread. Tomorrow I'll look into how we handle that case, but it may already shut down the broker and controller.
Excuse the delay @junrao, but I looked into this in more detail today. I changed this code to throw an exception instead. This exception will be unhandled by the KafkaRaftClient polling thread in both the broker and controller. This will cause the thread to terminate, but I don't think it will cause the JVM process to terminate.
We have the following Jira to revisit our exception handling: https://issues.apache.org/jira/browse/KAFKA-10594. I added a comment there to document the issue you highlighted here. Do you mind if we tackle this problem holistically in that Jira?
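For concreteness, a minimal sketch of the rethrow being described, written in Scala to match the other sketches here even though the file in question is Java; immutablePath and deletedPath come from the diff above, and the wrapper type is the suggested UncheckedIOException:

```scala
import java.io.{IOException, UncheckedIOException}
import org.apache.kafka.common.utils.Utils

try {
  Utils.atomicMoveWithFallback(immutablePath, deletedPath, false)
} catch {
  case e: IOException =>
    // Rethrow so the failure unwinds the polling thread's stack instead of
    // being logged and silently dropped.
    throw new UncheckedIOException(
      s"Error renaming snapshot file from $immutablePath to $deletedPath", e)
}
```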
```java
try {
    fileRecords.close();
} catch (IOException e) {
    throw new RuntimeException(e);
```
Should we throw KafkaStorageException?
I am not sure; I could use some guidance here. I read the documentation for KafkaStorageException (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java#L19-L30), and it looks like Kafka uses KafkaStorageException when the IO error is visible to the client.
On the server (broker and controller) this code will be called asynchronously by the same scheduler used for deleting log segments. In that case CoreUtils.swallow is used, which logs a WARN message. I think we should do the same here.
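A minimal sketch of that log-and-continue behavior, assuming Kafka's Logging trait is in scope for warn and that a snapshotId is available for the message (both assumptions for illustration):

```scala
import java.io.IOException

// Mirror how the scheduler treats failed log-segment deletions: log a WARN
// and keep going rather than surfacing a storage error to clients.
try {
  fileRecords.close()
} catch {
  case e: IOException =>
    warn(s"Failed to close FileRecords for snapshot $snapshotId", e)
}
```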
junrao left a comment
mumrah left a comment
LGTM. Agree that we should revisit the exception handling later on.
The Kafka networking layer doesn't close `FileRecords` and assumes that they are already open when sending them over a channel. To support this pattern, this commit changes the ownership model for `FileRawSnapshotReader` so that they are owned by `KafkaMetadataLog`. This includes:

1. Changing `KafkaMetadataLog`'s `snapshotIds` from a `Set[OffsetAndEpoch]` to a `TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]`. This map contains all of the known snapshots; the value is `Some` if a snapshot reader has been opened in the past (see the sketch after this list).
2. Splitting and changing the functionality in `KafkaMetadataLog::removeSnapshotFilesBefore` so that a) `forgetSnapshotsBefore` removes any snapshot less than the given snapshot id from `snapshots`, and b) `removeSnapshots` deletes the snapshots enumerated by `forgetSnapshotsBefore`.
3. Changing the interface `RawSnapshotReader` to not extend `Closeable`, since only `KafkaMetadataLog` is responsible for closing snapshots. `FileRawSnapshotReader` implements `AutoCloseable`.
4. Fixing the implementation of `handleFetchSnapshotRequest` in `KafkaRaftClient` so that the `RawSnapshotReader` and the associated `FileRecords` are not closed before they are sent to the network layer.
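A condensed sketch of the resulting ownership model, assuming a `FileRawSnapshotReader.open(logDir, snapshotId)` factory, a `logDir` field, and an implicit `Ordering[OffsetAndEpoch]`; the real `KafkaMetadataLog` on trunk differs in detail:

```scala
import scala.collection.mutable

// Every known snapshot id is tracked; the value becomes Some once a reader has
// been opened, and only KafkaMetadataLog ever closes those readers.
private val snapshots =
  mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]

def readSnapshot(snapshotId: OffsetAndEpoch): Option[FileRawSnapshotReader] =
  snapshots synchronized {
    snapshots.get(snapshotId).map {
      case Some(reader) => reader // already opened on an earlier read
      case None =>
        val reader = FileRawSnapshotReader.open(logDir, snapshotId) // assumed factory
        snapshots.put(snapshotId, Some(reader)) // cached so the log can close it later
        reader
    }
  }
```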