KAFKA-14482 Move LogLoader to storage module (#17042)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

mimaison committed Sep 16, 2024
1 parent 21e67b3 commit f1c011a
Showing 20 changed files with 1,262 additions and 1,085 deletions.
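The heart of the change: LogLoader moves from core/src/main/scala/kafka/log to the Java storage module (org.apache.kafka.storage.internals.log), and every Scala call site switches from named to positional arguments, with two parameters appended. The sketch below reconstructs the new call shape from the call sites in this diff; the parameter comments are inferred from the removed Scala named arguments and may not match the actual Java parameter names.

```scala
import java.io.File
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager}

// Reconstructed call shape of the Java LogLoader, inferred from the updated
// call sites in this diff; comment names come from the removed Scala named
// arguments, not from the Java source.
def loadForTest(dir: File,
                tp: TopicPartition,
                config: LogConfig,
                scheduler: Scheduler,
                time: Time,
                failureChannel: LogDirFailureChannel,
                segments: LogSegments,
                leaderEpochCache: Optional[LeaderEpochFileCache],
                producerStateManager: ProducerStateManager) = {
  new LogLoader(
    dir, tp, config, scheduler, time, failureChannel,
    true,                                   // hadCleanShutdown (a named arg before this commit)
    segments,
    0L,                                     // logStartOffsetCheckpoint
    0L,                                     // recoveryPointCheckpoint
    leaderEpochCache,
    producerStateManager,
    new ConcurrentHashMap[String, Integer], // numRemainingSegments (new parameter)
    false                                   // isRemoteLogEnabled (new parameter)
  ).load()                                  // returns the recovered offsets
}
```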
6 changes: 3 additions & 3 deletions checkstyle/suppressions.xml
@@ -342,11 +342,11 @@

<!-- storage -->
<suppress checks="CyclomaticComplexity"
files="(LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
files="(LogLoader|LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
<suppress checks="NPathComplexity"
files="(LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
files="(LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
<suppress checks="ParameterNumber"
files="(LogAppendInfo|RemoteLogManagerConfig).java"/>
files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig).java"/>

<!-- benchmarks -->
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
290 changes: 10 additions & 280 deletions core/src/main/scala/kafka/log/LocalLog.scala

Large diffs are not rendered by default.

528 changes: 0 additions & 528 deletions core/src/main/scala/kafka/log/LogLoader.scala

This file was deleted.

6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -328,7 +328,7 @@ class LogManager(logDirs: Seq[File],
logStartOffsets: util.Map[TopicPartition, JLong],
defaultConfig: LogConfig,
topicConfigOverrides: Map[String, LogConfig],
-numRemainingSegments: ConcurrentMap[String, Int],
+numRemainingSegments: ConcurrentMap[String, Integer],
isStray: UnifiedLog => Boolean): UnifiedLog = {
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)
@@ -421,7 +421,7 @@ class LogManager(logDirs: Seq[File],
// log dir path -> number of Remaining logs map for remainingLogsToRecover metric
val numRemainingLogs: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
// log recovery thread name -> number of remaining segments map for remainingSegmentsToRecover metric
-val numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
+val numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer]

def handleIOException(logDirAbsolutePath: String, e: IOException): Unit = {
offlineDirs.add((logDirAbsolutePath, e))
@@ -550,7 +550,7 @@
}

private[log] def addLogRecoveryMetrics(numRemainingLogs: ConcurrentMap[String, Int],
-numRemainingSegments: ConcurrentMap[String, Int]): Unit = {
+numRemainingSegments: ConcurrentMap[String, Integer]): Unit = {
debug("Adding log recovery metrics")
for (dir <- logDirs) {
metricsGroup.newGauge("remainingLogsToRecover", () => numRemainingLogs.get(dir.getAbsolutePath),
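The value-type change from Int to Integer in LogManager is forced by the move: this map (keyed by recovery thread name, per the comment above) is now handed to the Java LogLoader, and Scala's Int and java.lang.Integer are distinct types to the compiler, so a ConcurrentMap[String, Int] cannot be passed where Java declares ConcurrentMap<String, Integer>. Note that numRemainingLogs, which never crosses into Java, stays ConcurrentMap[String, Int]. A minimal sketch of the distinction (the key is illustrative):

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

// Only a ConcurrentMap[String, Integer] satisfies a Java declaration of
// ConcurrentMap<String, Integer>; ConcurrentMap[String, Int] does not.
val forJavaLoader: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer]

forJavaLoader.put("recovery-thread-0", 42)                  // Int literal auto-boxes to Integer
val remaining: Int = forJavaLoader.get("recovery-thread-0") // and unboxes back to Int
```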
235 changes: 31 additions & 204 deletions core/src/main/scala/kafka/log/UnifiedLog.scala

Large diffs are not rendered by default.

core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala

@@ -40,7 +40,7 @@ import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -319,12 +319,14 @@ class PartitionLockTest extends Logging {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
-hadCleanShutdown = true,
+true,
segments,
0L,
0L,
leaderEpochCache.asJava,
-producerStateManager
+producerStateManager,
+new ConcurrentHashMap[String, Integer],
+false
).load()
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,
16 changes: 9 additions & 7 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -44,7 +44,7 @@ import org.mockito.invocation.InvocationOnMock
import java.lang.{Long => JLong}
import java.nio.ByteBuffer
import java.util.Optional
-import java.util.concurrent.{CountDownLatch, Semaphore}
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.compress.Compression
@@ -61,7 +61,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -454,12 +454,14 @@ class PartitionTest extends AbstractPartitionTest {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
-hadCleanShutdown = true,
-segments = segments,
-logStartOffsetCheckpoint = 0L,
-recoveryPointCheckpoint = 0L,
+true,
+segments,
+0L,
+0L,
leaderEpochCache.asJava,
-producerStateManager
+producerStateManager,
+new ConcurrentHashMap[String, Integer],
+false
).load()
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,
19 changes: 10 additions & 9 deletions core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -20,6 +20,7 @@ package kafka.log
import java.io.File
import java.nio.channels.ClosedChannelException
import java.nio.charset.StandardCharsets
+import java.util
import java.util.regex.Pattern
import java.util.Collections
import kafka.server.KafkaConfig
@@ -30,7 +31,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{MemoryRecords, Record, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog => JLocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -123,7 +124,7 @@ class LocalLogTest {
log.roll()
assertEquals(2, log.segments.numberOfSegments)
assertFalse(logDir.listFiles.isEmpty)
-val segmentsBeforeDelete = log.segments.values.asScala.toVector
+val segmentsBeforeDelete = new util.ArrayList(log.segments.values)
val deletedSegments = log.deleteAllSegments()
assertTrue(log.segments.isEmpty)
assertEquals(segmentsBeforeDelete, deletedSegments)
@@ -138,7 +139,7 @@
assertEquals(1, log.segments.numberOfSegments)
assertNotEquals(oldActiveSegment, log.segments.activeSegment)
assertFalse(logDir.listFiles.isEmpty)
-assertTrue(oldActiveSegment.hasSuffix(LocalLog.DeletedFileSuffix))
+assertTrue(oldActiveSegment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
}

@Test
@@ -306,17 +307,17 @@

assertEquals(10L, log.segments.numberOfSegments)

-val toDelete = log.segments.values.asScala.toVector
-LocalLog.deleteSegmentFiles(toDelete, asyncDelete = asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
+val toDelete = log.segments.values
+JLocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
if (asyncDelete) {
-toDelete.foreach {
+toDelete.forEach {
segment =>
assertFalse(segment.deleted())
-assertTrue(segment.hasSuffix(LocalLog.DeletedFileSuffix))
+assertTrue(segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
}
mockTime.sleep(log.config.fileDeleteDelayMs + 1)
}
-toDelete.foreach(segment => assertTrue(segment.deleted()))
+toDelete.forEach(segment => assertTrue(segment.deleted()))
}

@Test
@@ -339,7 +340,7 @@
assertEquals(1, log.segments.numberOfSegments)
assertEquals(newActiveSegment, log.segments.activeSegment)
assertNotEquals(oldActiveSegment, log.segments.activeSegment)
-assertTrue(oldActiveSegment.hasSuffix(LocalLog.DeletedFileSuffix))
+assertTrue(oldActiveSegment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
assertEquals(newOffset, log.segments.activeSegment.baseOffset)
assertEquals(0L, log.recoveryPoint)
assertEquals(newOffset, log.logEndOffset)
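LocalLogTest also shows the collection migration that comes with calling into Java: log.segments.values is consumed as the java.util.Collection it now is (snapshotted with util.ArrayList instead of asScala.toVector, iterated with forEach instead of foreach), and the deleted-file suffix constant moves from LocalLog.DeletedFileSuffix to LogFileUtils.DELETED_FILE_SUFFIX. A small sketch of the pattern, assuming the values come back as a java.util.Collection[LogSegment]:

```scala
import java.util
import org.apache.kafka.storage.internals.log.{LogFileUtils, LogSegment}

// Sketch of the Scala-to-Java collection pattern used in the test above.
// Assumes `values` is the java.util.Collection[LogSegment] the new API returns.
def snapshotAndCheck(values: util.Collection[LogSegment]): Unit = {
  // Snapshot before mutation: a Java copy replaces asScala.toVector.
  val before = new util.ArrayList(values)
  // Iterate with java.util.Collection#forEach instead of Scala's foreach.
  before.forEach { segment =>
    assert(segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
  }
}
```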
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

@@ -28,13 +28,14 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}

import java.lang.{Long => JLong}
import java.util
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.compat.java8.OptionConverters._

@@ -119,12 +120,14 @@ class LogCleanerManagerTest extends Logging {
time.scheduler,
time,
logDirFailureChannel,
-hadCleanShutdown = true,
+true,
segments,
0L,
0L,
leaderEpochCache.asJava,
-producerStateManager
+producerStateManager,
+new ConcurrentHashMap[String, Integer],
+false
).load()
val localLog = new LocalLog(tpDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, time.scheduler, time, tp, logDirFailureChannel)
10 changes: 6 additions & 4 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
@@ -43,7 +43,7 @@ import java.nio._
import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import java.util.Properties
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import scala.collection._
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@@ -200,12 +200,14 @@ class LogCleanerTest extends Logging {
time.scheduler,
time,
logDirFailureChannel,
-hadCleanShutdown = true,
+true,
logSegments,
0L,
0L,
leaderEpochCache.asJava,
-producerStateManager
+producerStateManager,
+new ConcurrentHashMap[String, Integer],
+false
).load()
val localLog = new LocalLog(dir, config, logSegments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, time.scheduler, time, topicPartition, logDirFailureChannel)
52 changes: 32 additions & 20 deletions core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -17,10 +17,6 @@

package kafka.log

-import java.io.{BufferedWriter, File, FileWriter, IOException}
-import java.nio.ByteBuffer
-import java.nio.file.{Files, NoSuchFileException, Paths}
-import java.util.{Optional, OptionalLong, Properties}
import kafka.server.KafkaConfig
import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils
@@ -35,7 +31,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
@@ -47,9 +43,13 @@ import org.mockito.{ArgumentMatchers, Mockito}
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{mock, reset, times, verify, when}

+import java.io.{BufferedWriter, File, FileWriter, IOException}
import java.lang.{Long => JLong}
+import java.nio.ByteBuffer
+import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.{Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
import scala.collection.{Iterable, Map, mutable}
@@ -136,7 +136,7 @@ class LogLoaderTest {

override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: util.Map[TopicPartition, JLong],
logStartOffsets: util.Map[TopicPartition, JLong], defaultConfig: LogConfig,
-topicConfigs: Map[String, LogConfig], numRemainingSegments: ConcurrentMap[String, Int],
+topicConfigs: Map[String, LogConfig], numRemainingSegments: ConcurrentMap[String, Integer],
shouldBeStrayKraftLog: UnifiedLog => Boolean): UnifiedLog = {
if (simulateError.hasError) {
simulateError.errorType match {
@@ -163,7 +163,7 @@
this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
-leaderEpochCache.asJava, producerStateManager)
+leaderEpochCache.asJava, producerStateManager, new ConcurrentHashMap[String, Integer], false)
val offsets = logLoader.load()
val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@@ -382,12 +382,15 @@
mockTime.scheduler,
mockTime,
logDirFailureChannel,
-hadCleanShutdown = false,
+false,
interceptedLogSegments,
0L,
recoveryPoint,
leaderEpochCache.asJava,
-producerStateManager)
+producerStateManager,
+new ConcurrentHashMap[String, Integer],
+false
+)
val offsets = logLoader.load()
val localLog = new LocalLog(logDir, logConfig, interceptedLogSegments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@@ -445,12 +448,14 @@
mockTime.scheduler,
mockTime,
logDirFailureChannel,
-hadCleanShutdown = false,
+false,
segments,
0L,
0L,
leaderEpochCache.asJava,
-stateManager
+stateManager,
+new ConcurrentHashMap[String, Integer],
+false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@@ -555,12 +560,14 @@
mockTime.scheduler,
mockTime,
logDirFailureChannel,
-hadCleanShutdown = false,
+false,
segments,
0L,
0L,
leaderEpochCache.asJava,
-stateManager
+stateManager,
+new ConcurrentHashMap[String, Integer],
+false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@@ -610,12 +617,14 @@
mockTime.scheduler,
mockTime,
logDirFailureChannel,
-hadCleanShutdown = true,
+true,
segments,
0L,
0L,
leaderEpochCache.asJava,
-stateManager
+stateManager,
+new ConcurrentHashMap[String, Integer],
+false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@@ -664,12 +673,14 @@
mockTime.scheduler,
mockTime,
logDirFailureChannel,
-hadCleanShutdown = true,
+true,
segments,
0L,
0L,
leaderEpochCache.asJava,
-stateManager
+stateManager,
+new ConcurrentHashMap[String, Integer],
+false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@@ -1811,13 +1822,14 @@
mockTime.scheduler,
mockTime,
logDirFailureChannel,
-hadCleanShutdown = true,
+true,
segments,
0L,
0L,
leaderEpochCache.asJava,
stateManager,
-isRemoteLogEnabled = isRemoteLogEnabled
+new ConcurrentHashMap[String, Integer],
+isRemoteLogEnabled
).load()
assertEquals(expectedLogStartOffset, offsets.logStartOffset)
}
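One detail repeated at every call site above: leaderEpochCache is passed as leaderEpochCache.asJava because the Java LogLoader takes a java.util.Optional while these Scala tests hold a Scala Option; the conversion comes from the scala.compat.java8.OptionConverters import already present in these files. A minimal sketch of the boundary conversion (the empty cache value is purely illustrative):

```scala
import java.util.Optional
import scala.compat.java8.OptionConverters._
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache

// Scala Option -> java.util.Optional at the Java boundary, mirroring the
// leaderEpochCache.asJava conversions in the call sites above.
val leaderEpochCache: Option[LeaderEpochFileCache] = None // illustrative value
val forLogLoader: Optional[LeaderEpochFileCache] = leaderEpochCache.asJava
```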
Remaining changed files are not rendered.
