Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,

// Maps a transactional id onto its __transaction_state partition; the state
// manager owns the hashing/partitioning logic, so we simply delegate to it.
def partitionFor(transactionalId: String): Int = {
  txnManager.partitionFor(transactionalId)
}

// Package-private for testing: exposes the underlying TransactionStateManager so
// integration tests can inspect and mutate transaction metadata directly.
// Not intended for production callers.
private[kafka] def transactionManager: TransactionStateManager = txnManager
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make txnManager private[kafka] instead of adding another getter

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example:

class TransactionCoordinator(txnConfig: TransactionConfig,
                             scheduler: Scheduler,
                             createProducerIdManager: () => ProducerIdManager,
                             private[kafka] val txnManager: TransactionStateManager, // <-- here
                             txnMarkerChannelManager: TransactionMarkerChannelManager,
                             time: Time,
                             logContext: LogContext)


private def onEndTransactionComplete(txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch)(error: Errors, newProducerId: Long, newProducerEpoch: Short): Unit = {
error match {
case Errors.NONE =>
Expand Down
117 changes: 117 additions & 0 deletions core/src/test/scala/integration/kafka/api/TransactionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka.api

import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.TransactionState
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
Expand Down Expand Up @@ -695,6 +696,122 @@ class TransactionsTest extends IntegrationTestHarness {
assertThrows(classOf[IllegalStateException], () => producer.initTransactions())
}

// Regression test for KAFKA-20090: verifies the coordinator can recover when the
// producer epoch reaches Short.MaxValue - 1 and the transaction is then aborted
// on timeout, rather than getting permanently stuck.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
def testRecoveryFromEpochOverflow(groupProtocol: String): Unit = {
  // We could encounter a bug (see https://issues.apache.org/jira/browse/KAFKA-20090)
  // that only reproduces when epoch gets to Short.MaxValue - 1 and transaction is
  // aborted on timeout.
  val transactionalId = "test-overflow"
  // Short transaction timeout (500 ms) so the coordinator aborts the in-flight
  // transaction quickly later in the test.
  var producer = createTransactionalProducer(transactionalId, transactionTimeoutMs = 500)
  val abortedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 0, "key".getBytes, "aborted".getBytes)

  // Create a transaction, produce one record, and abort
  producer.initTransactions()
  producer.beginTransaction()
  producer.send(abortedRecord)
  producer.abortTransaction()
  producer.close()

  // Find the transaction coordinator partition for this transactional ID
  val adminClient = createAdminClient()
  try {
    val txnDescription = adminClient.describeTransactions(java.util.List.of(transactionalId))
      .description(transactionalId).get()
    val coordinatorId = txnDescription.coordinatorId()

    // Access the transaction coordinator and update the epoch to Short.MaxValue - 2
    val coordinatorBroker = brokers.find(_.config.brokerId == coordinatorId).get
    val txnCoordinator = coordinatorBroker.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator

    // Get the transaction metadata and update the epoch close to Short.MaxValue
    // to trigger the overflow scenario. We'll set it high enough that subsequent
    // operations will cause it to reach Short.MaxValue - 1 before the timeout.
    // NOTE(review): if getTransactionState yields an error or no metadata, these
    // foreach bodies are silently skipped and the epoch is never bumped; consider
    // asserting the metadata is present so a lookup failure fails fast.
    txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach { txnMetadataOpt =>
      txnMetadataOpt.foreach { epochAndMetadata =>
        // Mutate under the metadata lock to stay consistent with coordinator access.
        epochAndMetadata.transactionMetadata.inLock(() => {
          epochAndMetadata.transactionMetadata.setProducerEpoch((Short.MaxValue - 2).toShort)
          null // inLock expects a Supplier that returns a value
        })
      }
    }
  } finally {
    adminClient.close()
  }

  // Re-initialize the producer which will bump epoch
  producer = createTransactionalProducer(transactionalId, transactionTimeoutMs = 500)
  producer.initTransactions()

  // Start a transaction
  producer.beginTransaction()
  // Produce one record and wait for it to complete
  producer.send(abortedRecord).get()
  producer.flush()

  // Check and assert that epoch of the transaction is Short.MaxValue - 1 (before timeout)
  // The coordinator may have moved since the first lookup, so resolve it again.
  val adminClient2 = createAdminClient()
  try {
    val coordinatorId2 = adminClient2.describeTransactions(java.util.List.of(transactionalId))
      .description(transactionalId).get().coordinatorId()
    val coordinatorBroker2 = brokers.find(_.config.brokerId == coordinatorId2).get
    val txnCoordinator2 = coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator

    txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach { txnMetadataOpt =>
      txnMetadataOpt.foreach { epochAndMetadata =>
        val currentEpoch = epochAndMetadata.transactionMetadata.producerEpoch()
        assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
          s"Expected epoch to be ${Short.MaxValue - 1}, but got $currentEpoch")
      }
    }

    // Wait until state is complete abort
    // (the 500 ms transaction timeout should trigger the coordinator-side abort).
    waitUntilTrue(() => {
      val listResult = adminClient2.listTransactions()
      val txns = listResult.all().get().asScala
      txns.exists(txn =>
        txn.transactionalId() == transactionalId &&
        txn.state() == TransactionState.COMPLETE_ABORT
      )
    }, "Transaction was not aborted on timeout")
  } finally {
    adminClient2.close()
  }

  // Abort, this should be treated as retry of the abort caused by timeout
  producer.abortTransaction()

  // Start a transaction, it would use the state from abort
  producer.beginTransaction()
  // Produce one record and wait for it to complete
  producer.send(abortedRecord).get()
  producer.flush()

  // Now init new producer and commit a transaction with a distinct value
  // (initTransactions fences the previous producer, aborting its open transaction).
  val producer2 = createTransactionalProducer(transactionalId, transactionTimeoutMs = 500)
  producer2.initTransactions()
  producer2.beginTransaction()
  val committedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 0, "key".getBytes, "committed".getBytes)
  producer2.send(committedRecord).get()
  producer2.commitTransaction()

  // Verify that exactly one record is visible in read-committed mode
  // (all "aborted" records must be filtered out; only "committed" survives).
  val consumer = createReadCommittedConsumer("test-consumer-group")
  try {
    val tp = new TopicPartition(topic1, 0)
    consumer.assign(java.util.Set.of(tp))
    val records = consumeRecords(consumer, 1)

    val record = records.head
    assertArrayEquals("key".getBytes, record.key, "Record key should match")
    assertArrayEquals("committed".getBytes, record.value, "Record value should be 'committed'")
    assertEquals(0, record.partition, "Record should be in partition 0")
    assertEquals(topic1, record.topic, "Record should be in topic1")
  } finally {
    consumer.close()
  }
}

@ParameterizedTest
@CsvSource(Array(
"classic,false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,14 @@ class TransactionMetadataTest {
time.milliseconds(),
TV_0)
assertTrue(txnMetadata.isProducerEpochExhausted)
assertThrows(classOf[IllegalStateException], () => txnMetadata.prepareFenceProducerEpoch())

// When epoch is at max, prepareFenceProducerEpoch logs an error but doesn't throw
// This allows graceful recovery through producer ID rotation
val preparedMetadata = txnMetadata.prepareFenceProducerEpoch()

// Epoch should remain at Short.MaxValue (not overflow to negative)
assertEquals(Short.MaxValue, preparedMetadata.producerEpoch)
assertEquals(TransactionState.PREPARE_EPOCH_FENCE, preparedMetadata.txnState)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,11 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact
// In both cases, the transaction has already ended (currentTxnFirstOffset is empty).
// We suppress the InvalidProducerEpochException and allow the duplicate marker to
// be written to the log.
// In some buggy scenarios we may start transaction with MAX_VALUE. We allow
// code to gracefully recover from that.
if (transactionVersion >= 2 &&
producerEpoch == current &&
updatedEntry.currentTxnFirstOffset().isEmpty()) {
(updatedEntry.currentTxnFirstOffset().isEmpty() || producerEpoch == Short.MAX_VALUE)) {
log.info("Idempotent transaction marker retry detected for producer {} epoch {}. " +
"Transaction already completed, allowing duplicate marker write.",
producerId, producerEpoch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,12 @@ public TxnTransitMetadata prepareNoTransit() {

public TxnTransitMetadata prepareFenceProducerEpoch() {
if (producerEpoch == Short.MAX_VALUE)
throw new IllegalStateException("Cannot fence producer with epoch equal to Short.MaxValue since this would overflow");
LOGGER.error("Fencing producer {} {} with epoch equal to Short.MaxValue, this must not happen unless there is a bug", transactionalId, producerId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What action do we expect users to take when they see this message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message is there so that there is trail that this scenario happened. It would help to RCA weird cases.


// If we've already failed to fence an epoch (because the write to the log failed), we don't increase it again.
// This is safe because we never return the epoch to client if we fail to fence the epoch
short bumpedEpoch = hasFailedEpochFence ? producerEpoch : (short) (producerEpoch + 1);
// Also don't increase if producerEpoch is already at max, to avoid overflow.
short bumpedEpoch = hasFailedEpochFence || producerEpoch == Short.MAX_VALUE ? producerEpoch : (short) (producerEpoch + 1);
Comment on lines +146 to +147
Copy link
Contributor

@chickenchickenlove chickenchickenlove Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch!
I have an opinion here!
Please take a look.

In my view, this code seems to affect only TV1. Were you intending to make a change specifically for TV1? If not, this may end up being an unintended change.

AFAIK, Both TV1 and TV2 call this code line.
However, it seems that TV2 doesn't actually use the epoch generated here.

Even when TV2 is fenced, it actually uses the epoch obtained from txnMetadata.prepareAbortOrCommit().

https://github.com/apache/kafka/blob/trunk/core%2Fsrc%2Fmain%2Fscala%2Fkafka%2Fcoordinator%2Ftransaction%2FTransactionCoordinator.scala#L831-L838

Therefore, I believe any change here would affect only TV1...!

What do you think?
If I'm wrong, sorry for making you confused!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct -- the bumped epoch is then ignored in TV2 code path. But if we don't make changes here, the TV2 code path wouldn't get through.

Copy link
Contributor

@chickenchickenlove chickenchickenlove Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comments!!! 🙇‍♂️
Sorry to bother you!
I have two more questions!

###1.
Sorry for my poor understanding...🙇‍♂️
I’m not fully sure which part of the TV2 code path would fail to proceed if we don’t make changes here. Could you please share a bit more detail or point me to where it gets blocked?

###2.
This change in the code/PR seems to introduce a new path where TV1 could potentially reach an epoch value of 32767.
If TV1 has never experienced this issue in practice, would it make sense to handle TV1 and TV2 differently—for example, similar to how prepareAbortOrCommit() checks supportEpochBump()?

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the question 1 there are a couple points:
1a. For TV2, when a transaction is aborted on timeout or on InitProducerId, it would first call prepareFenceProducerEpoch. If an exception is thrown here, the function wouldn't proceed any further.
2a. The goal of this change is not to fix a specific bug that could lead to max value epoch, specific bugs should get fixed correspondingly. The goal of this change is to provide a path to recovery from the state that is made by a bug. So even though there are no known bugs with TV1 that could result in this state, the "way out" should be present.
For the question 2 -- the path was already present for TV1 I believe, when fencing happens the epoch is first bumped and can be max value. The logic is already handled differently (TV1 handling even has its own function).


TransitionData data = new TransitionData(TransactionState.PREPARE_EPOCH_FENCE);
data.producerEpoch = bumpedEpoch;
Expand Down Expand Up @@ -238,8 +239,14 @@ public TxnTransitMetadata prepareAbortOrCommit(TransactionState newState,
boolean noPartitionAdded) {
TransitionData data = new TransitionData(newState);
if (clientTransactionVersion.supportsEpochBump()) {
// We already ensured that we do not overflow here. MAX_SHORT is the highest possible value.
data.producerEpoch = (short) (producerEpoch + 1);
if (producerEpoch == Short.MAX_VALUE && newState == TransactionState.PREPARE_ABORT) {
// If we're already in a broken state, we let the abort go through without
// epoch overflow, so that we can recover and continue.
LOGGER.error("Aborting producer {} {} with epoch equal to Short.MaxValue, this must not happen unless there is a bug", transactionalId, producerId);
} else {
// We already ensured that we do not overflow here. MAX_SHORT is the highest possible value.
data.producerEpoch = (short) (producerEpoch + 1);
}
data.lastProducerEpoch = producerEpoch;
} else {
data.producerEpoch = producerEpoch;
Expand Down