9 changes: 5 additions & 4 deletions core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
+import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -680,9 +680,10 @@ private[log] class Cleaner(val id: Int,
       // 3) The last entry in the log is a transaction marker. We retain this marker since it has the
       //    last producer epoch, which is needed to ensure fencing.
       lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
-        lastRecord.lastDataOffset match {
-          case Some(offset) => batch.lastOffset == offset
-          case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
+        if (lastRecord.lastDataOffset.isPresent) {
+          batch.lastOffset == lastRecord.lastDataOffset.getAsLong
+        } else {
+          batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
         }
       }
     }
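The new branch above is behaviorally equivalent to the old pattern match: LastRecord.lastDataOffset is now a java.util.OptionalLong, so the Some/None cases become isPresent/getAsLong checks. A minimal standalone sketch of that equivalence (plain Scala with made-up offsets, not Kafka code):

    import java.util.OptionalLong

    object OptionalLongEquivalence extends App {
      val batchLastOffset = 42L
      val isControlBatchWithMatchingEpoch = true

      // Old style: a Scala Option[Long] handled with a pattern match.
      def retainOld(lastDataOffset: Option[Long]): Boolean = lastDataOffset match {
        case Some(offset) => batchLastOffset == offset
        case None         => isControlBatchWithMatchingEpoch
      }

      // New style: a Java OptionalLong queried with isPresent/getAsLong.
      def retainNew(lastDataOffset: OptionalLong): Boolean =
        if (lastDataOffset.isPresent) batchLastOffset == lastDataOffset.getAsLong
        else isControlBatchWithMatchingEpoch

      println(retainOld(Some(42L)) == retainNew(OptionalLong.of(42L))) // true
      println(retainOld(None) == retainNew(OptionalLong.empty()))      // true
    }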
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/LogSegment.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
 import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
 
 import java.util.Optional
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 import scala.math._
 
@@ -249,7 +250,7 @@ class LogSegment private[log] (val log: FileRecords,
     if (batch.hasProducerId) {
       val producerId = batch.producerId
       val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
-      val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
+      val maybeCompletedTxn = appendInfo.append(batch, Optional.empty()).asScala
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
         val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
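ProducerAppendInfo.append now takes and returns java.util.Optional, so the call site passes Optional.empty() and converts the result back with .asScala from scala-java8-compat (the import added at the top of the file). A minimal standalone sketch of that round trip, assuming the scala-java8-compat dependency is on the classpath; the maybeCompleted function is a hypothetical stand-in for the real append call:

    import java.util.Optional
    import scala.compat.java8.OptionConverters._

    object OptionConvertersSketch extends App {
      // Stand-in for a Java API that returns Optional, like ProducerAppendInfo.append.
      def maybeCompleted(found: Boolean): Optional[String] =
        if (found) Optional.of("completed-txn") else Optional.empty()

      // .asScala converts the Java Optional into a Scala Option, so foreach/map keep working.
      maybeCompleted(found = true).asScala.foreach(txn => println(s"handle $txn"))
      println(maybeCompleted(found = false).asScala) // None

      // .asJava goes the other way, e.g. when a Scala Option must be passed to the Java side.
      println(Option(75L).asJava) // Optional[75]
    }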
346 changes: 16 additions & 330 deletions core/src/main/scala/kafka/log/ProducerStateManager.scala

Large diffs are not rendered by default.

19 changes: 11 additions & 8 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName
 
 import java.io.{File, IOException}
 import java.nio.file.Files
-import java.util.Optional
+import java.util.{Optional, OptionalLong}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
@@ -42,13 +42,14 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 
 import scala.annotation.nowarn
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable, mutable}
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 object LogAppendInfo {
@@ -237,7 +238,7 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  */
 @threadsafe
 class UnifiedLog(@volatile var logStartOffset: Long,
-                 private[log] val localLog: LocalLog,
+                 private val localLog: LocalLog,
                  brokerTopicStats: BrokerTopicStats,
                  val producerIdExpirationCheckIntervalMs: Int,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
@@ -672,7 +673,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
          .setLastSequence(state.lastSeq)
          .setLastTimestamp(state.lastTimestamp)
          .setCoordinatorEpoch(state.coordinatorEpoch)
-          .setCurrentTxnStartOffset(state.currentTxnFirstOffset.getOrElse(-1L))
+          .setCurrentTxnStartOffset(state.currentTxnFirstOffset.orElse(-1L))
      }
    }.toSeq
  }
@@ -685,8 +686,10 @@
 
   private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized {
     producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
-      val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0) Some(producerIdEntry.lastDataOffset) else None
-      val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
+      val lastDataOffset =
+        if (producerIdEntry.lastDataOffset >= 0) OptionalLong.of(producerIdEntry.lastDataOffset)
+        else OptionalLong.empty()
+      val lastRecord = new LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
       producerId -> lastRecord
     }
   }
@@ -1083,7 +1086,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         if (origin == AppendOrigin.CLIENT) {
           val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
 
-          maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
+          maybeLastEntry.flatMap(_.findDuplicateBatch(batch).asScala).foreach { duplicate =>
             return (updatedProducers, completedTxns.toList, Some(duplicate))
           }
         }
@@ -1978,7 +1981,7 @@ object UnifiedLog extends Logging {
                               origin: AppendOrigin): Option[CompletedTxn] = {
     val producerId = batch.producerId
     val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
-    appendInfo.append(batch, firstOffsetMetadata)
+    appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
   }
 
   /**
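Two small conversions recur in this file now that the producer state classes expose Java Optional types: an OptionalLong is built from a sentinel-style long (lastRecordsOfActiveProducers above), and it is read back with a default via .orElse(-1L) in place of Scala's .getOrElse(-1L). A standalone sketch with illustrative values only (not Kafka code):

    import java.util.OptionalLong

    object OptionalLongPatterns extends App {
      // Build: a negative offset means "no data offset yet", mirroring the lastDataOffset >= 0 check.
      def toOptionalLong(offset: Long): OptionalLong =
        if (offset >= 0) OptionalLong.of(offset) else OptionalLong.empty()

      // Read with a default: OptionalLong.orElse replaces Option.getOrElse.
      println(toOptionalLong(100L).orElse(-1L)) // 100
      println(toOptionalLong(-1L).orElse(-1L))  // -1
    }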
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -107,7 +107,7 @@ object DumpLogSegments {
         print(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} " +
           s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset} " +
           s"lastTimestamp: ${entry.lastTimestamp} ")
-        entry.batchMetadata.headOption.foreach { metadata =>
+        entry.batchMetadata.asScala.headOption.foreach { metadata =>
           print(s"firstSequence: ${metadata.firstSeq} lastSequence: ${metadata.lastSeq} " +
             s"lastOffset: ${metadata.lastOffset} offsetDelta: ${metadata.offsetDelta} timestamp: ${metadata.timestamp}")
         }
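entry.batchMetadata is now a Java collection rather than a Scala queue, so the dump tool wraps it with asScala from scala.jdk.CollectionConverters before calling headOption. A tiny standalone sketch of that wrapper, using a plain java.util.ArrayDeque as a stand-in for the real metadata queue:

    import java.util.ArrayDeque
    import scala.jdk.CollectionConverters._

    object BatchMetadataHeadOption extends App {
      val batchMetadata = new ArrayDeque[String]()
      batchMetadata.add("firstSeq=10 lastSeq=14 lastOffset=109")

      // asScala gives a Scala Iterable view of the Java collection; headOption is safe when it is empty.
      batchMetadata.asScala.headOption.foreach(println)
      println(new ArrayDeque[String]().asScala.headOption) // None
    }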
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -17,6 +17,8 @@
 package kafka.log
 
 import java.io.File
+import java.util.OptionalLong
+
 import kafka.server.checkpoints.LeaderEpochCheckpoint
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.utils.TestUtils
@@ -25,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Time, Utils}
-import org.apache.kafka.server.log.internals.LogConfig
+import org.apache.kafka.server.log.internals.{BatchMetadata, LogConfig, ProducerStateEntry}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
@@ -355,9 +357,7 @@
 
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = newProducerStateManager()
-    stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
-      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
-      0, RecordBatch.NO_TIMESTAMP, Some(75L)))
+    stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L), java.util.Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP))))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 