diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6c306101b1474..ef0d6aed5f0d8 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -240,7 +240,6 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { * @param scheduler The thread pool scheduler used for background actions * @param brokerTopicStats Container for Broker Topic Yammer Metrics * @param time The time instance used for checking the clock - * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired * @param topicPartition The topic partition associated with this Log instance * @param leaderEpochCache The LeaderEpochFileCache instance (if any) containing state associated @@ -268,7 +267,6 @@ class Log(@volatile private var _dir: File, scheduler: Scheduler, brokerTopicStats: BrokerTopicStats, val time: Time, - val maxProducerIdExpirationMs: Int, val producerIdExpirationCheckIntervalMs: Int, val topicPartition: TopicPartition, @volatile var leaderEpochCache: Option[LeaderEpochFileCache], @@ -2024,8 +2022,8 @@ object Log extends Logging { leaderEpochCache, producerStateManager)) new Log(dir, config, segments, offsets.logStartOffset, offsets.recoveryPoint, offsets.nextOffsetMetadata, scheduler, - brokerTopicStats, time, maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, topicPartition, - leaderEpochCache, producerStateManager, logDirFailureChannel, topicId, keepPartitionMetadataFile) + brokerTopicStats, time, producerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache, + producerStateManager, logDirFailureChannel, topicId, keepPartitionMetadataFile) } /** diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 8b7b18b03693b..8bb54c4ce070c 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -284,7 +284,8 @@ class PartitionLockTest extends Logging { val logDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(log.topicPartition) val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion) - val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, log.maxProducerIdExpirationMs) + val maxProducerIdExpirationMs = 60 * 60 * 1000 + val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs) val offsets = LogLoader.load(LoadLogParams( log.dir, log.topicPartition, @@ -296,7 +297,7 @@ class PartitionLockTest extends Logging { segments, 0L, 0L, - log.maxProducerIdExpirationMs, + maxProducerIdExpirationMs, leaderEpochCache, producerStateManager)) new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore) @@ -379,7 +380,6 @@ class PartitionLockTest extends Logging { mockTime.scheduler, new BrokerTopicStats, mockTime, - log.maxProducerIdExpirationMs, log.producerIdExpirationCheckIntervalMs, log.topicPartition, leaderEpochCache, diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 492c867bdd7a9..8e492acb7438a 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -237,7 +237,8 @@ class PartitionTest extends AbstractPartitionTest { val logDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(log.topicPartition) val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion) - val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, log.maxProducerIdExpirationMs) + val maxProducerIdExpirationMs = 60 * 60 * 1000 + val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs) val offsets = LogLoader.load(LoadLogParams( log.dir, log.topicPartition, @@ -249,7 +250,7 @@ class PartitionTest extends AbstractPartitionTest { segments, 0L, 0L, - log.maxProducerIdExpirationMs, + maxProducerIdExpirationMs, leaderEpochCache, producerStateManager)) new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore) @@ -2040,7 +2041,6 @@ class PartitionTest extends AbstractPartitionTest { mockTime.scheduler, new BrokerTopicStats, mockTime, - log.maxProducerIdExpirationMs, log.producerIdExpirationCheckIntervalMs, log.topicPartition, leaderEpochCache, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index cdaa89de9e34e..3cb32c0724e09 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -119,7 +119,7 @@ class LogCleanerManagerTest extends Logging { // the exception should be caught and the partition that caused it marked as uncleanable class LogMock(dir: File, config: LogConfig, offsets: LoadedLogOffsets) extends Log(dir, config, segments, offsets.logStartOffset, offsets.recoveryPoint, - offsets.nextOffsetMetadata, time.scheduler, new BrokerTopicStats, time, maxProducerIdExpirationMs, + offsets.nextOffsetMetadata, time.scheduler, new BrokerTopicStats, time, LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache, producerStateManager, logDirFailureChannel, topicId = None, keepPartitionMetadataFile = true) { // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog() diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index de55724d6ba71..801ba262386e8 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -131,7 +131,6 @@ class LogCleanerTest { scheduler = time.scheduler, brokerTopicStats = new BrokerTopicStats, time, - maxProducerIdExpirationMs = maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition = topicPartition, leaderEpochCache = leaderEpochCache, diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 0d6e7e3dfc9d4..fd75779fd5bed 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -101,7 +101,7 @@ class LogLoaderTest { maxProducerIdExpirationMs, leaderEpochCache, producerStateManager) val offsets = LogLoader.load(loadLogParams) new Log(logDir, config, segments, offsets.logStartOffset, offsets.recoveryPoint, - offsets.nextOffsetMetadata, time.scheduler, brokerTopicStats, time, maxPidExpirationMs, + offsets.nextOffsetMetadata, time.scheduler, brokerTopicStats, time, LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache, producerStateManager, logDirFailureChannel, None, true) } @@ -283,9 +283,8 @@ class LogLoaderTest { val offsets = LogLoader.load(loadLogParams) new Log(logDir, logConfig, interceptedLogSegments, offsets.logStartOffset, offsets.recoveryPoint, offsets.nextOffsetMetadata, mockTime.scheduler, brokerTopicStats, mockTime, - maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, - leaderEpochCache, producerStateManager, logDirFailureChannel, topicId = None, - keepPartitionMetadataFile = true) + LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache, + producerStateManager, logDirFailureChannel, topicId = None, keepPartitionMetadataFile = true) } // Retain snapshots for the last 2 segments @@ -362,7 +361,6 @@ class LogLoaderTest { scheduler = mockTime.scheduler, brokerTopicStats = brokerTopicStats, time = mockTime, - maxProducerIdExpirationMs = maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs = 30000, topicPartition = topicPartition, leaderEpochCache = leaderEpochCache, @@ -431,7 +429,8 @@ class LogLoaderTest { firstAppendTimestamp, coordinatorEpoch = coordinatorEpoch) assertEquals(firstAppendTimestamp, log.producerStateManager.lastEntry(producerId).get.lastTimestamp) - mockTime.sleep(log.maxProducerIdExpirationMs) + val maxProducerIdExpirationMs = 60 * 60 * 1000 + mockTime.sleep(maxProducerIdExpirationMs) assertEquals(None, log.producerStateManager.lastEntry(producerId)) val secondAppendTimestamp = mockTime.milliseconds() @@ -496,7 +495,6 @@ class LogLoaderTest { scheduler = mockTime.scheduler, brokerTopicStats = brokerTopicStats, time = mockTime, - maxProducerIdExpirationMs = maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs = 30000, topicPartition = topicPartition, leaderEpochCache = leaderEpochCache, @@ -558,7 +556,6 @@ class LogLoaderTest { scheduler = mockTime.scheduler, brokerTopicStats = brokerTopicStats, time = mockTime, - maxProducerIdExpirationMs = maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs = 30000, topicPartition = topicPartition, leaderEpochCache = leaderEpochCache, @@ -622,7 +619,6 @@ class LogLoaderTest { scheduler = mockTime.scheduler, brokerTopicStats = brokerTopicStats, time = mockTime, - maxProducerIdExpirationMs = maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs = 30000, topicPartition = topicPartition, leaderEpochCache = leaderEpochCache, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 5a6ea2ef8ddd4..8081d0b994b32 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -1512,7 +1512,6 @@ class ReplicaManagerTest { scheduler = mockScheduler, brokerTopicStats = mockBrokerTopicStats, time = time, - maxProducerIdExpirationMs = maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs = 30000, topicPartition = tp, leaderEpochCache = leaderEpochCache, diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index 159ace8103e5a..876388d008012 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -141,7 +141,7 @@ class SchedulerTest { producerStateManager)) val log = new Log(logDir, logConfig, segments = segments, logStartOffset = offsets.logStartOffset, recoveryPoint = offsets.recoveryPoint, nextOffsetMetadata = offsets.nextOffsetMetadata, scheduler, - brokerTopicStats, mockTime, maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs, + brokerTopicStats, mockTime, LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition, leaderEpochCache, producerStateManager, logDirFailureChannel, topicId = None, keepPartitionMetadataFile = true) assertTrue(scheduler.taskRunning(log.producerExpireCheck))