From e0ebe0ad7aa267d63c01b571171ec6c0002a0576 Mon Sep 17 00:00:00 2001
From: Satish Duggana
Date: Tue, 20 Dec 2022 10:24:21 +0530
Subject: [PATCH 01/11] Moved LastRecord and TxnMetadata to storage module

---
 .../src/main/scala/kafka/log/LogCleaner.scala |  12 ++--
 .../kafka/log/ProducerStateManager.scala      |  29 ++-------
 .../src/main/scala/kafka/log/UnifiedLog.scala |   6 +-
 .../kafka/log/ProducerStateManagerTest.scala  |   6 +-
 .../server/log/internals/LastRecord.java      |  55 ++++++++++++++++
 .../server/log/internals/TxnMetadata.java     |  63 +++++++++++++++++++
 6 files changed, 135 insertions(+), 36 deletions(-)
 create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java
 create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 83b1b0e81b6b8..2ca5c2e85a173 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
+import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -174,7 +174,8 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-    val numThreads = LogCleaner.cleanerConfig(newConfig).numThreads
+    val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
+    val numThreads = newCleanerConfig.numThreads
     val currentThreads = config.numThreads
     if (numThreads < 1)
       throw new ConfigException(s"Log cleaner threads should be at least 1")
@@ -680,9 +681,10 @@ private[log] class Cleaner(val id: Int,
       //      3) The last entry in the log is a transaction marker. We retain this marker since it has the
       //         last producer epoch, which is needed to ensure fencing.
       lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
-        lastRecord.lastDataOffset match {
-          case Some(offset) => batch.lastOffset == offset
-          case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
+        if(lastRecord.lastDataOffset.isPresent) {
+          batch.lastOffset == lastRecord.lastDataOffset.getAsLong
+        } else {
+          batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
         }
       }
     }
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index da9f17c2c2219..e5d13a7f277b1 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -30,33 +30,12 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
+import java.util.OptionalLong
 import java.util.concurrent.ConcurrentSkipListMap
 import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
 
-/**
- * The last written record for a given producer. The last data offset may be undefined
- * if the only log entry for a producer is a transaction marker.
- */
-case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
-
-
-private[log] case class TxnMetadata(
-  producerId: Long,
-  firstOffset: LogOffsetMetadata,
-  var lastOffset: Option[Long] = None
-) {
-  def this(producerId: Long, firstOffset: Long) = this(producerId, new LogOffsetMetadata(firstOffset))
-
-  override def toString: String = {
-    "TxnMetadata(" +
-      s"producerId=$producerId, " +
-      s"firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset)"
-  }
-}
-
 private[log] object ProducerStateEntry {
   private[log] val NumBatchesToRetain = 5
 
@@ -283,7 +262,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       case None if isTransactional =>
         // Began a new transaction
         updatedEntry.currentTxnFirstOffset = Some(firstOffset)
-        transactions += TxnMetadata(producerId, firstOffsetMetadata)
+        transactions += new TxnMetadata(producerId, firstOffsetMetadata)
 
       case _ => // nothing to do
     }
@@ -809,7 +788,7 @@ class ProducerStateManager(
     while (iterator.hasNext) {
       val txnEntry = iterator.next()
       val lastOffset = txnEntry.getValue.lastOffset
-      if (lastOffset.exists(_ < offset))
+      if (lastOffset.isPresent && lastOffset.getAsLong < offset)
         iterator.remove()
     }
   }
@@ -849,7 +828,7 @@ class ProducerStateManager(
       throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " +
         s"which was not started")
 
-    txnMetadata.lastOffset = Some(completedTxn.lastOffset)
+    txnMetadata.lastOffset = OptionalLong.of(completedTxn.lastOffset)
     unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata)
     updateOldestTxnTimestamp()
   }
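The hunks above switch `TxnMetadata.lastOffset` from Scala's `Option[Long]` to Java's `OptionalLong`. A minimal sketch (not part of the patch) of how the new Java class behaves when driven from Scala, assuming only the constructors and the public mutable `lastOffset` field introduced below:

```scala
import java.util.OptionalLong
import org.apache.kafka.server.log.internals.{LogOffsetMetadata, TxnMetadata}

object TxnMetadataSketch extends App {
  // An ongoing transaction starts with an undefined last offset.
  val txn = new TxnMetadata(42L, new LogOffsetMetadata(100L))
  assert(!txn.lastOffset.isPresent)

  // completeTxn() above fills it in once the transaction ends.
  txn.lastOffset = OptionalLong.of(110L)
  assert(txn.lastOffset.getAsLong == 110L)
}
```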
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index c982e10a3ef82..1ec3085c2b04d 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 
@@ -237,7 +237,7 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  */
 @threadsafe
 class UnifiedLog(@volatile var logStartOffset: Long,
-                 private[log] val localLog: LocalLog,
+                 private val localLog: LocalLog,
                  brokerTopicStats: BrokerTopicStats,
                  val producerIdExpirationCheckIntervalMs: Int,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
@@ -686,7 +686,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized {
     producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
-      val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset) else None
-      val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
+      val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0) java.util.OptionalLong.of(producerIdEntry.lastDataOffset) else java.util.OptionalLong.empty()
+      val lastRecord = new LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
       producerId -> lastRecord
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 3e5ae15d211a0..8d3a6bc498dd3 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata, TxnMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.Mockito.{mock, when}
@@ -255,8 +255,8 @@ class ProducerStateManagerTest {
 
     appendData(30L, 31L, secondAppend)
     assertEquals(2, secondAppend.startedTransactions.size)
-    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
-    assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
+    assertEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head)
+    assertEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last)
     stateManager.update(secondAppend)
     stateManager.completeTxn(firstCompletedTxn.get)
     stateManager.completeTxn(secondCompletedTxn.get)
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java
new file mode 100644
index 0000000000000..27df9cbf63711
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * The last written record for a given producer. The last data offset may be undefined
+ * if the only log entry for a producer is a transaction marker.
+ */
+public final class LastRecord {
+    public final OptionalLong lastDataOffset;
+    public final short producerEpoch;
+
+    public LastRecord(OptionalLong lastDataOffset, short producerEpoch) {
+        this.lastDataOffset = lastDataOffset;
+        this.producerEpoch = producerEpoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        LastRecord that = (LastRecord) o;
+        return producerEpoch == that.producerEpoch && Objects.equals(lastDataOffset, that.lastDataOffset);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(lastDataOffset, producerEpoch);
+    }
+
+    @Override
+    public String toString() {
+        return "LastRecord{" +
+                "lastDataOffset=" + lastDataOffset +
+                ", producerEpoch=" + producerEpoch +
+                '}';
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java
new file mode 100644
index 0000000000000..83ac8bb0747c3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public final class TxnMetadata {
+    public final long producerId;
+    public final LogOffsetMetadata firstOffset;
+    public OptionalLong lastOffset;
+
+    public TxnMetadata(long producerId,
+                       LogOffsetMetadata firstOffset,
+                       OptionalLong lastOffset) {
+        this.producerId = producerId;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+    }
+
+    public TxnMetadata(long producerId, long firstOffset) {
+        this(producerId, new LogOffsetMetadata(firstOffset));
+    }
+
+    public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) {
+        this(producerId, firstOffset, OptionalLong.empty());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TxnMetadata that = (TxnMetadata) o;
+        return producerId == that.producerId && Objects.equals(firstOffset, that.firstOffset) && Objects.equals(lastOffset, that.lastOffset);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(producerId, firstOffset, lastOffset);
+    }
+
+    @Override
+    public String toString() {
+        return "TxnMetadata{" +
+                "producerId=" + producerId +
+                ", firstOffset=" + firstOffset +
+                ", lastOffset=" + lastOffset +
+                '}';
+    }
+}
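Patch 01 repeatedly bridges between Scala `Option[Long]` and `java.util.OptionalLong` at the core/storage boundary. A hypothetical pair of helpers (not part of the patch, which performs these conversions inline at each call site) capturing that mapping:

```scala
import java.util.OptionalLong

object OptionalLongConverters {
  def toOptionalLong(opt: Option[Long]): OptionalLong =
    opt.map(v => OptionalLong.of(v)).getOrElse(OptionalLong.empty())

  def toOption(opt: OptionalLong): Option[Long] =
    if (opt.isPresent) Some(opt.getAsLong) else None
}
```

For `Optional[T]` (as opposed to `OptionalLong`), later patches in this series lean on `scala.compat.java8.OptionConverters` instead of hand-rolled helpers.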
From c6710e00a0ab98ff372188a2c5174c8405ac10cb Mon Sep 17 00:00:00 2001
From: Satish Duggana
Date: Tue, 20 Dec 2022 16:42:58 +0530
Subject: [PATCH 02/11] Moved BatchMetadata and ProducerStateEntry to storage
 module

---
 .../kafka/log/ProducerStateManager.scala      | 156 +++---------------
 .../src/main/scala/kafka/log/UnifiedLog.scala |   5 +-
 .../scala/kafka/tools/DumpLogSegments.scala   |   2 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala |   9 +-
 .../kafka/log/ProducerStateManagerTest.scala  |   6 +-
 .../server/log/internals/BatchMetadata.java   |  71 ++++++++
 .../log/internals/ProducerStateEntry.java     | 138 ++++++++++++++++
 7 files changed, 249 insertions(+), 138 deletions(-)
 create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java
 create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index e5d13a7f277b1..8bf94c2cf45a3 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
-import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time}
 import org.apache.kafka.server.log.internals._
 
@@ -30,115 +30,17 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
-import java.util.OptionalLong
 import java.util.concurrent.ConcurrentSkipListMap
+import java.util.{Collections, OptionalLong}
 import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
 
-private[log] object ProducerStateEntry {
-  private[log] val NumBatchesToRetain = 5
-
-  def empty(producerId: Long) = new ProducerStateEntry(producerId,
-    batchMetadata = mutable.Queue[BatchMetadata](),
-    producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-    coordinatorEpoch = -1,
-    lastTimestamp = RecordBatch.NO_TIMESTAMP,
-    currentTxnFirstOffset = None)
-}
-
-private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
-  def firstSeq: Int = DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
-  def firstOffset: Long = lastOffset - offsetDelta
-
-  override def toString: String = {
-    "BatchMetadata(" +
-      s"firstSeq=$firstSeq, " +
-      s"lastSeq=$lastSeq, " +
-      s"firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset, " +
-      s"timestamp=$timestamp)"
-  }
-}
-
-// the batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the
-// batch with the highest sequence is at the tail of the queue. We will retain at most ProducerStateEntry.NumBatchesToRetain
-// elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch.
-private[log] class ProducerStateEntry(val producerId: Long,
-                                      val batchMetadata: mutable.Queue[BatchMetadata],
-                                      var producerEpoch: Short,
-                                      var coordinatorEpoch: Int,
-                                      var lastTimestamp: Long,
-                                      var currentTxnFirstOffset: Option[Long]) {
-
-  def firstSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.front.firstSeq
-
-  def firstDataOffset: Long = if (isEmpty) -1L else batchMetadata.front.firstOffset
-
-  def lastSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.last.lastSeq
-
-  def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset
-
-  def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta
-
-  def isEmpty: Boolean = batchMetadata.isEmpty
-
-  def addBatch(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = {
-    maybeUpdateProducerEpoch(producerEpoch)
-    addBatchMetadata(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp))
-    this.lastTimestamp = timestamp
-  }
-
-  def maybeUpdateProducerEpoch(producerEpoch: Short): Boolean = {
-    if (this.producerEpoch != producerEpoch) {
-      batchMetadata.clear()
-      this.producerEpoch = producerEpoch
-      true
-    } else {
-      false
-    }
-  }
-
-  private def addBatchMetadata(batch: BatchMetadata): Unit = {
-    if (batchMetadata.size == ProducerStateEntry.NumBatchesToRetain)
-      batchMetadata.dequeue()
-    batchMetadata.enqueue(batch)
-  }
-
-  def update(nextEntry: ProducerStateEntry): Unit = {
-    maybeUpdateProducerEpoch(nextEntry.producerEpoch)
-    while (nextEntry.batchMetadata.nonEmpty)
-      addBatchMetadata(nextEntry.batchMetadata.dequeue())
-    this.coordinatorEpoch = nextEntry.coordinatorEpoch
-    this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset
-    this.lastTimestamp = nextEntry.lastTimestamp
-  }
-
-  def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
-    if (batch.producerEpoch != producerEpoch)
-      None
-    else
-      batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
-  }
-
-  // Return the batch metadata of the cached batch having the exact sequence range, if any.
-  def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
-    val duplicate = batchMetadata.filter { metadata =>
-      firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
-    }
-    duplicate.headOption
-  }
-
-  override def toString: String = {
-    "ProducerStateEntry(" +
-      s"producerId=$producerId, " +
-      s"producerEpoch=$producerEpoch, " +
-      s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
-      s"coordinatorEpoch=$coordinatorEpoch, " +
-      s"lastTimestamp=$lastTimestamp, " +
-      s"batchMetadata=$batchMetadata"
-  }
-}
 
 /**
  * This class is used to validate the records appended by a given producer before they are written to the log.
@@ -162,12 +64,11 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
                                       val origin: AppendOrigin) extends Logging {
 
   private val transactions = ListBuffer.empty[TxnMetadata]
-  private val updatedEntry = ProducerStateEntry.empty(producerId)
-
-  updatedEntry.producerEpoch = currentEntry.producerEpoch
-  updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
-  updatedEntry.lastTimestamp = currentEntry.lastTimestamp
-  updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset
+  private val updatedEntry = new ProducerStateEntry(producerId, Collections.emptyList(),
+    currentEntry.producerEpoch,
+    currentEntry.coordinatorEpoch,
+    currentEntry.lastTimestamp,
+    currentEntry.currentTxnFirstOffset)
 
   private def maybeValidateDataBatch(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
     checkProducerEpoch(producerEpoch, offset)
@@ -253,18 +154,17 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     maybeValidateDataBatch(epoch, firstSeq, firstOffset)
     updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
 
-    updatedEntry.currentTxnFirstOffset match {
-      case Some(_) if !isTransactional =>
+    if(updatedEntry.currentTxnFirstOffset.isPresent()) {
+      if (!isTransactional) {
         // Received a non-transactional message while a transaction is active
         throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " +
           s"offset $firstOffsetMetadata in partition $topicPartition")
-
-      case None if isTransactional =>
-        // Began a new transaction
-        updatedEntry.currentTxnFirstOffset = Some(firstOffset)
+      }
+    } else {
+      if(isTransactional) {
+        updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset)
         transactions += new TxnMetadata(producerId, firstOffsetMetadata)
-
-      case _ => // nothing to do
+      }
     }
   }
 
@@ -294,12 +194,12 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
     // without any associated data will not have any impact on the last stable offset
     // and would not need to be reflected in the transaction index.
-    val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
-      new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
-    }
+    val completedTxn = if(updatedEntry.currentTxnFirstOffset.isPresent) {
+      Some(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset.getAsLong, offset, endTxnMarker.controlType == ControlRecordType.ABORT))
+    } else None
 
     updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
-    updatedEntry.currentTxnFirstOffset = None
+    updatedEntry.currentTxnFirstOffset = OptionalLong.empty()
     updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
     updatedEntry.lastTimestamp = timestamp
 
@@ -384,11 +284,11 @@ object ProducerStateManager {
         val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
         val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata]
         if (offset >= 0)
-          lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp)
+          lastAppendedDataBatches += new BatchMetadata(seq, offset, offsetDelta, timestamp)
 
-        val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch,
-          coordinatorEpoch, timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
-        newEntry
+        val currentTxnFirstOffsetValue:OptionalLong = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty()
+        new ProducerStateEntry(producerId, lastAppendedDataBatches.asJava, producerEpoch,
+          coordinatorEpoch, timestamp, currentTxnFirstOffsetValue)
       }
     } catch {
       case e: SchemaException =>
@@ -410,7 +310,7 @@ object ProducerStateManager {
           .set(OffsetDeltaField, entry.lastOffsetDelta)
           .set(TimestampField, entry.lastTimestamp)
           .set(CoordinatorEpochField, entry.coordinatorEpoch)
-          .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
+          .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.orElse(-1L))
         producerEntryStruct
       }.toArray
       struct.set(ProducerEntriesField, entriesArray)
@@ -497,7 +397,7 @@ class ProducerStateManager(
     val lastTimestamp = oldestTxnLastTimestamp
     lastTimestamp > 0 && (currentTimeMs - lastTimestamp) > maxTransactionTimeoutMs + ProducerStateManager.LateTransactionBufferMs
   }
-  
+
   def truncateFullyAndReloadSnapshots(): Unit = {
     info("Reloading the producer state snapshots")
     truncateFullyAndStartAt(0L)
@@ -631,13 +531,11 @@ class ProducerStateManager(
   private[log] def loadProducerEntry(entry: ProducerStateEntry): Unit = {
     val producerId = entry.producerId
     producers.put(producerId, entry)
-    entry.currentTxnFirstOffset.foreach { offset =>
-      ongoingTxns.put(offset, new TxnMetadata(producerId, offset))
-    }
+    entry.currentTxnFirstOffset.ifPresent((offset: Long) => ongoingTxns.put(offset, new TxnMetadata(producerId, offset)))
   }
 
   private def isProducerExpired(currentTimeMs: Long, producerState: ProducerStateEntry): Boolean =
-    producerState.currentTxnFirstOffset.isEmpty && currentTimeMs - producerState.lastTimestamp >= producerStateManagerConfig.producerIdExpirationMs
+    !producerState.currentTxnFirstOffset.isPresent && currentTimeMs - producerState.lastTimestamp >= producerStateManagerConfig.producerIdExpirationMs
 
   /**
    * Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
@@ -677,7 +575,7 @@ class ProducerStateManager( } def prepareUpdate(producerId: Long, origin: AppendOrigin): ProducerAppendInfo = { - val currentEntry = lastEntry(producerId).getOrElse(ProducerStateEntry.empty(producerId)) + val currentEntry = lastEntry(producerId).getOrElse(new ProducerStateEntry(producerId)) new ProducerAppendInfo(topicPartition, producerId, currentEntry, origin) } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 1ec3085c2b04d..fee2485655674 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -49,6 +49,7 @@ import org.apache.kafka.server.record.BrokerCompressionType import scala.annotation.nowarn import scala.collection.mutable.ListBuffer import scala.collection.{Seq, immutable, mutable} +import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.jdk.CollectionConverters._ object LogAppendInfo { @@ -672,7 +673,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, .setLastSequence(state.lastSeq) .setLastTimestamp(state.lastTimestamp) .setCoordinatorEpoch(state.coordinatorEpoch) - .setCurrentTxnStartOffset(state.currentTxnFirstOffset.getOrElse(-1L)) + .setCurrentTxnStartOffset(state.currentTxnFirstOffset.orElse(-1L)) } }.toSeq } @@ -1083,7 +1084,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (origin == AppendOrigin.CLIENT) { val maybeLastEntry = producerStateManager.lastEntry(batch.producerId) - maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate => + maybeLastEntry.flatMap(_.findDuplicateBatch(batch).asScala).foreach { duplicate => return (updatedProducers, completedTxns.toList, Some(duplicate)) } } diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 5111320cc4b38..ec0fb7d2e7352 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -107,7 +107,7 @@ object DumpLogSegments { print(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} " + s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset} " + s"lastTimestamp: ${entry.lastTimestamp} ") - entry.batchMetadata.headOption.foreach { metadata => + entry.batchMetadata.asScala.headOption.foreach { metadata => print(s"firstSequence: ${metadata.firstSeq} lastSequence: ${metadata.lastSeq} " + s"lastOffset: ${metadata.lastOffset} offsetDelta: ${metadata.offsetDelta} timestamp: ${metadata.timestamp}") } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 85b4801a0ece6..d7b5a5c145249 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -17,6 +17,9 @@ package kafka.log import java.io.File +import java.util +import java.util.OptionalLong + import kafka.server.checkpoints.LeaderEpochCheckpoint import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.utils.TestUtils @@ -25,7 +28,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{MockTime, Time, Utils} -import org.apache.kafka.server.log.internals.LogConfig +import org.apache.kafka.server.log.internals.{BatchMetadata, LogConfig, ProducerStateEntry} import org.junit.jupiter.api.Assertions._ import 
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -356,8 +359,8 @@ class LogSegmentTest {
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = newProducerStateManager()
     stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
-      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
-      0, RecordBatch.NO_TIMESTAMP, Some(75L)))
+      util.Arrays.asList(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
+      0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L)))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 8d3a6bc498dd3..0e1e23cc8d815 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata, TxnMetadata}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata, ProducerStateEntry, TxnMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.Mockito.{mock, when}
@@ -196,7 +196,7 @@ class ProducerStateManagerTest {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT)
+    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, new ProducerStateEntry(producerId), AppendOrigin.CLIENT)
 
     val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
     producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
@@ -368,7 +368,7 @@ class ProducerStateManagerTest {
     val producerAppendInfo = new ProducerAppendInfo(
       partition,
       producerId,
-      ProducerStateEntry.empty(producerId),
+      new ProducerStateEntry(producerId),
       AppendOrigin.CLIENT
     )
     val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset)
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java
new file mode 100644
index 0000000000000..9d341c24e992e
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.record.DefaultRecordBatch;
+
+import java.util.Objects;
+
+public class BatchMetadata {
+
+    public final int lastSeq;
+    public final long lastOffset;
+    public final int offsetDelta;
+    public final long timestamp;
+
+    public BatchMetadata(
+            int lastSeq,
+            long lastOffset,
+            int offsetDelta,
+            long timestamp) {
+        this.lastSeq = lastSeq;
+        this.lastOffset = lastOffset;
+        this.offsetDelta = offsetDelta;
+        this.timestamp = timestamp;
+    }
+
+    public int firstSeq() {
+        return DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta);
+    }
+
+    public long firstOffset() {
+        return lastOffset - offsetDelta;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        BatchMetadata that = (BatchMetadata) o;
+        return lastSeq == that.lastSeq && lastOffset == that.lastOffset && offsetDelta == that.offsetDelta && timestamp == that.timestamp;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(lastSeq, lastOffset, offsetDelta, timestamp);
+    }
+
+    @Override
+    public String toString() {
+        return "BatchMetadata{" +
+                "lastSeq=" + lastSeq +
+                ", lastOffset=" + lastOffset +
+                ", offsetDelta=" + offsetDelta +
+                ", timestamp=" + timestamp +
+                '}';
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java
new file mode 100644
index 0000000000000..dc154551dc755
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Stream;
+
+/**
+ * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the
+ * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NumBatchesToRetain}
+ * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch.
+ */
+public class ProducerStateEntry {
+    public static final int NumBatchesToRetain = 5;
+
+    public final long producerId;
+    private final List<BatchMetadata> batchMetadata;
+
+    public short producerEpoch;
+    public int coordinatorEpoch;
+    public long lastTimestamp;
+    public OptionalLong currentTxnFirstOffset;
+
+    public ProducerStateEntry(long producerId) {
+        this(producerId, Collections.emptyList(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty());
+    }
+
+    public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) {
+        this.producerId = producerId;
+        // Copy defensively: callers may pass an immutable list (e.g. Collections.emptyList()),
+        // and this entry mutates batchMetadata as batches are added, evicted or cleared.
+        this.batchMetadata = new ArrayList<>(batchMetadata);
+        this.producerEpoch = producerEpoch;
+        this.coordinatorEpoch = coordinatorEpoch;
+        this.lastTimestamp = lastTimestamp;
+        this.currentTxnFirstOffset = currentTxnFirstOffset;
+    }
+
+    public int firstSeq() {
+        return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(0).firstSeq();
+    }
+
+    public long firstDataOffset() {
+        return isEmpty() ? -1L : batchMetadata.get(0).firstOffset();
+    }
+
+    public int lastSeq() {
+        return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(batchMetadata.size() - 1).lastSeq;
+    }
+
+    public long lastDataOffset() {
+        return isEmpty() ? -1L : batchMetadata.get(batchMetadata.size() - 1).lastOffset;
+    }
+
+    public int lastOffsetDelta() {
+        return isEmpty() ? 0 : batchMetadata.get(batchMetadata.size() - 1).offsetDelta;
+    }
+
+    public boolean isEmpty() {
+        return batchMetadata.isEmpty();
+    }
+
+    public void addBatch(short producerEpoch, int lastSeq, long lastOffset, int offsetDelta, long timestamp) {
+        maybeUpdateProducerEpoch(producerEpoch);
+        addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp));
+        this.lastTimestamp = timestamp;
+    }
+
+    public boolean maybeUpdateProducerEpoch(short producerEpoch) {
+        if (this.producerEpoch != producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void addBatchMetadata(BatchMetadata batch) {
+        if (batchMetadata.size() == ProducerStateEntry.NumBatchesToRetain) batchMetadata.remove(0);
+        batchMetadata.add(batch);
+    }
+
+    public void update(ProducerStateEntry nextEntry) {
+        maybeUpdateProducerEpoch(nextEntry.producerEpoch);
+        while (!nextEntry.batchMetadata.isEmpty()) addBatchMetadata(nextEntry.batchMetadata.remove(0));
+        this.coordinatorEpoch = nextEntry.coordinatorEpoch;
+        this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset;
+        this.lastTimestamp = nextEntry.lastTimestamp;
+    }
+
+    public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
+        if (batch.producerEpoch() != producerEpoch) return Optional.empty();
+        else return batchWithSequenceRange(batch.baseSequence(), batch.lastSequence());
+    }
+
+    // Return the batch metadata of the cached batch having the exact sequence range, if any.
+    Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) {
+        Stream<BatchMetadata> duplicate = batchMetadata.stream().filter(metadata -> firstSeq == metadata.firstSeq() && lastSeq == metadata.lastSeq);
+        return duplicate.findFirst();
+    }
+
+    public List<BatchMetadata> batchMetadata() {
+        return Collections.unmodifiableList(batchMetadata);
+    }
+
+    public short producerEpoch() {
+        return producerEpoch;
+    }
+
+    public int coordinatorEpoch() {
+        return coordinatorEpoch;
+    }
+
+    public long lastTimestamp() {
+        return lastTimestamp;
+    }
+
+    public OptionalLong currentTxnFirstOffset() {
+        return currentTxnFirstOffset;
+    }
+}
\ No newline at end of file
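A throwaway check (not part of the patch series) of the retention contract documented on `ProducerStateEntry` above — at most five batches are kept, oldest evicted first — which also relies on the defensive `ArrayList` copy in the constructor to accept `Collections.emptyList()`:

```scala
import java.util.{Collections, OptionalLong}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.server.log.internals.{BatchMetadata, ProducerStateEntry}

object ProducerStateEntrySketch extends App {
  val entry = new ProducerStateEntry(1L, Collections.emptyList[BatchMetadata](),
    0.toShort, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty())

  // Append seven batches with the same epoch; offsetDelta = 0 keeps firstSeq == lastSeq.
  (0 until 7).foreach(i => entry.addBatch(0.toShort, i, 100L + i, 0, RecordBatch.NO_TIMESTAMP))

  assert(entry.batchMetadata.size == 5) // only the newest five batches survive
  assert(entry.firstSeq == 2)           // sequences 0 and 1 were evicted
  assert(entry.lastSeq == 6)
}
```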
From 472998b739f8ca4017412562980c8c2f9e3c44f5 Mon Sep 17 00:00:00 2001
From: Satish Duggana
Date: Thu, 22 Dec 2022 12:44:02 +0530
Subject: [PATCH 03/11] Moved ProducerAppendInfo to storage module. Minor
 cleanup in classes moved to the storage module. Resolved upstream trunk merge
 conflicts.

---
 .../src/main/scala/kafka/log/LogCleaner.scala |   2 +-
 .../src/main/scala/kafka/log/LogSegment.scala |   3 +-
 .../kafka/log/ProducerStateManager.scala      | 193 +-------------
 .../src/main/scala/kafka/log/UnifiedLog.scala |   6 +-
 .../kafka/log/ProducerStateManagerTest.scala  |  37 +--
 .../server/log/internals/BatchMetadata.java   |  19 +-
 .../server/log/internals/LastRecord.java      |  13 +-
 .../log/internals/ProducerAppendInfo.java     | 238 ++++++++++++++++++
 .../log/internals/ProducerStateEntry.java     |  35 +--
 .../server/log/internals/TxnMetadata.java     |  15 +-
 10 files changed, 323 insertions(+), 238 deletions(-)
 create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2ca5c2e85a173..83d1ff7475453 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -681,7 +681,7 @@ private[log] class Cleaner(val id: Int,
       //      3) The last entry in the log is a transaction marker. We retain this marker since it has the
       //         last producer epoch, which is needed to ensure fencing.
       lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
-        if(lastRecord.lastDataOffset.isPresent) {
+        if (lastRecord.lastDataOffset.isPresent) {
           batch.lastOffset == lastRecord.lastDataOffset.getAsLong
         } else {
           batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d289df2ec47ee..53b51cb16ac63 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
 import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
 
 import java.util.Optional
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 import scala.math._
 
@@ -249,7 +250,7 @@ class LogSegment private[log] (val log: FileRecords,
       if (batch.hasProducerId) {
         val producerId = batch.producerId
         val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
-        val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
+        val maybeCompletedTxn = appendInfo.append(batch, Optional.empty()).asScala
         producerStateManager.update(appendInfo)
         maybeCompletedTxn.foreach { completedTxn =>
           val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 8bf94c2cf45a3..2782ab54f6218 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -20,9 +20,8 @@ import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time}
 import org.apache.kafka.server.log.internals._
 
@@ -30,9 +29,8 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
+import java.util.OptionalLong
 import java.util.concurrent.ConcurrentSkipListMap
-import java.util.{Collections, OptionalLong}
-import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
 
@@ -42,186 +40,7 @@ import scala.jdk.CollectionConverters._
 
-/**
- * This class is used to validate the records appended by a given producer before they are written to the log.
- * It is initialized with the producer's state after the last successful append, and transitively validates the
- * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
- * as the incoming records are validated.
- *
- * @param producerId The id of the producer appending to the log
- * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of
- *                     the most recent appends made by the producer. Validation of the first incoming append will
- *                     be made against the latest append in the current entry. New appends will replace older appends
- *                     in the current entry so that the space overhead is constant.
- * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset
- *               commits, which originate from the group coordinator, do not have sequence numbers and therefore
- *               only producer epoch validation is done. Appends which come through replication are not validated
- *               (we assume the validation has already been done) and appends from clients require full validation.
- */
-private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
-                                      val producerId: Long,
-                                      val currentEntry: ProducerStateEntry,
-                                      val origin: AppendOrigin) extends Logging {
-
-  private val transactions = ListBuffer.empty[TxnMetadata]
-  private val updatedEntry = new ProducerStateEntry(producerId, Collections.emptyList(),
-    currentEntry.producerEpoch,
-    currentEntry.coordinatorEpoch,
-    currentEntry.lastTimestamp,
-    currentEntry.currentTxnFirstOffset)
-
-  private def maybeValidateDataBatch(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
-    checkProducerEpoch(producerEpoch, offset)
-    if (origin == AppendOrigin.CLIENT) {
-      checkSequence(producerEpoch, firstSeq, offset)
-    }
-  }
-
-  private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
-    if (producerEpoch < updatedEntry.producerEpoch) {
-      val message = s"Epoch of producer $producerId at offset $offset in $topicPartition is $producerEpoch, " +
-        s"which is smaller than the last seen epoch ${updatedEntry.producerEpoch}"
-
-      if (origin == AppendOrigin.REPLICATION) {
-        warn(message)
-      } else {
-        // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
-        // producer send response callback to differentiate from the former fatal exception,
-        // letting client abort the ongoing transaction and retry.
-        throw new InvalidProducerEpochException(message)
-      }
-    }
-  }
-
-  private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = {
-    if (producerEpoch != updatedEntry.producerEpoch) {
-      if (appendFirstSeq != 0) {
-        if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch of producer $producerId " +
-            s"at offset $offset in partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number), " +
-            s"${updatedEntry.producerEpoch} (current producer epoch)")
-        }
-      }
-    } else {
-      val currentLastSeq = if (!updatedEntry.isEmpty)
-        updatedEntry.lastSeq
-      else if (producerEpoch == currentEntry.producerEpoch)
-        currentEntry.lastSeq
-      else
-        RecordBatch.NO_SEQUENCE
-
-      // If there is no current producer epoch (possibly because all producer records have been deleted due to
-      // retention or the DeleteRecords API) accept writes with any sequence number
-      if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " +
-          s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
-          s"$currentLastSeq (current end sequence number)")
-      }
-    }
-  }
-
-  private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {
-    nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
-  }
-
-  def append(batch: RecordBatch, firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = {
-    if (batch.isControlBatch) {
-      val recordIterator = batch.iterator
-      if (recordIterator.hasNext) {
-        val record = recordIterator.next()
-        val endTxnMarker = EndTransactionMarker.deserialize(record)
-        appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
-      } else {
-        // An empty control batch means the entire transaction has been cleaned from the log, so no need to append
-        None
-      }
-    } else {
-      val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(new LogOffsetMetadata(batch.baseOffset))
-      appendDataBatch(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
-        firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
-      None
-    }
-  }
-
-  def appendDataBatch(epoch: Short,
-                      firstSeq: Int,
-                      lastSeq: Int,
-                      lastTimestamp: Long,
-                      firstOffsetMetadata: LogOffsetMetadata,
-                      lastOffset: Long,
-                      isTransactional: Boolean): Unit = {
-    val firstOffset = firstOffsetMetadata.messageOffset
-    maybeValidateDataBatch(epoch, firstSeq, firstOffset)
-    updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
-
-    if(updatedEntry.currentTxnFirstOffset.isPresent()) {
-      if (!isTransactional) {
-        // Received a non-transactional message while a transaction is active
-        throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " +
-          s"offset $firstOffsetMetadata in partition $topicPartition")
-      }
-    } else {
-      if(isTransactional) {
-        updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset)
-        transactions += new TxnMetadata(producerId, firstOffsetMetadata)
-      }
-    }
-  }
-
-  private def checkCoordinatorEpoch(endTxnMarker: EndTransactionMarker, offset: Long): Unit = {
-    if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch) {
-      if (origin == AppendOrigin.REPLICATION) {
-        info(s"Detected invalid coordinator epoch for producerId $producerId at " +
-          s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
-          s"is older than previously known coordinator epoch ${updatedEntry.coordinatorEpoch}")
-      } else {
-        throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
-          s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
-          s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
-      }
-    }
-  }
-
-  def appendEndTxnMarker(
-    endTxnMarker: EndTransactionMarker,
-    producerEpoch: Short,
-    offset: Long,
-    timestamp: Long
-  ): Option[CompletedTxn] = {
-    checkProducerEpoch(producerEpoch, offset)
-    checkCoordinatorEpoch(endTxnMarker, offset)
-
-    // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
-    // without any associated data will not have any impact on the last stable offset
-    // and would not need to be reflected in the transaction index.
-    val completedTxn = if(updatedEntry.currentTxnFirstOffset.isPresent) {
-      Some(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset.getAsLong, offset, endTxnMarker.controlType == ControlRecordType.ABORT))
-    } else None
-
-    updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
-    updatedEntry.currentTxnFirstOffset = OptionalLong.empty()
-    updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
-    updatedEntry.lastTimestamp = timestamp
-
-    completedTxn
-  }
-
-  def toEntry: ProducerStateEntry = updatedEntry
-
-  def startedTransactions: List[TxnMetadata] = transactions.toList
-
-  override def toString: String = {
-    "ProducerAppendInfo(" +
-      s"producerId=$producerId, " +
-      s"producerEpoch=${updatedEntry.producerEpoch}, " +
-      s"firstSequence=${updatedEntry.firstSeq}, " +
-      s"lastSequence=${updatedEntry.lastSeq}, " +
-      s"currentTxnFirstOffset=${updatedEntry.currentTxnFirstOffset}, " +
-      s"coordinatorEpoch=${updatedEntry.coordinatorEpoch}, " +
-      s"lastTimestamp=${updatedEntry.lastTimestamp}, " +
-      s"startedTransactions=$transactions)"
-  }
-}
 
 object ProducerStateManager {
   val LateTransactionBufferMs = 5 * 60 * 1000
@@ -282,12 +101,12 @@ object ProducerStateManager {
         val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
         val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
         val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-        val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata]
+        val lastAppendedDataBatches = new java.util.ArrayList[BatchMetadata]
         if (offset >= 0)
-          lastAppendedDataBatches += new BatchMetadata(seq, offset, offsetDelta, timestamp)
+          lastAppendedDataBatches.add(new BatchMetadata(seq, offset, offsetDelta, timestamp))
 
         val currentTxnFirstOffsetValue:OptionalLong = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty()
-        new ProducerStateEntry(producerId, lastAppendedDataBatches.asJava, producerEpoch,
+        new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch,
           coordinatorEpoch, timestamp, currentTxnFirstOffsetValue)
       }
     } catch {
@@ -597,7 +416,7 @@ class ProducerStateManager(
       producers.put(appendInfo.producerId, updatedEntry)
     }
 
-    appendInfo.startedTransactions.foreach { txn =>
+    appendInfo.startedTransactions.asScala.foreach { txn =>
       ongoingTxns.put(txn.firstOffset.messageOffset, txn)
     }
 
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index fee2485655674..10fd0e3e59617 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName
 
 import java.io.{File, IOException}
 import java.nio.file.Files
-import java.util.Optional
+import java.util.{Optional, OptionalLong}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
@@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 
 import scala.annotation.nowarn
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable, mutable}
-import scala.compat.java8.OptionConverters.RichOptionalGeneric
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 object LogAppendInfo {
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 0e1e23cc8d815..f4dd5ec646067 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, StandardOpenOption}
-import java.util.Collections
+import java.util.{Collections, Optional}
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
@@ -29,11 +29,14 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata, ProducerStateEntry, TxnMetadata}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry, TxnMetadata}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.Mockito.{mock, when}
 
+import scala.compat.java8.OptionConverters.RichOptionalGeneric
+import scala.jdk.CollectionConverters._
+
 class ProducerStateManagerTest {
   private var logDir: File = _
   private var stateManager: ProducerStateManager = _
@@ -130,7 +133,7 @@ class ProducerStateManagerTest {
     val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
     // Sequence number wrap around
     appendInfo.appendDataBatch(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
-      new LogOffsetMetadata(2000L), 2020L, isTransactional = false)
+      new LogOffsetMetadata(2000L), 2020L, false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
@@ -200,7 +203,7 @@ class ProducerStateManagerTest {
 
     val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
     producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
-      firstOffsetMetadata, offset, isTransactional = true)
+      firstOffsetMetadata, offset, true)
     stateManager.update(producerAppendInfo)
 
     assertEquals(Some(firstOffsetMetadata), stateManager.firstUnstableOffset)
@@ -218,7 +221,7 @@ class ProducerStateManagerTest {
       appendInfo: ProducerAppendInfo
     ): Option[CompletedTxn] = {
       appendInfo.appendEndTxnMarker(new EndTransactionMarker(recordType, coordinatorEpoch),
-        producerEpoch, offset, time.milliseconds())
+        producerEpoch, offset, time.milliseconds()).asScala
     }
 
     def appendData(
@@ -228,14 +231,14 @@ class ProducerStateManagerTest {
     ): Unit = {
       val count = (endOffset - startOffset).toInt
       appendInfo.appendDataBatch(producerEpoch, seq.get(), seq.addAndGet(count), time.milliseconds(),
LogOffsetMetadata(startOffset), endOffset, isTransactional = true) + new LogOffsetMetadata(startOffset), endOffset, true) seq.incrementAndGet() } // Start one transaction in a separate append val firstAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) appendData(16L, 20L, firstAppend) - assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.head) + assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.asScala.head) stateManager.update(firstAppend) stateManager.onHighWatermarkUpdated(21L) assertEquals(Some(new LogOffsetMetadata(16L)), stateManager.firstUnstableOffset) @@ -255,8 +258,8 @@ class ProducerStateManagerTest { appendData(30L, 31L, secondAppend) assertEquals(2, secondAppend.startedTransactions.size) - assertEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head) - assertEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last) + assertEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head) + assertEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last) stateManager.update(secondAppend) stateManager.completeTxn(firstCompletedTxn.get) stateManager.completeTxn(secondCompletedTxn.get) @@ -373,7 +376,7 @@ class ProducerStateManagerTest { ) val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset) producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(), - firstOffsetMetadata, startOffset, isTransactional = true) + firstOffsetMetadata, startOffset, true) stateManager.update(producerAppendInfo) } @@ -417,14 +420,14 @@ class ProducerStateManagerTest { val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) appendInfo.appendDataBatch(producerEpoch, 0, 5, time.milliseconds(), - new LogOffsetMetadata(15L), 20L, isTransactional = false) + new LogOffsetMetadata(15L), 20L, false) assertEquals(None, stateManager.lastEntry(producerId)) stateManager.update(appendInfo) assertTrue(stateManager.lastEntry(producerId).isDefined) val nextAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) nextAppendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(), - new LogOffsetMetadata(26L), 30L, isTransactional = false) + new LogOffsetMetadata(26L), 30L, false) assertTrue(stateManager.lastEntry(producerId).isDefined) var lastEntry = stateManager.lastEntry(producerId).get @@ -448,7 +451,7 @@ class ProducerStateManagerTest { val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) appendInfo.appendDataBatch(producerEpoch, 1, 5, time.milliseconds(), - new LogOffsetMetadata(16L), 20L, isTransactional = true) + new LogOffsetMetadata(16L), 20L, true) var lastEntry = appendInfo.toEntry assertEquals(producerEpoch, lastEntry.producerEpoch) assertEquals(1, lastEntry.firstSeq) @@ -459,7 +462,7 @@ class ProducerStateManagerTest { assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(), - new LogOffsetMetadata(26L), 30L, isTransactional = true) + new LogOffsetMetadata(26L), 30L, true) lastEntry = appendInfo.toEntry assertEquals(producerEpoch, lastEntry.producerEpoch) assertEquals(1, lastEntry.firstSeq) @@ -471,7 +474,7 @@ class ProducerStateManagerTest { val endTxnMarker = new 
EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch) val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds()) - assertTrue(completedTxnOpt.isDefined) + assertTrue(completedTxnOpt.isPresent) val completedTxn = completedTxnOpt.get assertEquals(producerId, completedTxn.producerId) @@ -1101,7 +1104,7 @@ class ProducerStateManagerTest { timestamp: Long = time.milliseconds()): Option[CompletedTxn] = { val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.COORDINATOR) val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch) - val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp) + val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp).asScala mapping.update(producerAppendInfo) completedTxnOpt.foreach(mapping.completeTxn) mapping.updateMapEndOffset(offset + 1) @@ -1129,7 +1132,7 @@ class ProducerStateManagerTest { batch: RecordBatch, origin: AppendOrigin): Unit = { val producerAppendInfo = stateManager.prepareUpdate(producerId, origin) - producerAppendInfo.append(batch, firstOffsetMetadataOpt = None) + producerAppendInfo.append(batch, Optional.empty()) stateManager.update(producerAppendInfo) stateManager.updateMapEndOffset(offset + 1) } diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java index 9d341c24e992e..e59b25eeca31f 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java @@ -18,8 +18,6 @@ import org.apache.kafka.common.record.DefaultRecordBatch; -import java.util.Objects; - public class BatchMetadata { public final int lastSeq; @@ -50,22 +48,31 @@ public long firstOffset() { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; + BatchMetadata that = (BatchMetadata) o; - return lastSeq == that.lastSeq && lastOffset == that.lastOffset && offsetDelta == that.offsetDelta && timestamp == that.timestamp; + + if (lastSeq != that.lastSeq) return false; + if (lastOffset != that.lastOffset) return false; + if (offsetDelta != that.offsetDelta) return false; + return timestamp == that.timestamp; } @Override public int hashCode() { - return Objects.hash(lastSeq, lastOffset, offsetDelta, timestamp); + int result = lastSeq; + result = 31 * result + (int) (lastOffset ^ (lastOffset >>> 32)); + result = 31 * result + offsetDelta; + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; } @Override public String toString() { - return "BatchMetadata{" + + return "BatchMetadata(" + "lastSeq=" + lastSeq + ", lastOffset=" + lastOffset + ", offsetDelta=" + offsetDelta + ", timestamp=" + timestamp + - '}'; + ')'; } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java index 27df9cbf63711..158bcaa58bfd3 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java @@ -36,20 +36,25 @@ public LastRecord(OptionalLong lastDataOffset, short producerEpoch) { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != 
o.getClass()) return false; + LastRecord that = (LastRecord) o; - return producerEpoch == that.producerEpoch && Objects.equals(lastDataOffset, that.lastDataOffset); + + if (producerEpoch != that.producerEpoch) return false; + return Objects.equals(lastDataOffset, that.lastDataOffset); } @Override public int hashCode() { - return Objects.hash(lastDataOffset, producerEpoch); + int result = lastDataOffset != null ? lastDataOffset.hashCode() : 0; + result = 31 * result + (int) producerEpoch; + return result; } @Override public String toString() { - return "LastRecord{" + + return "LastRecord(" + "lastDataOffset=" + lastDataOffset + ", producerEpoch=" + producerEpoch + - '}'; + ')'; } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java new file mode 100644 index 0000000000000..fae8a1bfa5bd3 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. 
+ */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. + */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), + currentEntry.coordinatorEpoch, + currentEntry.lastTimestamp, + currentEntry.currentTxnFirstOffset); + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + + "which is smaller than the last seen epoch %d", producerId, offset, topicPartition, producerEpoch, updatedEntry.producerEpoch()); + + if (origin == AppendOrigin.REPLICATION) { + log.warn(message); + } else { + // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the + // producer send response callback to differentiate from the former fatal exception, + // letting client abort the ongoing transaction and retry. + throw new InvalidProducerEpochException(message); + } + } + } + + private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { + if (producerEpoch != updatedEntry.producerEpoch()) { + if (appendFirstSeq != 0) { + if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); + } + } + } else { + int currentLastSeq; + if (!updatedEntry.isEmpty()) + currentLastSeq = updatedEntry.lastSeq(); + else if (producerEpoch == currentEntry.producerEpoch()) + currentLastSeq = currentEntry.lastSeq(); + else + currentLastSeq = RecordBatch.NO_SEQUENCE; + + // If there is no current producer epoch (possibly because all producer records have been deleted due to + // retention or the DeleteRecords API) accept writes with any sequence number + if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { + throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + + " (incoming seq. number), " + currentLastSeq + " (current end sequence number)"); + } + } + } + + private boolean inSequence(int lastSeq, int nextSeq) { + return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE); + } + + public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) { + if (batch.isControlBatch()) { + Iterator<Record> recordIterator = batch.iterator(); + if (recordIterator.hasNext()) { + Record record = recordIterator.next(); + EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record); + return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp()); + } else { + // An empty control batch means the entire transaction has been cleaned from the log, so no need to append + return Optional.empty(); + } + } else { + LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset())); + appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(), + firstOffsetMetadata, batch.lastOffset(), batch.isTransactional()); + return Optional.empty(); + } + } + + public void appendDataBatch(short epoch, + int firstSeq, + int lastSeq, + long lastTimestamp, + LogOffsetMetadata firstOffsetMetadata, + long lastOffset, + boolean isTransactional) { + long firstOffset = firstOffsetMetadata.messageOffset; + maybeValidateDataBatch(epoch, firstSeq, firstOffset); + updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp); + + OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset; + if (currentTxnFirstOffset.isPresent()) { + if (!isTransactional) + // Received a non-transactional message while a transaction is active + throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " + + "offset " + firstOffsetMetadata + " in partition " + topicPartition); + } else { + if (isTransactional) { + // Began a new transaction + updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset); + transactions.add(new TxnMetadata(producerId, firstOffsetMetadata)); + } + } + } + + private void checkCoordinatorEpoch(EndTransactionMarker endTxnMarker, long offset) { + if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch()) { + if (origin == AppendOrigin.REPLICATION) { + log.info("Detected invalid coordinator epoch for producerId " + producerId + " at " + + "offset " + offset + " in partition $topicPartition: " + endTxnMarker.coordinatorEpoch() + + " is older than previously known coordinator epoch " + updatedEntry.coordinatorEpoch); + } else { + throw new TransactionCoordinatorFencedException("Invalid coordinator epoch for producerId " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + endTxnMarker.coordinatorEpoch() + + " (zombie), " + updatedEntry.coordinatorEpoch + " (current)"); + } + } + } + + public Optional<CompletedTxn> appendEndTxnMarker( + EndTransactionMarker endTxnMarker, + short producerEpoch, + long offset, + long timestamp) { + checkProducerEpoch(producerEpoch, offset); + checkCoordinatorEpoch(endTxnMarker, offset); + + // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker + // without any associated data will not have any impact on the last stable offset + // and would not need to be reflected in the transaction index. + Optional<CompletedTxn> completedTxn = updatedEntry.currentTxnFirstOffset.isPresent() ? + Optional.of(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset.getAsLong(), offset, + endTxnMarker.controlType() == ControlRecordType.ABORT)) + : Optional.empty(); + + updatedEntry.maybeUpdateProducerEpoch(producerEpoch); + updatedEntry.currentTxnFirstOffset = OptionalLong.empty(); + updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch(); + updatedEntry.lastTimestamp = timestamp; + + return completedTxn; + } + + public ProducerStateEntry toEntry() { + return updatedEntry; + } + + public List<TxnMetadata> startedTransactions() { + return Collections.unmodifiableList(transactions); + } + + @Override + public String toString() { + return "ProducerAppendInfo(" + + "topicPartition=" + topicPartition + + ", producerId=" + producerId + + ", currentEntry=" + currentEntry + + ", origin=" + origin + + ", transactions=" + transactions + + ", updatedEntry=" + updatedEntry + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java index dc154551dc755..e103dca8ee1d2 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.record.RecordBatch; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -26,20 +27,24 @@ /** * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the - * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NumBatchesToRetain} + * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. */ public class ProducerStateEntry { - public static final int NumBatchesToRetain = 5; + public static final int NUM_BATCHES_TO_RETAIN = 5; public final long producerId; private final List<BatchMetadata> batchMetadata; - public short producerEpoch; + private short producerEpoch; public int coordinatorEpoch; public long lastTimestamp; public OptionalLong currentTxnFirstOffset; public ProducerStateEntry(long producerId) { - this(producerId, Collections.emptyList(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + } + + public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + this(producerId, new ArrayList<>(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); } public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { @@ -93,7 +98,7 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { } private void addBatchMetadata(BatchMetadata batch) { - if (batchMetadata.size() == ProducerStateEntry.NumBatchesToRetain) batchMetadata.remove(0); + if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.remove(0); batchMetadata.add(batch); } @@ -124,15 +129,15 @@ public short producerEpoch() { return producerEpoch; } - public int coordinatorEpoch() { - return coordinatorEpoch; - } - - public long lastTimestamp() { - return lastTimestamp; - } - - public OptionalLong currentTxnFirstOffset() { - return currentTxnFirstOffset; + @Override + public String toString() { + return "ProducerStateEntry(" + + "producerId=" + producerId + + ", batchMetadata=" + batchMetadata + + ", producerEpoch=" + producerEpoch + + ", coordinatorEpoch=" + coordinatorEpoch + + ", lastTimestamp=" + lastTimestamp + + ", currentTxnFirstOffset=" + currentTxnFirstOffset + + ')'; } } \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java index 83ac8bb0747c3..b0e7d84f46388 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java @@ -43,21 +43,28 @@ public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; + TxnMetadata that = (TxnMetadata) o; - return producerId == that.producerId && Objects.equals(firstOffset, that.firstOffset) && Objects.equals(lastOffset, that.lastOffset); + + if (producerId != that.producerId) return false; + if (!Objects.equals(firstOffset, that.firstOffset)) return false; + return Objects.equals(lastOffset, that.lastOffset); } @Override public int hashCode() { - return Objects.hash(producerId, firstOffset, lastOffset); + int result = (int) (producerId ^ (producerId >>> 32)); + result = 31 * result + (firstOffset != null ? firstOffset.hashCode() : 0); + result = 31 * result + (lastOffset != null ?
lastOffset.hashCode() : 0); + return result; } @Override public String toString() { - return "TxnMetadata{" + + return "TxnMetadata(" + "producerId=" + producerId + ", firstOffset=" + firstOffset + ", lastOffset=" + lastOffset + - '}'; + ')'; } } From 2d9b9ef564fd9c0f0eb3e07009b94d1cd7631368 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Sun, 1 Jan 2023 08:51:00 +0530 Subject: [PATCH 04/11] Resolved a few failing tests --- .../scala/unit/kafka/log/LogSegmentTest.scala | 7 ++--- .../kafka/log/ProducerStateManagerTest.scala | 27 ++++++++++--------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index d7b5a5c145249..299b2a50cb607 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -358,9 +358,10 @@ class LogSegmentTest { // recover again, but this time assuming the transaction from pid2 began on a previous segment stateManager = newProducerStateManager() - stateManager.loadProducerEntry(new ProducerStateEntry(pid2, - util.Arrays.asList(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, - 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L))) + val batchMetadata = new util.ArrayList[BatchMetadata]() + batchMetadata.add(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)) + stateManager.loadProducerEntry(new ProducerStateEntry(pid2, batchMetadata, producerEpoch,0, + RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L))) segment.recover(stateManager) assertEquals(108L, stateManager.mapEndOffset) diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index f4dd5ec646067..38808adb6be34 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -21,7 +21,7 @@ import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.file.{Files, StandardOpenOption} -import java.util.{Collections, Optional} +import java.util.{Collections, Optional, OptionalLong} import java.util.concurrent.atomic.AtomicInteger import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition @@ -34,6 +34,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.Mockito.{mock, when} +import java.util import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.jdk.CollectionConverters._ @@ -185,7 +186,7 @@ class ProducerStateManagerTest { val lastEntry = maybeLastEntry.get assertEquals(bumpedProducerEpoch, lastEntry.producerEpoch) - assertEquals(None, lastEntry.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset) assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq) assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq) @@ -458,8 +459,8 @@ class ProducerStateManagerTest { assertEquals(5, lastEntry.lastSeq) assertEquals(16L, lastEntry.firstDataOffset) assertEquals(20L, lastEntry.lastDataOffset) - assertEquals(Some(16L), lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset) + assertEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) 
appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(), new LogOffsetMetadata(26L), 30L, true) @@ -469,8 +470,8 @@ class ProducerStateManagerTest { assertEquals(10, lastEntry.lastSeq) assertEquals(16L, lastEntry.firstDataOffset) assertEquals(30L, lastEntry.lastDataOffset) - assertEquals(Some(16L), lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset) + assertEquals(util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch) val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds()) @@ -490,8 +491,8 @@ class ProducerStateManagerTest { assertEquals(16L, lastEntry.firstDataOffset) assertEquals(30L, lastEntry.lastDataOffset) assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch) - assertEquals(None, lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset) + assertEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) } @Test @@ -574,7 +575,7 @@ class ProducerStateManagerTest { assertEquals(1, loadedEntry.get.firstSeq) assertEquals(1, loadedEntry.get.lastDataOffset) assertEquals(1, loadedEntry.get.lastSeq) - assertEquals(Some(0), loadedEntry.get.currentTxnFirstOffset) + assertEquals(OptionalLong.of(0), loadedEntry.get.currentTxnFirstOffset) // entry added after recovery append(recoveredMapping, producerId, epoch, 2, 2L, isTransactional = true) @@ -598,7 +599,7 @@ class ProducerStateManagerTest { assertEquals(1, loadedEntry.get.firstSeq) assertEquals(1, loadedEntry.get.lastDataOffset) assertEquals(1, loadedEntry.get.lastSeq) - assertEquals(None, loadedEntry.get.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), loadedEntry.get.currentTxnFirstOffset) } @Test @@ -616,7 +617,7 @@ class ProducerStateManagerTest { val lastEntry = recoveredMapping.lastEntry(producerId) assertTrue(lastEntry.isDefined) assertEquals(appendTimestamp, lastEntry.get.lastTimestamp) - assertEquals(None, lastEntry.get.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), lastEntry.get.currentTxnFirstOffset) } @Test @@ -626,7 +627,7 @@ class ProducerStateManagerTest { appendEndTxnMarker(stateManager, producerId, (epoch + 1).toShort, ControlRecordType.ABORT, offset = 1L) val lastEntry = stateManager.lastEntry(producerId).get - assertEquals(None, lastEntry.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset) assertEquals(-1, lastEntry.lastDataOffset) assertEquals(-1, lastEntry.firstDataOffset) @@ -995,7 +996,7 @@ class ProducerStateManagerTest { // Appending the empty control batch should not throw and a new transaction shouldn't be started append(stateManager, producerId, baseOffset, batch, origin = AppendOrigin.CLIENT) - assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), stateManager.lastEntry(producerId).get.currentTxnFirstOffset) } @Test From fc77d30ca818d0f4dd5fb1b16d7f089953c921df Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Tue, 3 Jan 2023 18:46:12 +0530 Subject: [PATCH 05/11] Addressed review comments --- .../kafka/log/ProducerStateManager.scala | 8 ++-- .../src/main/scala/kafka/log/UnifiedLog.scala | 8 ++--
.../scala/unit/kafka/log/LogSegmentTest.scala | 2 +- .../server/log/internals/BatchMetadata.java | 15 +++---- .../server/log/internals/LastRecord.java | 9 ++--- .../log/internals/ProducerAppendInfo.java | 27 ++++++++----- .../log/internals/ProducerStateEntry.java | 40 +++++++++---------- .../server/log/internals/TxnMetadata.java | 1 + 8 files changed, 60 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 2782ab54f6218..d64860ae5b8ca 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -101,11 +101,11 @@ object ProducerStateManager { val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val lastAppendedDataBatches = new java.util.ArrayList[BatchMetadata] + val lastAppendedDataBatches = new java.util.ArrayDeque[BatchMetadata] if (offset >= 0) lastAppendedDataBatches.add(new BatchMetadata(seq, offset, offsetDelta, timestamp)) - val currentTxnFirstOffsetValue:OptionalLong = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty() + val currentTxnFirstOffsetValue = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty() new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue) } @@ -402,8 +402,8 @@ class ProducerStateManager( * Update the mapping with the given append information */ def update(appendInfo: ProducerAppendInfo): Unit = { - if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID) - throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId} passed to update " + + if (appendInfo.producerId() == RecordBatch.NO_PRODUCER_ID) + throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId()} passed to update " + s"for partition $topicPartition") trace(s"Updated producer ${appendInfo.producerId} state to $appendInfo") diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 10fd0e3e59617..3afad86228977 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 -import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator} +import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo} import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.record.BrokerCompressionType @@ -686,7 +686,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized { producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => - 
val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset) else None + val lastDataOffset = + if (producerIdEntry.lastDataOffset >= 0) OptionalLong.of(producerIdEntry.lastDataOffset) + else OptionalLong.empty() val lastRecord = new LastRecord(lastDataOffset, producerIdEntry.producerEpoch) producerId -> lastRecord } @@ -1979,7 +1981,7 @@ object UnifiedLog extends Logging { origin: AppendOrigin): Option[CompletedTxn] = { val producerId = batch.producerId val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin)) - appendInfo.append(batch, firstOffsetMetadata) + appendInfo.append(batch, firstOffsetMetadata.asJava).asScala } /** diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 299b2a50cb607..0e4f0d152320d 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -358,7 +358,7 @@ class LogSegmentTest { // recover again, but this time assuming the transaction from pid2 began on a previous segment stateManager = newProducerStateManager() - val batchMetadata = new util.ArrayList[BatchMetadata]() + val batchMetadata = new util.ArrayDeque[BatchMetadata]() batchMetadata.add(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)) stateManager.loadProducerEntry(new ProducerStateEntry(pid2, batchMetadata, producerEpoch,0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L))) diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java index e59b25eeca31f..1aa9bfa217987 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java @@ -51,27 +51,28 @@ public boolean equals(Object o) { BatchMetadata that = (BatchMetadata) o; - if (lastSeq != that.lastSeq) return false; - if (lastOffset != that.lastOffset) return false; - if (offsetDelta != that.offsetDelta) return false; - return timestamp == that.timestamp; + return lastSeq == that.lastSeq && + lastOffset == that.lastOffset && + offsetDelta == that.offsetDelta && + timestamp == that.timestamp; } @Override public int hashCode() { int result = lastSeq; - result = 31 * result + (int) (lastOffset ^ (lastOffset >>> 32)); + result = 31 * result + Long.hashCode(lastOffset); result = 31 * result + offsetDelta; - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + Long.hashCode(timestamp); return result; } @Override public String toString() { return "BatchMetadata(" + + "firstSeq=" + firstSeq() + "lastSeq=" + lastSeq + + "firstOffset=" + firstOffset() + ", lastOffset=" + lastOffset + - ", offsetDelta=" + offsetDelta + ", timestamp=" + timestamp + ')'; } diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java index 158bcaa58bfd3..2952084849497 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java @@ -30,6 +30,7 @@ public final class LastRecord { public LastRecord(OptionalLong lastDataOffset, short producerEpoch) { this.lastDataOffset = lastDataOffset; this.producerEpoch = producerEpoch; + Objects.requireNonNull(lastDataOffset, 
"lastDataOffset must be non null"); } @Override @@ -39,15 +40,13 @@ public boolean equals(Object o) { LastRecord that = (LastRecord) o; - if (producerEpoch != that.producerEpoch) return false; - return Objects.equals(lastDataOffset, that.lastDataOffset); + return producerEpoch == that.producerEpoch && + lastDataOffset.equals(that.lastDataOffset); } @Override public int hashCode() { - int result = lastDataOffset != null ? lastDataOffset.hashCode() : 0; - result = 31 * result + (int) producerEpoch; - return result; + return 31 * lastDataOffset.hashCode() + (int) producerEpoch; } @Override diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java index fae8a1bfa5bd3..c38fcc5c3f20a 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java @@ -44,7 +44,7 @@ public class ProducerAppendInfo { private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); private final TopicPartition topicPartition; - public final long producerId; + private final long producerId; private final ProducerStateEntry currentEntry; private final AppendOrigin origin; @@ -52,6 +52,8 @@ public class ProducerAppendInfo { private final ProducerStateEntry updatedEntry; /** + * Creates a new instance with the provided parameters. + * * @param topicPartition topic partition * @param producerId The id of the producer appending to the log * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of @@ -78,6 +80,10 @@ public ProducerAppendInfo(TopicPartition topicPartition, currentEntry.currentTxnFirstOffset); } + public long producerId() { + return producerId; + } + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { checkProducerEpoch(producerEpoch, offset); if (origin == AppendOrigin.CLIENT) { @@ -181,9 +187,8 @@ public void appendDataBatch(short epoch, private void checkCoordinatorEpoch(EndTransactionMarker endTxnMarker, long offset) { if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch()) { if (origin == AppendOrigin.REPLICATION) { - log.info("Detected invalid coordinator epoch for producerId " + producerId + " at " + - "offset " + offset + " in partition $topicPartition: " + endTxnMarker.coordinatorEpoch() + - " is older than previously known coordinator epoch " + updatedEntry.coordinatorEpoch); + log.info("Detected invalid coordinator epoch for producerId {} at offset {} in partition {}: {} is older than previously known coordinator epoch {}", + producerId, offset, topicPartition, endTxnMarker.coordinatorEpoch(), updatedEntry.coordinatorEpoch); } else { throw new TransactionCoordinatorFencedException("Invalid coordinator epoch for producerId " + producerId + " at " + "offset " + offset + " in partition " + topicPartition + ": " + endTxnMarker.coordinatorEpoch() + @@ -227,12 +232,14 @@ public List startedTransactions() { @Override public String toString() { return "ProducerAppendInfo(" + - "topicPartition=" + topicPartition + - ", producerId=" + producerId + - ", currentEntry=" + currentEntry + - ", origin=" + origin + - ", transactions=" + transactions + - ", updatedEntry=" + updatedEntry + + "producerId=" + producerId + + ", producerEpoch=" + updatedEntry.producerEpoch() + + ", firstSequence=" + updatedEntry.firstSeq() + + ", lastSequence=" + 
updatedEntry.lastSeq() + + ", currentTxnFirstOffset=" + updatedEntry.currentTxnFirstOffset + + ", coordinatorEpoch=" + updatedEntry.coordinatorEpoch + + ", lastTimestamp=" + updatedEntry.lastTimestamp + + ", startedTransactions=" + transactions + ')'; } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java index e103dca8ee1d2..a67802de8ff65 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java @@ -18,9 +18,10 @@ import org.apache.kafka.common.record.RecordBatch; -import java.util.ArrayList; +import java.util.ArrayDeque; +import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.Deque; import java.util.Optional; import java.util.OptionalLong; import java.util.stream.Stream; @@ -33,21 +34,21 @@ public class ProducerStateEntry { public static final int NUM_BATCHES_TO_RETAIN = 5; public final long producerId; - private final List<BatchMetadata> batchMetadata; + private final Deque<BatchMetadata> batchMetadata; private short producerEpoch; public int coordinatorEpoch; public long lastTimestamp; public OptionalLong currentTxnFirstOffset; public ProducerStateEntry(long producerId) { - this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + this(producerId, new ArrayDeque<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); } public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { - this(producerId, new ArrayList<>(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); + this(producerId, new ArrayDeque<>(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); } - public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + public ProducerStateEntry(long producerId, Deque<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { this.producerId = producerId; this.batchMetadata = batchMetadata; this.producerEpoch = producerEpoch; @@ -57,24 +58,23 @@ public ProducerStateEntry(long producerId, List batchMetadata, sh } public int firstSeq() { - return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(0).firstSeq(); + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getFirst().firstSeq(); } - - public long firstDataOffset() { - return isEmpty() ? -1L : batchMetadata.get(0).firstOffset(); + public int lastSeq() { + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getLast().lastSeq; } - public int lastSeq() { - return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(batchMetadata.size() - 1).lastSeq; + public long firstDataOffset() { + return isEmpty() ? -1L : batchMetadata.getFirst().firstOffset(); } public long lastDataOffset() { - return isEmpty() ? -1L : batchMetadata.get(batchMetadata.size() - 1).lastOffset; + return isEmpty() ? -1L : batchMetadata.getLast().lastOffset; } public int lastOffsetDelta() { - return isEmpty() ? 0 : batchMetadata.get(batchMetadata.size() - 1).offsetDelta; + return isEmpty() ? 0 : batchMetadata.getLast().offsetDelta; } public boolean isEmpty() { @@ -98,13 +98,13 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { } private void addBatchMetadata(BatchMetadata batch) { - if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.remove(0); + if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.removeFirst(); batchMetadata.add(batch); } public void update(ProducerStateEntry nextEntry) { maybeUpdateProducerEpoch(nextEntry.producerEpoch); - while (!nextEntry.batchMetadata.isEmpty()) addBatchMetadata(nextEntry.batchMetadata.remove(0)); + while (!nextEntry.batchMetadata.isEmpty()) addBatchMetadata(nextEntry.batchMetadata.removeFirst()); this.coordinatorEpoch = nextEntry.coordinatorEpoch; this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset; this.lastTimestamp = nextEntry.lastTimestamp; @@ -121,8 +121,8 @@ Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) { return duplicate.findFirst(); } - public List<BatchMetadata> batchMetadata() { - return Collections.unmodifiableList(batchMetadata); + public Collection<BatchMetadata> batchMetadata() { + return Collections.unmodifiableCollection(batchMetadata); } public short producerEpoch() { @@ -133,11 +133,11 @@ public short producerEpoch() { public String toString() { return "ProducerStateEntry(" + "producerId=" + producerId + - ", batchMetadata=" + batchMetadata + ", producerEpoch=" + producerEpoch + + ", currentTxnFirstOffset=" + currentTxnFirstOffset + ", coordinatorEpoch=" + coordinatorEpoch + ", lastTimestamp=" + lastTimestamp + - ", currentTxnFirstOffset=" + currentTxnFirstOffset + + ", batchMetadata=" + batchMetadata + ')'; } } \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java index b0e7d84f46388..7cadf0f0d8dfe 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java @@ -30,6 +30,7 @@ public TxnMetadata(long producerId, this.producerId = producerId; this.firstOffset = firstOffset; this.lastOffset = lastOffset; + Objects.requireNonNull(firstOffset, "firstOffset must be non null"); } public TxnMetadata(long producerId, long firstOffset) { this(producerId, new LogOffsetMetadata(firstOffset)); From 3ecb555a5a1de8ef1f4b0c8fe4305098d3bdbe28 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Thu, 5 Jan 2023 14:19:54 +0530 Subject: [PATCH 06/11] Made producerId private with a getter --- .../main/scala/kafka/log/ProducerStateManager.scala | 6 +++--- .../server/log/internals/ProducerStateEntry.java | 11 ++++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index d64860ae5b8ca..799973d3ef133 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -101,12 +101,12 @@ object ProducerStateManager { val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val lastAppendedDataBatches = new java.util.ArrayDeque[BatchMetadata] + val batchMetadata = new java.util.ArrayDeque[BatchMetadata] if (offset >= 0) - lastAppendedDataBatches.add(new BatchMetadata(seq, offset, offsetDelta, timestamp)) + batchMetadata.add(new BatchMetadata(seq, offset, offsetDelta, timestamp)) val currentTxnFirstOffsetValue = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty() - new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch, + new ProducerStateEntry(producerId, batchMetadata, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue) } } catch { diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java index a67802de8ff65..34eca24d08c90 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java @@ -27,13 +27,14 @@ import java.util.stream.Stream; /** - * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the - * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * This class represents the state of a specific producer-id. + * It contains a batchMetadata queue which is ordered such that the batch with the lowest sequence is at the head of the + * queue while the batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. */ public class ProducerStateEntry { public static final int NUM_BATCHES_TO_RETAIN = 5; - public final long producerId; + private final long producerId; private final Deque<BatchMetadata> batchMetadata; private short producerEpoch; public int coordinatorEpoch; @@ -129,6 +130,10 @@ public short producerEpoch() { return producerEpoch; } + public long producerId() { + return producerId; + } + @Override public String toString() { return "ProducerStateEntry(" + From 89e2ec33d88c24fa4c8e40e1a6b2e7e92031188b Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Thu, 5 Jan 2023 21:23:57 +0530 Subject: [PATCH 07/11] Minor cleanup to pass BatchMetadata list and other minor changes.
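
ProducerStateEntry keeps only a small, fixed window of batch metadata per producer, so callers can hand over a plain List while the entry owns the eviction policy internally. A minimal, self-contained sketch of that bounded-retention pattern (the class and method names below are illustrative, not the actual storage-module API):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    class BoundedBatches<T> {
        // mirrors ProducerStateEntry.NUM_BATCHES_TO_RETAIN
        static final int NUM_BATCHES_TO_RETAIN = 5;
        private final Deque<T> batches;

        BoundedBatches(List<T> initial) {
            // defensively copy the caller's list into a deque the entry owns
            this.batches = new ArrayDeque<>(initial);
        }

        void add(T batch) {
            // at capacity, evict the head (the batch with the lowest sequence)
            if (batches.size() == NUM_BATCHES_TO_RETAIN)
                batches.removeFirst();
            batches.addLast(batch);
        }
    }

Capping the deque keeps duplicate-sequence lookups bounded to at most five retained batches per producer, no matter how much that producer has written.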
--- .../kafka/log/ProducerStateManager.scala | 18 +++++---------- .../scala/unit/kafka/log/LogSegmentTest.scala | 7 ++---- .../server/log/internals/BatchMetadata.java | 4 ++-- .../server/log/internals/LastRecord.java | 4 ++-- .../log/internals/ProducerStateEntry.java | 9 ++++---- .../server/log/internals/TxnMetadata.java | 22 +------------------ 6 files changed, 17 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 799973d3ef133..35a29d0eea843 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -34,14 +34,6 @@ import java.util.concurrent.ConcurrentSkipListMap import scala.collection.{immutable, mutable} import scala.jdk.CollectionConverters._ - - - - - - - - object ProducerStateManager { val LateTransactionBufferMs = 5 * 60 * 1000 @@ -101,13 +93,13 @@ object ProducerStateManager { val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val batchMetadata = new java.util.ArrayDeque[BatchMetadata] - if (offset >= 0) - batchMetadata.add(new BatchMetadata(seq, offset, offsetDelta, timestamp)) + val batchMetadata: java.util.List[BatchMetadata] = + if (offset >= 0) + java.util.Collections.singletonList[BatchMetadata](new BatchMetadata(seq, offset, offsetDelta, timestamp)) + else java.util.Collections.emptyList[BatchMetadata]() val currentTxnFirstOffsetValue = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty() - new ProducerStateEntry(producerId, batchMetadata, producerEpoch, - coordinatorEpoch, timestamp, currentTxnFirstOffsetValue) + new ProducerStateEntry(producerId, batchMetadata, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue) } } catch { case e: SchemaException => diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 0e4f0d152320d..436e4890b1fb1 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -17,7 +17,6 @@ package kafka.log import java.io.File -import java.util import java.util.OptionalLong import kafka.server.checkpoints.LeaderEpochCheckpoint @@ -358,10 +357,8 @@ class LogSegmentTest { // recover again, but this time assuming the transaction from pid2 began on a previous segment stateManager = newProducerStateManager() - val batchMetadata = new util.ArrayDeque[BatchMetadata]() - batchMetadata.add(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)) - stateManager.loadProducerEntry(new ProducerStateEntry(pid2, batchMetadata, producerEpoch,0, - RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L))) + stateManager.loadProducerEntry(new ProducerStateEntry(pid2, java.util.Collections.singletonList(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), + producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L))) segment.recover(stateManager) assertEquals(108L, stateManager.mapEndOffset) diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java index 1aa9bfa217987..668456c3518cb 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java +++ 
b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java @@ -70,8 +70,8 @@ public int hashCode() { public String toString() { return "BatchMetadata(" + "firstSeq=" + firstSeq() + - "lastSeq=" + lastSeq + - "firstOffset=" + firstOffset() + + ", lastSeq=" + lastSeq + + ", firstOffset=" + firstOffset() + ", lastOffset=" + lastOffset + ", timestamp=" + timestamp + ')'; diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java index 2952084849497..78568da7897ca 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java @@ -28,9 +28,9 @@ public final class LastRecord { public final short producerEpoch; public LastRecord(OptionalLong lastDataOffset, short producerEpoch) { + Objects.requireNonNull(lastDataOffset, "lastDataOffset must be non null"); this.lastDataOffset = lastDataOffset; this.producerEpoch = producerEpoch; - Objects.requireNonNull(lastDataOffset, "lastDataOffset must be non null"); } @Override @@ -46,7 +46,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return 31 * lastDataOffset.hashCode() + (int) producerEpoch; + return 31 * lastDataOffset.hashCode() + producerEpoch; } @Override diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java index 34eca24d08c90..5342a5b4437a7 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; +import java.util.List; import java.util.Optional; import java.util.OptionalLong; import java.util.stream.Stream; @@ -42,16 +43,16 @@ public class ProducerStateEntry { public OptionalLong currentTxnFirstOffset; public ProducerStateEntry(long producerId) { - this(producerId, new ArrayDeque<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + this(producerId, Collections.emptyList(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); } public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { - this(producerId, new ArrayDeque<>(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); + this(producerId, Collections.emptyList(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); } - public ProducerStateEntry(long producerId, Deque batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + public ProducerStateEntry(long producerId, List batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { this.producerId = producerId; - this.batchMetadata = batchMetadata; + this.batchMetadata = new ArrayDeque<>(batchMetadata); this.producerEpoch = producerEpoch; this.coordinatorEpoch = coordinatorEpoch; this.lastTimestamp = lastTimestamp; diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java index 7cadf0f0d8dfe..76fcb4c528f1a 100644 --- 
a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java @@ -27,10 +27,10 @@ public final class TxnMetadata { public TxnMetadata(long producerId, LogOffsetMetadata firstOffset, OptionalLong lastOffset) { + Objects.requireNonNull(firstOffset, "firstOffset must be non null"); this.producerId = producerId; this.firstOffset = firstOffset; this.lastOffset = lastOffset; - Objects.requireNonNull(firstOffset, "firstOffset must be non null"); } public TxnMetadata(long producerId, long firstOffset) { this(producerId, new LogOffsetMetadata(firstOffset)); @@ -40,26 +40,6 @@ public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) { this(producerId, firstOffset, OptionalLong.empty()); } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TxnMetadata that = (TxnMetadata) o; - - if (producerId != that.producerId) return false; - if (!Objects.equals(firstOffset, that.firstOffset)) return false; - return Objects.equals(lastOffset, that.lastOffset); - } - - @Override - public int hashCode() { - int result = (int) (producerId ^ (producerId >>> 32)); - result = 31 * result + (firstOffset != null ? firstOffset.hashCode() : 0); - result = 31 * result + (lastOffset != null ? lastOffset.hashCode() : 0); - return result; - } - @Override public String toString() { return "TxnMetadata(" + From 499bf7017f9dacc9e9fbf2ba196e218506901b33 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Fri, 6 Jan 2023 13:49:01 +0530 Subject: [PATCH 08/11] Fixed a few tests that failed in ProducerStateManagerTest by introducing equality assertions for TxnMetadata instances. --- .../kafka/log/ProducerStateManagerTest.scala | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 38808adb6be34..d76d98aab121d 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -239,7 +239,7 @@ class ProducerStateManagerTest { // Start one transaction in a separate append val firstAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) appendData(16L, 20L, firstAppend) - assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.asScala.head) + assertTxnMetadataEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.asScala.head) stateManager.update(firstAppend) stateManager.onHighWatermarkUpdated(21L) assertEquals(Some(new LogOffsetMetadata(16L)), stateManager.firstUnstableOffset) @@ -259,8 +259,8 @@ appendData(30L, 31L, secondAppend) assertEquals(2, secondAppend.startedTransactions.size) - assertEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head) - assertEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last) + assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head) + assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last) stateManager.update(secondAppend) stateManager.completeTxn(firstCompletedTxn.get) stateManager.completeTxn(secondCompletedTxn.get) @@
-268,6 +268,19 @@ class ProducerStateManagerTest { assertEquals(Some(new LogOffsetMetadata(30L)), stateManager.firstUnstableOffset) } + def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: java.util.List[TxnMetadata]): Unit = { + val expectedIter = expected.iterator() + val actualIter = actual.iterator() + while(expectedIter.hasNext && actualIter.hasNext) { + assertTxnMetadataEquals(expectedIter.next(), actualIter.next()) + } + } + + def assertTxnMetadataEquals(expected: TxnMetadata, actual:TxnMetadata) : Unit = { + assertEquals(expected.producerId, actual.producerId) + assertEquals(expected.firstOffset, actual.firstOffset) + } + @Test def testHasLateTransaction(): Unit = { val producerId1 = 39L @@ -460,7 +473,7 @@ class ProducerStateManagerTest { assertEquals(16L, lastEntry.firstDataOffset) assertEquals(20L, lastEntry.lastDataOffset) assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset) - assertEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertTxnMetadataEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(), new LogOffsetMetadata(26L), 30L, true) @@ -471,7 +484,7 @@ class ProducerStateManagerTest { assertEquals(16L, lastEntry.firstDataOffset) assertEquals(30L, lastEntry.lastDataOffset) assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset) - assertEquals(util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertTxnMetadataEquals(util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch) val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds()) @@ -492,7 +505,7 @@ class ProducerStateManagerTest { assertEquals(30L, lastEntry.lastDataOffset) assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch) assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset) - assertEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertTxnMetadataEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) } @Test From a01c9ec8205e80cbc036ee8a3df034d95abafd33 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Fri, 6 Jan 2023 18:28:27 +0530 Subject: [PATCH 09/11] Addressed review comments. List arg is replaced with a single instance of BatchMetadata. 
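Illustration (not part of the patch): the change described above swaps the List-valued constructor parameter for a single, possibly-null BatchMetadata. A minimal Java sketch of the call-site difference, using made-up id/epoch/offset values rather than any real caller:

    // Patch 08 shape: the (at most one) recovered batch had to be wrapped in a List.
    ProducerStateEntry before = new ProducerStateEntry(42L,
            java.util.Collections.singletonList(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)),
            (short) 0, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L));

    // Patch 09 shape: the batch is passed directly; null means "no batch recovered".
    ProducerStateEntry after = new ProducerStateEntry(42L,
            new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP),
            (short) 0, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L));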
--- .../main/scala/kafka/log/ProducerStateManager.scala | 6 +----- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 3 +-- .../server/log/internals/ProducerStateEntry.java | 11 +++++------ 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 35a29d0eea843..3fe2d4b7655f1 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -93,11 +93,7 @@ object ProducerStateManager { val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val batchMetadata: java.util.List[BatchMetadata] = - if (offset >= 0) - java.util.Collections.singletonList[BatchMetadata](new BatchMetadata(seq, offset, offsetDelta, timestamp)) - else java.util.Collections.emptyList[BatchMetadata]() - + val batchMetadata: BatchMetadata = if (offset >= 0) new BatchMetadata(seq, offset, offsetDelta, timestamp) else null val currentTxnFirstOffsetValue = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty() new ProducerStateEntry(producerId, batchMetadata, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue) } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 436e4890b1fb1..920a1e1f19ff2 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -357,8 +357,7 @@ class LogSegmentTest { // recover again, but this time assuming the transaction from pid2 began on a previous segment stateManager = newProducerStateManager() - stateManager.loadProducerEntry(new ProducerStateEntry(pid2, java.util.Collections.singletonList(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), - producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L))) + stateManager.loadProducerEntry(new ProducerStateEntry(pid2, new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP), producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L))) segment.recover(stateManager) assertEquals(108L, stateManager.mapEndOffset) diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java index 5342a5b4437a7..1fbef1ea69334 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; -import java.util.List; import java.util.Optional; import java.util.OptionalLong; import java.util.stream.Stream; @@ -36,27 +35,27 @@ public class ProducerStateEntry { public static final int NUM_BATCHES_TO_RETAIN = 5; private final long producerId; - private final Deque<BatchMetadata> batchMetadata; + private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>(); private short producerEpoch; public int coordinatorEpoch; public long lastTimestamp; public OptionalLong currentTxnFirstOffset; public ProducerStateEntry(long producerId) { - this(producerId, Collections.emptyList(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + this(producerId,
null, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); } public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { - this(producerId, Collections.emptyList(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); + this(producerId, null, producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); } - public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + public ProducerStateEntry(long producerId, BatchMetadata firstBatchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { this.producerId = producerId; - this.batchMetadata = new ArrayDeque<>(batchMetadata); this.producerEpoch = producerEpoch; this.coordinatorEpoch = coordinatorEpoch; this.lastTimestamp = lastTimestamp; this.currentTxnFirstOffset = currentTxnFirstOffset; + if (firstBatchMetadata != null) batchMetadata.add(firstBatchMetadata); } public int firstSeq() { From ef3b5cd8c6b0669c4ca5ca8a382db24a8483aca6 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Sat, 7 Jan 2023 07:30:55 +0530 Subject: [PATCH 10/11] Addressed review comments --- core/src/main/scala/kafka/log/LogCleaner.scala | 3 +-- .../main/scala/kafka/log/ProducerStateManager.scala | 8 +++++--- .../src/test/scala/unit/kafka/log/LogSegmentTest.scala | 2 +- .../unit/kafka/log/ProducerStateManagerTest.scala | 6 ++++-- .../kafka/server/log/internals/ProducerAppendInfo.java | 10 ++++------ .../kafka/server/log/internals/ProducerStateEntry.java | 10 +++------- 6 files changed, 18 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 83d1ff7475453..5a098790a3104 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -174,8 +174,7 @@ class LogCleaner(initialConfig: CleanerConfig, } override def validateReconfiguration(newConfig: KafkaConfig): Unit = { - val newCleanerConfig = LogCleaner.cleanerConfig(newConfig) - val numThreads = newCleanerConfig.numThreads + val numThreads = LogCleaner.cleanerConfig(newConfig).numThreads val currentThreads = config.numThreads if (numThreads < 1) throw new ConfigException(s"Log cleaner threads should be at least 1") diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 3fe2d4b7655f1..91ccdfe5fdcdc 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -29,7 +29,7 @@ import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.file.{Files, NoSuchFileException, StandardOpenOption} -import java.util.OptionalLong +import java.util.{Optional, OptionalLong} import java.util.concurrent.ConcurrentSkipListMap import scala.collection.{immutable, mutable} import scala.jdk.CollectionConverters._ @@ -93,9 +93,11 @@ object ProducerStateManager { val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val batchMetadata: BatchMetadata = if (offset >= 0) new BatchMetadata(seq, offset, offsetDelta, timestamp) else null + val batchMetadata =
+ if (offset >= 0) Optional.of(new BatchMetadata(seq, offset, offsetDelta, timestamp)) + else Optional.empty[BatchMetadata]() val currentTxnFirstOffsetValue = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty() - new ProducerStateEntry(producerId, batchMetadata, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue) + new ProducerStateEntry(producerId, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue, batchMetadata) } } catch { case e: SchemaException => diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 920a1e1f19ff2..9bd5c56e7ac4e 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -357,7 +357,7 @@ class LogSegmentTest { // recover again, but this time assuming the transaction from pid2 began on a previous segment stateManager = newProducerStateManager() - stateManager.loadProducerEntry(new ProducerStateEntry(pid2, new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP), producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L))) + stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L), java.util.Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)))) segment.recover(stateManager) assertEquals(108L, stateManager.mapEndOffset) diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index d76d98aab121d..c9546c225487b 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -271,14 +271,16 @@ class ProducerStateManagerTest { def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: java.util.List[TxnMetadata]): Unit = { val expectedIter = expected.iterator() val actualIter = actual.iterator() - while(expectedIter.hasNext && actualIter.hasNext) { + assertEquals(expected.size(), actual.size()) + while (expectedIter.hasNext && actualIter.hasNext) { assertTxnMetadataEquals(expectedIter.next(), actualIter.next()) } } - def assertTxnMetadataEquals(expected: TxnMetadata, actual:TxnMetadata) : Unit = { + def assertTxnMetadataEquals(expected: TxnMetadata, actual: TxnMetadata): Unit = { assertEquals(expected.producerId, actual.producerId) assertEquals(expected.firstOffset, actual.firstOffset) + assertEquals(expected.lastOffset, actual.lastOffset) } @Test diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java index c38fcc5c3f20a..1503f22fc45dd 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java @@ -74,10 +74,8 @@ public ProducerAppendInfo(TopicPartition topicPartition, this.currentEntry = currentEntry; this.origin = origin; - updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), - currentEntry.coordinatorEpoch, - currentEntry.lastTimestamp, - currentEntry.currentTxnFirstOffset); + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty() + ); } public 
long producerId() { @@ -93,8 +91,8 @@ private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offs private void checkProducerEpoch(short producerEpoch, long offset) { if (producerEpoch < updatedEntry.producerEpoch()) { - String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + - "which is smaller than the last seen epoch %d", producerId, offset, topicPartition, producerEpoch, updatedEntry.producerEpoch()); + String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition + + " is " + producerEpoch + ", " + "which is smaller than the last seen epoch " + updatedEntry.producerEpoch(); if (origin == AppendOrigin.REPLICATION) { log.warn(message); diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java index 1fbef1ea69334..f9ac022e83638 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java @@ -42,20 +42,16 @@ public class ProducerStateEntry { public OptionalLong currentTxnFirstOffset; public ProducerStateEntry(long producerId) { - this(producerId, null, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + this(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty()); } - public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { - this(producerId, null, producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); - } - - public ProducerStateEntry(long producerId, BatchMetadata firstBatchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset, Optional<BatchMetadata> firstBatchMetadata) { this.producerId = producerId; this.producerEpoch = producerEpoch; this.coordinatorEpoch = coordinatorEpoch; this.lastTimestamp = lastTimestamp; this.currentTxnFirstOffset = currentTxnFirstOffset; - if (firstBatchMetadata != null) batchMetadata.add(firstBatchMetadata); + firstBatchMetadata.ifPresent(batchMetadata::add); } public int firstSeq() { From a4aaf8397b242e5edfa896252a2e4fcf73e1ef27 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Sun, 8 Jan 2023 06:38:57 +0530 Subject: [PATCH 11/11] Addressed review comments --- .../kafka/log/ProducerStateManager.scala | 2 +- .../kafka/log/ProducerStateManagerTest.scala | 4 ++-- .../log/internals/ProducerAppendInfo.java | 22 ++++++++----------- .../log/internals/ProducerStateEntry.java | 12 +++++++--- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 91ccdfe5fdcdc..2dc7748152dca 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -384,7 +384,7 @@ class ProducerStateManager( } def prepareUpdate(producerId: Long, origin: AppendOrigin): ProducerAppendInfo = { - val currentEntry = lastEntry(producerId).getOrElse(new ProducerStateEntry(producerId)) + val currentEntry = lastEntry(producerId).getOrElse(ProducerStateEntry.empty(producerId)) new
ProducerAppendInfo(topicPartition, producerId, currentEntry, origin) } diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index c9546c225487b..903d51f94f27e 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -200,7 +200,7 @@ class ProducerStateManagerTest { val producerEpoch = 0.toShort val offset = 992342L val seq = 0 - val producerAppendInfo = new ProducerAppendInfo(partition, producerId, new ProducerStateEntry(producerId), AppendOrigin.CLIENT) + val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT) val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224) producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(), @@ -387,7 +387,7 @@ class ProducerStateManagerTest { val producerAppendInfo = new ProducerAppendInfo( partition, producerId, - new ProducerStateEntry(producerId), + ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT ) val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset) diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java index 1503f22fc45dd..90329a45cef30 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java @@ -74,8 +74,7 @@ public ProducerAppendInfo(TopicPartition topicPartition, this.currentEntry = currentEntry; this.origin = origin; - updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty() - ); + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty()); } public long producerId() { @@ -168,17 +167,14 @@ public void appendDataBatch(short epoch, updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp); OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset; - if (currentTxnFirstOffset.isPresent()) { - if (!isTransactional) - // Received a non-transactional message while a transaction is active - throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " + - "offset " + firstOffsetMetadata + " in partition " + topicPartition); - } else { - if (isTransactional) { - // Began a new transaction - updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset); - transactions.add(new TxnMetadata(producerId, firstOffsetMetadata)); - } + if (currentTxnFirstOffset.isPresent() && !isTransactional) { + // Received a non-transactional message while a transaction is active + throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " + + "offset " + firstOffsetMetadata + " in partition " + topicPartition); + } else if (!currentTxnFirstOffset.isPresent() && isTransactional) { + // Began a new transaction + updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset); + transactions.add(new TxnMetadata(producerId, firstOffsetMetadata)); } } diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java index f9ac022e83638..bbb3e9f90411d 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java @@ -34,13 +34,19 @@ */ public class ProducerStateEntry { public static final int NUM_BATCHES_TO_RETAIN = 5; - private final long producerId; - private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>(); - private short producerEpoch; + public int coordinatorEpoch; public long lastTimestamp; public OptionalLong currentTxnFirstOffset; + private final long producerId; + private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>(); + private short producerEpoch; + + public static ProducerStateEntry empty(long producerId) { + return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty()); + } + public ProducerStateEntry(long producerId) { this(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty()); }
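Illustration (not part of the patch): the two construction paths ProducerStateEntry ends up with at the close of this series, in a minimal Java sketch with made-up argument values:

    // A producer with no recorded batches yet: the new factory method.
    ProducerStateEntry fresh = ProducerStateEntry.empty(42L);

    // An entry restored from a snapshot, carrying one batch and an open transaction
    // at offset 75; the batch now arrives as an Optional in the last parameter position.
    ProducerStateEntry restored = new ProducerStateEntry(42L, (short) 0, 0,
            RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L),
            Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)));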