1,070 changes: 436 additions & 634 deletions core/src/main/scala/kafka/log/Log.scala

Large diffs are not rendered by default.
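The Log.scala diff itself is not rendered here, so for orientation: a rough sketch of the reshaped Log constructor, inferred from the new Log(...) call sites in the test changes further down this page. The parameter names and order come from those call sites; the types, modifiers, and imports are assumptions, not the PR's exact code.

import java.io.File
import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata}
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.Time

// Sketch only: signature inferred from the call sites below; types and modifiers are assumptions.
class Log(val dir: File,
          val config: LogConfig,
          val segments: LogSegments,                          // built and recovered by LogLoader, not by Log itself
          val logStartOffset: Long,                           // from LoadedLogOffsets
          val recoveryPoint: Long,                            // from LoadedLogOffsets
          val nextOffsetMetadata: LogOffsetMetadata,          // from LoadedLogOffsets
          val scheduler: Scheduler,
          val brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val leaderEpochCache: Option[LeaderEpochFileCache], // passed in rather than created internally
          val producerStateManager: ProducerStateManager,     // passed in rather than created internally
          val logDirFailureChannel: LogDirFailureChannel,
          val topicId: Option[Uuid],
          val keepPartitionMetadataFile: Boolean) extends Logging {
  // ... the segment loading and recovery logic formerly here moves into the new LogLoader.scala below ...
}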

11 changes: 5 additions & 6 deletions core/src/main/scala/kafka/log/LogCleaner.scala
@@ -450,12 +450,11 @@ object LogCleaner {

}

def createNewCleanedSegment(log: Log, baseOffset: Long): LogSegment = {
LogSegment.deleteIfExists(log.dir, baseOffset, fileSuffix = Log.CleanedFileSuffix)
LogSegment.open(log.dir, baseOffset, log.config, Time.SYSTEM,
fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate)
def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LogSegment.deleteIfExists(dir, baseOffset, fileSuffix = Log.CleanedFileSuffix)
LogSegment.open(dir, baseOffset, logConfig, Time.SYSTEM,
fileSuffix = Log.CleanedFileSuffix, initFileSize = logConfig.initFileSize, preallocate = logConfig.preallocate)
}

}

/**
@@ -563,7 +562,7 @@ private[log] class Cleaner(val id: Int,
stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset)
val cleaned = LogCleaner.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)

try {
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/log/LogConfig.scala
@@ -103,6 +103,13 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, segmentMs)
else segmentMs
}

def initFileSize: Int = {
if (preallocate)
segmentSize
else
0
}
}

object LogConfig {
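A quick usage sketch of the new LogConfig.initFileSize (illustrative values, assuming the existing SegmentBytesProp and PreallocateEnableProp keys): with preallocation enabled a freshly created segment file is sized to the full configured segment size, otherwise it starts empty. Moving this from Log into LogConfig is what lets createNewCleanedSegment above, and LogLoader below, size segment files from a LogConfig alone.

import java.util.Properties

val props = new Properties()
props.put(LogConfig.SegmentBytesProp, 1024 * 1024: java.lang.Integer)
props.put(LogConfig.PreallocateEnableProp, true: java.lang.Boolean)

val config = LogConfig(props)
assert(config.initFileSize == 1024 * 1024)            // preallocate => segmentSize
assert(LogConfig(new Properties()).initFileSize == 0) // preallocation off => start empty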
525 changes: 525 additions & 0 deletions core/src/main/scala/kafka/log/LogLoader.scala

Large diffs are not rendered by default.
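LogLoader.scala is a new 525-line file and is likewise not rendered. A sketch of its entry point, as suggested by the LogLoader.load(LoadLogParams(...)) call sites in the test changes below: callers describe the log's on-disk state in LoadLogParams, and load returns the recovered offsets that the Log constructor now expects. Only the argument order and the LoadedLogOffsets field names are visible in this diff; the checkpoint field names and the exact types are assumptions.

import java.io.File
import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.Scheduler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time

// Inputs to log loading/recovery, in the argument order used by the call sites below.
case class LoadLogParams(dir: File,
                         topicPartition: TopicPartition,
                         config: LogConfig,
                         scheduler: Scheduler,
                         time: Time,
                         logDirFailureChannel: LogDirFailureChannel,
                         hadCleanShutdown: Boolean,
                         segments: LogSegments,
                         logStartOffsetCheckpoint: Long,  // assumed name; the tests pass 0L
                         recoveryPointCheckpoint: Long,   // assumed name; the tests pass 0L
                         maxProducerIdExpirationMs: Int,
                         leaderEpochCache: Option[LeaderEpochFileCache],
                         producerStateManager: ProducerStateManager)

// What loading produces: the offsets handed to the new Log constructor.
case class LoadedLogOffsets(logStartOffset: Long,
                            recoveryPoint: Long,
                            nextOffsetMetadata: LogOffsetMetadata)

object LogLoader {
  // Populates params.segments from disk, rebuilding indexes and producer state as needed,
  // then returns the recovered offsets (body elided; the real implementation is this new file).
  def load(params: LoadLogParams): LoadedLogOffsets = ???
}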

47 changes: 39 additions & 8 deletions core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -25,6 +25,7 @@ import kafka.api.ApiVersion
import kafka.log._
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils._
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.{TopicPartition, Uuid}
@@ -280,7 +281,25 @@ class PartitionLockTest extends Logging {

override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Log = {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
new SlowLog(log, mockTime, appendSemaphore)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion)
val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, log.maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
log.dir,
log.topicPartition,
log.config,
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
segments,
0L,
0L,
log.maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore)
}
}
when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
@@ -341,19 +360,31 @@ class PartitionLockTest extends Logging {
}
}

private class SlowLog(log: Log, mockTime: MockTime, appendSemaphore: Semaphore) extends Log(
private class SlowLog(
log: Log,
segments: LogSegments,
offsets: LoadedLogOffsets,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
mockTime: MockTime,
logDirFailureChannel: LogDirFailureChannel,
appendSemaphore: Semaphore
) extends Log(
log.dir,
log.config,
log.logStartOffset,
log.recoveryPoint,
segments,
offsets.logStartOffset,
offsets.recoveryPoint,
offsets.nextOffsetMetadata,
mockTime.scheduler,
new BrokerTopicStats,
log.time,
mockTime,
log.maxProducerIdExpirationMs,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
log.producerStateManager,
new LogDirFailureChannel(1),
leaderEpochCache,
producerStateManager,
logDirFailureChannel,
topicId = None,
keepPartitionMetadataFile = true) {

@@ -362,5 +393,5 @@
appendSemaphore.acquire()
appendInfo
}
}
}
}
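The setup block added above recurs almost verbatim in PartitionTest, LogCleanerManagerTest, and LogCleanerTest below: build the collaborators that Log used to create internally, recover on-disk state via LogLoader.load, then pass everything to the new Log constructor. A hypothetical helper capturing that pattern (not part of the PR; it assumes the constructor shape sketched near the top of this page):

import java.io.File
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.MockTime
import org.apache.kafka.common.TopicPartition

// Hypothetical test helper, not in the PR: condenses the recurring bootstrap sequence.
object LogTestBootstrap {
  def loadTestLog(dir: File,
                  config: LogConfig,
                  tp: TopicPartition,
                  time: MockTime,
                  maxProducerIdExpirationMs: Int = 60 * 60 * 1000): Log = {
    val logDirFailureChannel = new LogDirFailureChannel(1)
    val segments = new LogSegments(tp)
    // Collaborators that the Log constructor previously built for itself.
    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(
      dir, tp, logDirFailureChannel, config.messageFormatVersion.recordVersion)
    val producerStateManager = new ProducerStateManager(tp, dir, maxProducerIdExpirationMs)
    // Recover segments and offsets from disk.
    val offsets = LogLoader.load(LoadLogParams(
      dir, tp, config, time.scheduler, time, logDirFailureChannel,
      hadCleanShutdown = true, segments, 0L, 0L,
      maxProducerIdExpirationMs, leaderEpochCache, producerStateManager))
    // The constructor now just receives the pre-built state.
    new Log(dir, config, segments, offsets.logStartOffset, offsets.recoveryPoint,
      offsets.nextOffsetMetadata, time.scheduler, new BrokerTopicStats, time,
      maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs, tp,
      leaderEpochCache, producerStateManager, logDirFailureChannel,
      topicId = None, keepPartitionMetadataFile = true)
  }
}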
48 changes: 40 additions & 8 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -41,10 +41,12 @@ import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock

import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}

import kafka.server.epoch.LeaderEpochFileCache

import scala.jdk.CollectionConverters._

class PartitionTest extends AbstractPartitionTest {
@@ -232,7 +234,25 @@ class PartitionTest extends AbstractPartitionTest {

override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Log = {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
new SlowLog(log, mockTime, appendSemaphore)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion)
val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, log.maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
log.dir,
log.topicPartition,
log.config,
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
segments,
0L,
0L,
log.maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore)
}
}

@@ -1934,19 +1954,31 @@ class PartitionTest extends AbstractPartitionTest {
}
}

private class SlowLog(log: Log, mockTime: MockTime, appendSemaphore: Semaphore) extends Log(
private class SlowLog(
log: Log,
segments: LogSegments,
offsets: LoadedLogOffsets,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
mockTime: MockTime,
logDirFailureChannel: LogDirFailureChannel,
appendSemaphore: Semaphore
) extends Log(
log.dir,
log.config,
log.logStartOffset,
log.recoveryPoint,
segments,
offsets.logStartOffset,
offsets.recoveryPoint,
offsets.nextOffsetMetadata,
mockTime.scheduler,
new BrokerTopicStats,
log.time,
mockTime,
log.maxProducerIdExpirationMs,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
log.producerStateManager,
new LogDirFailureChannel(1),
leaderEpochCache,
producerStateManager,
logDirFailureChannel,
topicId = None,
keepPartitionMetadataFile = true) {

37 changes: 29 additions & 8 deletions core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -18,6 +18,7 @@
package kafka.log

import java.io.File
import java.nio.file.Files
import java.util.Properties

import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -94,19 +95,39 @@ class LogCleanerManagerTest extends Logging {
val logSegmentSize = TestUtils.singletonRecords("test".getBytes).sizeInBytes * 10
val logSegmentsCount = 2
val tpDir = new File(logDir, "A-1")

// the exception should be catched and the partition that caused it marked as uncleanable
class LogMock(dir: File, config: LogConfig) extends Log(dir, config, 0L, 0L,
time.scheduler, new BrokerTopicStats, time, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs,
topicPartition, new ProducerStateManager(tp, tpDir, 60 * 60 * 1000),
new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true) {

Files.createDirectories(tpDir.toPath)
val logDirFailureChannel = new LogDirFailureChannel(10)
val config = createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact)
val maxProducerIdExpirationMs = 60 * 60 * 1000
val segments = new LogSegments(tp)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
tpDir,
tp,
config,
time.scheduler,
time,
logDirFailureChannel,
hadCleanShutdown = true,
segments,
0L,
0L,
maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
// the exception should be caught and the partition that caused it marked as uncleanable
class LogMock(dir: File, config: LogConfig, offsets: LoadedLogOffsets)
extends Log(dir, config, segments, offsets.logStartOffset, offsets.recoveryPoint,
offsets.nextOffsetMetadata, time.scheduler, new BrokerTopicStats, time, maxProducerIdExpirationMs,
LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache,
producerStateManager, logDirFailureChannel, topicId = None, keepPartitionMetadataFile = true) {
// Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
override def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] =
throw new IllegalStateException("Error!")
}

val log: Log = new LogMock(tpDir, createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact))
val log: Log = new LogMock(tpDir, config, offsets)
writeRecords(log = log,
numBatches = logSegmentsCount * 2,
recordsPerBatch = 10,
40 changes: 32 additions & 8 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -100,19 +100,43 @@ class LogCleanerTest {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
val config = LogConfig.fromProps(logConfig.originals, logProps)
val topicPartition = Log.parseTopicPartitionName(dir)
val producerStateManager = new ProducerStateManager(topicPartition, dir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val maxProducerIdExpirationMs = 60 * 60 * 1000
val logSegments = new LogSegments(topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
dir,
topicPartition,
config,
time.scheduler,
time,
logDirFailureChannel,
hadCleanShutdown = true,
logSegments,
0L,
0L,
maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))

val log = new Log(dir,
config = LogConfig.fromProps(logConfig.originals, logProps),
logStartOffset = 0L,
recoveryPoint = 0L,
config = config,
segments = logSegments,
logStartOffset = offsets.logStartOffset,
recoveryPoint = offsets.recoveryPoint,
nextOffsetMetadata = offsets.nextOffsetMetadata,
scheduler = time.scheduler,
brokerTopicStats = new BrokerTopicStats, time,
maxProducerIdExpirationMs = 60 * 60 * 1000,
brokerTopicStats = new BrokerTopicStats,
time,
maxProducerIdExpirationMs = maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
topicPartition = topicPartition,
leaderEpochCache = leaderEpochCache,
producerStateManager = producerStateManager,
logDirFailureChannel = new LogDirFailureChannel(10),
logDirFailureChannel = logDirFailureChannel,
topicId = None,
keepPartitionMetadataFile = true) {
override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
@@ -1755,7 +1779,7 @@ class LogCleanerTest {
private def tombstoneRecord(key: Int): MemoryRecords = record(key, null)

private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): Log = {
LogTest.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler)
LogTestUtils.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler)
}
}
