2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/Log.scala
@@ -2011,7 +2011,7 @@ object Log extends Logging {
logDirFailureChannel,
config.messageFormatVersion.recordVersion,
s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
- val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs, time)
val offsets = LogLoader.load(LoadLogParams(
dir,
topicPartition,
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/log/LogLoader.scala
@@ -324,7 +324,11 @@ object LogLoader extends Logging {
* @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
*/
private def recoverSegment(segment: LogSegment, params: LoadLogParams): Int = {
- val producerStateManager = new ProducerStateManager(params.topicPartition, params.dir, params.maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(
+   params.topicPartition,
+   params.dir,
+   params.maxProducerIdExpirationMs,
+   params.time)
Log.rebuildProducerState(
producerStateManager,
params.segments,
11 changes: 7 additions & 4 deletions core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -21,14 +21,14 @@ import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import java.util.concurrent.ConcurrentSkipListMap

import kafka.log.Log.offsetFromFile
import kafka.server.LogOffsetMetadata
import kafka.utils.{Logging, nonthreadsafe, threadsafe}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
+ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{ByteUtils, Crc32C}

import scala.jdk.CollectionConverters._
@@ -484,7 +484,8 @@ object ProducerStateManager {
@nonthreadsafe
class ProducerStateManager(val topicPartition: TopicPartition,
@volatile var _logDir: File,
- val maxProducerIdExpirationMs: Int = 60 * 60 * 1000) extends Logging {
+ val maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
+ val time: Time = Time.SYSTEM) extends Logging {
import ProducerStateManager._
import java.util

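This constructor change is the core of the patch: the clock becomes an injectable dependency, defaulting to Time.SYSTEM so existing call sites are unaffected, while tests can substitute a mock. A minimal sketch of the injection, assuming a MockTime from Kafka's test utilities (the partition, directory, and expiration values here are illustrative, not from this PR):

    import java.io.File
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.utils.MockTime
    import kafka.log.ProducerStateManager

    // With the clock injected, a test controls every timestamp the manager observes.
    val mockTime = new MockTime()
    val manager = new ProducerStateManager(
      new TopicPartition("test-topic", 0),      // illustrative partition
      new File("/tmp/kafka-logs/test-topic-0"), // illustrative log dir
      60 * 60 * 1000,                           // maxProducerIdExpirationMs (the existing default)
      mockTime)                                 // replaces the Time.SYSTEM default
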
@@ -718,8 +719,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
// If not a new offset, then it is not worth taking another snapshot
if (lastMapOffset > lastSnapOffset) {
val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, lastMapOffset))
info(s"Writing producer snapshot at offset $lastMapOffset")
val start = time.hiResClockMs()
writeSnapshot(snapshotFile.file, producers)
info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.")

snapshots.put(snapshotFile.offset, snapshotFile)

// Update the last snap offset according to the serialized map
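Note the measurement uses time.hiResClockMs() rather than time.milliseconds(): hiResClockMs() is derived from the monotonic nanosecond clock, so the reported duration cannot be skewed by wall-clock adjustments during the write. The single-line message is replaced by one that reports offset, producer count, and elapsed time together; an illustrative (not actual) log line:

    [ProducerStateManager partition=test-topic-0] Wrote producer snapshot at offset 5000 with 3 producer ids in 12 ms.
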
@@ -730,7 +733,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
/**
* Update the parentDir for this ProducerStateManager and all of the snapshot files which it manages.
*/
- def updateParentDir(parentDir: File): Unit ={
+ def updateParentDir(parentDir: File): Unit = {
_logDir = parentDir
snapshots.forEach((_, s) => s.updateParentDir(parentDir))
}
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -101,7 +101,7 @@ class LogCleanerManagerTest extends Logging {
val maxProducerIdExpirationMs = 60 * 60 * 1000
val segments = new LogSegments(tp)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
- val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs, time)
val offsets = LogLoader.load(LoadLogParams(
tpDir,
tp,
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -106,7 +106,7 @@ class LogCleanerTest {
val maxProducerIdExpirationMs = 60 * 60 * 1000
val logSegments = new LogSegments(topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
- val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs, time)
val offsets = LogLoader.load(LoadLogParams(
dir,
topicPartition,
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -95,7 +95,7 @@ class LogLoaderTest {
val maxProducerIdExpirationMs = 60 * 60 * 1000
val segments = new LogSegments(topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
- val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, time)
val loadLogParams = LoadLogParams(logDir, topicPartition, config, time.scheduler, time,
logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
maxProducerIdExpirationMs, leaderEpochCache, producerStateManager)
@@ -265,7 +265,7 @@ class LogLoaderTest {
}
}
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
- val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, mockTime)
val loadLogParams = LoadLogParams(
logDir,
topicPartition,
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -46,7 +46,7 @@ class ProducerStateManagerTest {
@BeforeEach
def setUp(): Unit = {
logDir = TestUtils.tempDir()
- stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
}
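Because setUp now hands the suite's MockTime to the manager, every timestamp the manager reads is test-controlled. A hypothetical usage sketch (time and stateManager are the fixture fields above):

    // Advance the injected mock clock by one hour without real waiting; both
    // milliseconds() and hiResClockMs() observe the jump, so behavior that
    // depends on elapsed time can be exercised deterministically.
    time.sleep(60 * 60 * 1000)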

@AfterEach
@@ -467,7 +467,7 @@ class ProducerStateManagerTest {
append(stateManager, producerId, epoch, 1, 1L, isTransactional = true)

stateManager.takeSnapshot()
- val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)

// The snapshot only persists the last appended batch metadata
@@ -490,7 +490,7 @@ class ProducerStateManagerTest {
appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.ABORT, offset = 2L)

stateManager.takeSnapshot()
- val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)

// The snapshot only persists the last appended batch metadata
@@ -510,7 +510,7 @@ class ProducerStateManagerTest {
offset = 0L, timestamp = appendTimestamp)
stateManager.takeSnapshot()

- val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
recoveredMapping.truncateAndReload(logStartOffset = 0L, logEndOffset = 1L, time.milliseconds)

val lastEntry = recoveredMapping.lastEntry(producerId)
@@ -542,7 +542,7 @@ class ProducerStateManagerTest {
append(stateManager, producerId, epoch, 1, 1L, 1)

stateManager.takeSnapshot()
- val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
recoveredMapping.truncateAndReload(0L, 1L, 70000)

// entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence
@@ -561,7 +561,7 @@ class ProducerStateManagerTest {
append(stateManager, producerId, epoch, 1, 1L, 1)

stateManager.takeSnapshot()
- val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
recoveredMapping.truncateAndReload(0L, 1L, 70000)

val sequence = 2
@@ -769,7 +769,7 @@ class ProducerStateManagerTest {
@Test
def testSequenceNotValidatedForGroupMetadataTopic(): Unit = {
val partition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
- val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)

val epoch = 0.toShort
append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99,
@@ -818,7 +818,7 @@ class ProducerStateManagerTest {
appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
stateManager.takeSnapshot()

- val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
recoveredMapping.truncateAndReload(0L, 2L, 70000)

// append from old coordinator should be rejected
@@ -922,7 +922,7 @@ class ProducerStateManagerTest {
}

// Ensure that the truncated snapshot is deleted and producer state is loaded from the previous snapshot
- val reloadedStateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+ val reloadedStateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
reloadedStateManager.truncateAndReload(0L, 20L, time.milliseconds())
assertFalse(snapshotToTruncate.exists())

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1487,7 +1487,7 @@ class ReplicaManagerTest {
val maxProducerIdExpirationMs = 30000
val segments = new LogSegments(tp)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
- val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs, time)
val offsets = LogLoader.load(LoadLogParams(
logDir,
tp,
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -124,7 +124,7 @@ class SchedulerTest {
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
- val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, mockTime)
val offsets = LogLoader.load(LoadLogParams(
logDir,
topicPartition,