-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14482: Move LogLoader to storage module #17042
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mimaison thanks for nice rewriting.
@@ -1775,7 +1776,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, | |||
producerExpireCheck.cancel(true) | |||
leaderEpochCache.foreach(_.clear()) | |||
val deletedSegments = localLog.deleteAllSegments() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe deleteAllSegments
can return java collection directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point!
* @param function The function to be executed. | ||
* @return The value returned by the function after a successful invocation | ||
*/ | ||
public static <T> T maybeHandleIOException(LogDirFailureChannel logDirFailureChannel, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please replace LocalLog.scala#maybeHandleIOException
by java version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This requires a bunch of changes in LocalLog
. I think it's best if we do those when we move LocalLog
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my point was to leverage LocalLog.java#maybeHandleIOException
to implement LocalLog.scala#maybeHandleIOException
. That can remove the duplicate code. for example:
private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah right, I see what you meant now. Done
@@ -906,7 +907,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, | |||
updateHighWatermarkWithLogEndOffset() | |||
|
|||
// update the producer state | |||
updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo)) | |||
updatedProducers.values.asScala.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about updatedProducers.values.forEach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right we don't need this conversion.
} | ||
|
||
private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized { | ||
val result = UnifiedLog.splitOverflowedSegment(segment, localLog.segments, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent) | ||
deleteProducerSnapshots(result.deletedSegments, asyncDelete = true) | ||
result.newSegments.toList | ||
result.newSegments.asScala.toList |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems the return value is never used, so maybe we can remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return value is used in a few places in LogLoaderTest
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Committer Checklist (excluded from commit message)