6 changes: 2 additions & 4 deletions core/src/main/scala/kafka/log/Log.scala
@@ -240,7 +240,6 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
* @param scheduler The thread pool scheduler used for background actions
* @param brokerTopicStats Container for Broker Topic Yammer Metrics
* @param time The time instance used for checking the clock
- * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
* @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
* @param topicPartition The topic partition associated with this Log instance
* @param leaderEpochCache The LeaderEpochFileCache instance (if any) containing state associated
@@ -268,7 +267,6 @@ class Log(@volatile private var _dir: File,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
val time: Time,
- val maxProducerIdExpirationMs: Int,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
@@ -2024,8 +2022,8 @@ object Log extends Logging {
leaderEpochCache,
producerStateManager))
new Log(dir, config, segments, offsets.logStartOffset, offsets.recoveryPoint, offsets.nextOffsetMetadata, scheduler,
- brokerTopicStats, time, maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, topicPartition,
- leaderEpochCache, producerStateManager, logDirFailureChannel, topicId, keepPartitionMetadataFile)
+ brokerTopicStats, time, producerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache,
+ producerStateManager, logDirFailureChannel, topicId, keepPartitionMetadataFile)
}

/**
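Taken together, the Log.scala hunks delete a constructor parameter the class never consumed itself: the producer-id expiration cutoff is enforced inside ProducerStateManager, which Log.apply still builds with maxProducerIdExpirationMs before passing it into the constructor. A minimal, self-contained sketch of that ownership, assuming (this is not shown in the diff) that expiry compares each entry's last-seen timestamp against the manager's own timeout:

import scala.collection.mutable

// Hypothetical, heavily simplified stand-ins for the real Kafka classes;
// only the placement of maxProducerIdExpirationMs is meant to mirror this PR.
case class ProducerStateEntry(producerId: Long, lastTimestamp: Long)

class ProducerStateManager(val maxProducerIdExpirationMs: Int) {
  private val producers = mutable.Map.empty[Long, ProducerStateEntry]

  private def isExpired(currentTimeMs: Long, entry: ProducerStateEntry): Boolean =
    currentTimeMs - entry.lastTimestamp >= maxProducerIdExpirationMs

  // The periodic check that Log schedules only needs the current time;
  // the timeout itself never has to live on Log.
  def removeExpiredProducers(currentTimeMs: Long): Unit = {
    val expired = producers.collect { case (id, entry) if isExpired(currentTimeMs, entry) => id }
    producers --= expired
  }
}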
6 changes: 3 additions & 3 deletions core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -284,7 +284,8 @@ class PartitionLockTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion)
- val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, log.maxProducerIdExpirationMs)
+ val maxProducerIdExpirationMs = 60 * 60 * 1000
+ val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
log.dir,
log.topicPartition,
@@ -296,7 +297,7 @@
segments,
0L,
0L,
- log.maxProducerIdExpirationMs,
+ maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore)
@@ -379,7 +380,6 @@ class PartitionLockTest extends Logging {
mockTime.scheduler,
new BrokerTopicStats,
mockTime,
- log.maxProducerIdExpirationMs,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
leaderEpochCache,
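The same mechanical substitution recurs in PartitionTest and LogLoaderTest below: where a test previously read the timeout back off the log under test (log.maxProducerIdExpirationMs), it now pins a local maxProducerIdExpirationMs = 60 * 60 * 1000, i.e. one hour, which appears to match the default ProducerStateManager falls back to when no value is supplied.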
6 changes: 3 additions & 3 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -237,7 +237,8 @@ class PartitionTest extends AbstractPartitionTest {
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion)
- val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, log.maxProducerIdExpirationMs)
+ val maxProducerIdExpirationMs = 60 * 60 * 1000
+ val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
log.dir,
log.topicPartition,
@@ -249,7 +250,7 @@
segments,
0L,
0L,
- log.maxProducerIdExpirationMs,
+ maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore)
@@ -2040,7 +2041,6 @@ class PartitionTest extends AbstractPartitionTest {
mockTime.scheduler,
new BrokerTopicStats,
mockTime,
- log.maxProducerIdExpirationMs,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
leaderEpochCache,
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -119,7 +119,7 @@ class LogCleanerManagerTest extends Logging {
// the exception should be caught and the partition that caused it marked as uncleanable
class LogMock(dir: File, config: LogConfig, offsets: LoadedLogOffsets)
extends Log(dir, config, segments, offsets.logStartOffset, offsets.recoveryPoint,
- offsets.nextOffsetMetadata, time.scheduler, new BrokerTopicStats, time, maxProducerIdExpirationMs,
+ offsets.nextOffsetMetadata, time.scheduler, new BrokerTopicStats, time,
LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache,
producerStateManager, logDirFailureChannel, topicId = None, keepPartitionMetadataFile = true) {
// Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
1 change: 0 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -131,7 +131,6 @@ class LogCleanerTest {
scheduler = time.scheduler,
brokerTopicStats = new BrokerTopicStats,
time,
- maxProducerIdExpirationMs = maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
topicPartition = topicPartition,
leaderEpochCache = leaderEpochCache,
14 changes: 5 additions & 9 deletions core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -101,7 +101,7 @@ class LogLoaderTest {
maxProducerIdExpirationMs, leaderEpochCache, producerStateManager)
val offsets = LogLoader.load(loadLogParams)
new Log(logDir, config, segments, offsets.logStartOffset, offsets.recoveryPoint,
- offsets.nextOffsetMetadata, time.scheduler, brokerTopicStats, time, maxPidExpirationMs,
+ offsets.nextOffsetMetadata, time.scheduler, brokerTopicStats, time,
LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache,
producerStateManager, logDirFailureChannel, None, true)
}
@@ -283,9 +282,8 @@ class LogLoaderTest {
val offsets = LogLoader.load(loadLogParams)
new Log(logDir, logConfig, interceptedLogSegments, offsets.logStartOffset, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, brokerTopicStats, mockTime,
- maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition,
- leaderEpochCache, producerStateManager, logDirFailureChannel, topicId = None,
- keepPartitionMetadataFile = true)
+ LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache,
+ producerStateManager, logDirFailureChannel, topicId = None, keepPartitionMetadataFile = true)
}

// Retain snapshots for the last 2 segments
@@ -362,7 +361,6 @@ class LogLoaderTest {
scheduler = mockTime.scheduler,
brokerTopicStats = brokerTopicStats,
time = mockTime,
- maxProducerIdExpirationMs = maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = topicPartition,
leaderEpochCache = leaderEpochCache,
@@ -431,7 +429,8 @@ class LogLoaderTest {
firstAppendTimestamp, coordinatorEpoch = coordinatorEpoch)
assertEquals(firstAppendTimestamp, log.producerStateManager.lastEntry(producerId).get.lastTimestamp)

- mockTime.sleep(log.maxProducerIdExpirationMs)
+ val maxProducerIdExpirationMs = 60 * 60 * 1000
+ mockTime.sleep(maxProducerIdExpirationMs)
assertEquals(None, log.producerStateManager.lastEntry(producerId))

val secondAppendTimestamp = mockTime.milliseconds()
@@ -496,7 +495,6 @@ class LogLoaderTest {
scheduler = mockTime.scheduler,
brokerTopicStats = brokerTopicStats,
time = mockTime,
- maxProducerIdExpirationMs = maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = topicPartition,
leaderEpochCache = leaderEpochCache,
@@ -558,7 +556,6 @@ class LogLoaderTest {
scheduler = mockTime.scheduler,
brokerTopicStats = brokerTopicStats,
time = mockTime,
- maxProducerIdExpirationMs = maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = topicPartition,
leaderEpochCache = leaderEpochCache,
@@ -622,7 +619,6 @@ class LogLoaderTest {
scheduler = mockTime.scheduler,
brokerTopicStats = brokerTopicStats,
time = mockTime,
- maxProducerIdExpirationMs = maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = topicPartition,
leaderEpochCache = leaderEpochCache,
1 change: 0 additions & 1 deletion core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1512,7 +1512,6 @@ class ReplicaManagerTest {
scheduler = mockScheduler,
brokerTopicStats = mockBrokerTopicStats,
time = time,
- maxProducerIdExpirationMs = maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = tp,
leaderEpochCache = leaderEpochCache,
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -141,7 +141,7 @@ class SchedulerTest {
producerStateManager))
val log = new Log(logDir, logConfig, segments = segments, logStartOffset = offsets.logStartOffset,
recoveryPoint = offsets.recoveryPoint, nextOffsetMetadata = offsets.nextOffsetMetadata, scheduler,
- brokerTopicStats, mockTime, maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs,
+ brokerTopicStats, mockTime, LogManager.ProducerIdExpirationCheckIntervalMs,
topicPartition, leaderEpochCache, producerStateManager, logDirFailureChannel,
topicId = None, keepPartitionMetadataFile = true)
assertTrue(scheduler.taskRunning(log.producerExpireCheck))
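The assertion kept at the end of SchedulerTest relies on Log registering a recurring expiration task when it is constructed, driven by the producerIdExpirationCheckIntervalMs parameter this PR retains. A rough sketch of that wiring, assuming kafka.utils.Scheduler's schedule(name, fun, delay, period) signature and eliding the locking and time units of the real code:

// Inside Log's initialization (sketch, not verbatim source): the check
// interval still arrives via the constructor, while the expiration cutoff
// is applied internally by producerStateManager (see the sketch above).
val producerExpireCheck = scheduler.schedule(
  name = "PeriodicProducerExpirationCheck",
  fun = () => producerStateManager.removeExpiredProducers(time.milliseconds),
  delay = producerIdExpirationCheckIntervalMs,
  period = producerIdExpirationCheckIntervalMs)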