diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 1e348b19b3e1b..0a0fd9260f14f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -1006,6 +1006,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
 
+  // Package-private for testing: gives tests direct access to the underlying TransactionStateManager
+  private[kafka] def transactionManager: TransactionStateManager = txnManager
+
   private def onEndTransactionComplete(txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch)(error: Errors, newProducerId: Long, newProducerEpoch: Short): Unit = {
     error match {
       case Errors.NONE =>
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 023e9f15b0075..25b274cf3fefe 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -19,6 +19,7 @@ package kafka.api
 
 import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
 import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.TransactionState
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -695,6 +696,116 @@ class TransactionsTest extends IntegrationTestHarness {
     assertThrows(classOf[IllegalStateException], () => producer.initTransactions())
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRecoveryFromEpochOverflow(groupProtocol: String): Unit = {
+    // Regression test for https://issues.apache.org/jira/browse/KAFKA-20090: the bug only
+    // reproduces when the epoch gets to Short.MaxValue - 1 and the transaction is then
+    // aborted on timeout.
+    val transactionalId = "test-overflow"
+    val abortedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 0, "key".getBytes, "aborted".getBytes)
+
+    // Looks up the transaction metadata held by the state manager of the current coordinator.
+    // Fails the test when the coordinator broker or the metadata cannot be found, so that the
+    // epoch manipulation and the epoch assertion below can never be skipped silently.
+    def coordinatorTxnMetadata(admin: org.apache.kafka.clients.admin.Admin) = {
+      val coordinatorId = admin.describeTransactions(java.util.List.of(transactionalId))
+        .description(transactionalId).get().coordinatorId()
+      val txnCoordinator = brokers.find(_.config.brokerId == coordinatorId) match {
+        case Some(broker: kafka.server.BrokerServer) => broker.transactionCoordinator
+        case other => throw new AssertionError(s"Expected a BrokerServer for coordinator $coordinatorId, but got $other")
+      }
+      txnCoordinator.transactionManager.getTransactionState(transactionalId) match {
+        case Right(Some(epochAndMetadata)) => epochAndMetadata.transactionMetadata
+        case other => throw new AssertionError(s"Expected transaction state for $transactionalId, but got $other")
+      }
+    }
+
+    // Create a transaction, produce one record, and abort
+    val initialProducer = createTransactionalProducer(transactionalId, transactionTimeoutMs = 500)
+    initialProducer.initTransactions()
+    initialProducer.beginTransaction()
+    initialProducer.send(abortedRecord)
+    initialProducer.abortTransaction()
+    initialProducer.close()
+
+    // Set the epoch to Short.MaxValue - 2 so that the epoch bump performed by the next
+    // initTransactions() brings it to Short.MaxValue - 1 before the timeout.
+    val adminClient = createAdminClient()
+    try {
+      val txnMetadata = coordinatorTxnMetadata(adminClient)
+      txnMetadata.inLock(() => {
+        txnMetadata.setProducerEpoch((Short.MaxValue - 2).toShort)
+        null // inLock expects a Supplier that returns a value
+      })
+    } finally {
+      adminClient.close()
+    }
+
+    // Re-initialize the producer which will bump epoch
+    val producer = createTransactionalProducer(transactionalId, transactionTimeoutMs = 500)
+    producer.initTransactions()
+
+    // Start a transaction
+    producer.beginTransaction()
+    // Produce one record and wait for it to complete
+    producer.send(abortedRecord).get()
+    producer.flush()
+
+    val adminClient2 = createAdminClient()
+    try {
+      // Check and assert that epoch of the transaction is Short.MaxValue - 1 (before timeout)
+      val currentEpoch = coordinatorTxnMetadata(adminClient2).producerEpoch()
+      assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
+        s"Expected epoch to be ${Short.MaxValue - 1}, but got $currentEpoch")
+
+      // Wait until state is complete abort
+      waitUntilTrue(() => {
+        val listResult = adminClient2.listTransactions()
+        val txns = listResult.all().get().asScala
+        txns.exists(txn =>
+          txn.transactionalId() == transactionalId &&
+            txn.state() == TransactionState.COMPLETE_ABORT
+        )
+      }, "Transaction was not aborted on timeout")
+    } finally {
+      adminClient2.close()
+    }
+
+    // Abort, this should be treated as retry of the abort caused by timeout
+    producer.abortTransaction()
+
+    // Start a transaction, it would use the state from abort
+    producer.beginTransaction()
+    // Produce one record and wait for it to complete
+    producer.send(abortedRecord).get()
+    producer.flush()
+
+    // Now init new producer and commit a transaction with a distinct value
+    val producer2 = createTransactionalProducer(transactionalId, transactionTimeoutMs = 500)
+    producer2.initTransactions()
+    producer2.beginTransaction()
+    val committedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 0, "key".getBytes, "committed".getBytes)
+    producer2.send(committedRecord).get()
+    producer2.commitTransaction()
+
+    // Verify that exactly one record is visible in read-committed mode
+    val consumer = createReadCommittedConsumer("test-consumer-group")
+    try {
+      val tp = new TopicPartition(topic1, 0)
+      consumer.assign(java.util.Set.of(tp))
+      val records = consumeRecords(consumer, 1)
+
+      val record = records.head
+      assertArrayEquals("key".getBytes, record.key, "Record key should match")
+      assertArrayEquals("committed".getBytes, record.value, "Record value should be 'committed'")
+      assertEquals(0, record.partition, "Record should be in partition 0")
+      assertEquals(topic1, record.topic, "Record should be in topic1")
+    } finally {
+      consumer.close()
+    }
+  }
+
   @ParameterizedTest
   @CsvSource(Array(
     "classic,false",
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
index 87a18b18dc09b..4914d5e135775 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
@@ -497,7 +497,14 @@ class TransactionMetadataTest {
       time.milliseconds(),
       TV_0)
     assertTrue(txnMetadata.isProducerEpochExhausted)
-    assertThrows(classOf[IllegalStateException], () => 
txnMetadata.prepareFenceProducerEpoch()) + + // When epoch is at max, prepareFenceProducerEpoch logs an error but doesn't throw + // This allows graceful recovery through producer ID rotation + val preparedMetadata = txnMetadata.prepareFenceProducerEpoch() + + // Epoch should remain at Short.MaxValue (not overflow to negative) + assertEquals(Short.MaxValue, preparedMetadata.producerEpoch) + assertEquals(TransactionState.PREPARE_EPOCH_FENCE, preparedMetadata.txnState) } @Test diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java index 687d3a5522729..94e4558feb3a6 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java @@ -127,9 +127,11 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact // In both cases, the transaction has already ended (currentTxnFirstOffset is empty). // We suppress the InvalidProducerEpochException and allow the duplicate marker to // be written to the log. + // In some buggy scenarios we may start transaction with MAX_VALUE. We allow + // code to gracefully recover from that. if (transactionVersion >= 2 && producerEpoch == current && - updatedEntry.currentTxnFirstOffset().isEmpty()) { + (updatedEntry.currentTxnFirstOffset().isEmpty() || producerEpoch == Short.MAX_VALUE)) { log.info("Idempotent transaction marker retry detected for producer {} epoch {}. 
" + "Transaction already completed, allowing duplicate marker write.", producerId, producerEpoch); diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java index 8efeedc3ec43d..c2703a8539def 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java @@ -139,11 +139,12 @@ public TxnTransitMetadata prepareNoTransit() { public TxnTransitMetadata prepareFenceProducerEpoch() { if (producerEpoch == Short.MAX_VALUE) - throw new IllegalStateException("Cannot fence producer with epoch equal to Short.MaxValue since this would overflow"); + LOGGER.error("Fencing producer {} {} with epoch equal to Short.MaxValue, this must not happen unless there is a bug", transactionalId, producerId); // If we've already failed to fence an epoch (because the write to the log failed), we don't increase it again. // This is safe because we never return the epoch to client if we fail to fence the epoch - short bumpedEpoch = hasFailedEpochFence ? producerEpoch : (short) (producerEpoch + 1); + // Also don't increase if producerEpoch is already at max, to avoid overflow. + short bumpedEpoch = hasFailedEpochFence || producerEpoch == Short.MAX_VALUE ? producerEpoch : (short) (producerEpoch + 1); TransitionData data = new TransitionData(TransactionState.PREPARE_EPOCH_FENCE); data.producerEpoch = bumpedEpoch; @@ -238,8 +239,14 @@ public TxnTransitMetadata prepareAbortOrCommit(TransactionState newState, boolean noPartitionAdded) { TransitionData data = new TransitionData(newState); if (clientTransactionVersion.supportsEpochBump()) { - // We already ensured that we do not overflow here. MAX_SHORT is the highest possible value. 
- data.producerEpoch = (short) (producerEpoch + 1); + if (producerEpoch == Short.MAX_VALUE && newState == TransactionState.PREPARE_ABORT) { + // If we're already in a broken state, we let the abort go through without + // epoch overflow, so that we can recover and continue. + LOGGER.error("Aborting producer {} {} with epoch equal to Short.MaxValue, this must not happen unless there is a bug", transactionalId, producerId); + } else { + // We already ensured that we do not overflow here. MAX_SHORT is the highest possible value. + data.producerEpoch = (short) (producerEpoch + 1); + } data.lastProducerEpoch = producerEpoch; } else { data.producerEpoch = producerEpoch;