Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortableException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
Expand Down Expand Up @@ -771,6 +773,15 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
|| exception instanceof InvalidPidMappingException) {
transitionToFatalError(exception);
} else if (isTransactional()) {
// RetriableExceptions from the Sender thread are converted to Abortable errors
// because they indicate that the transaction cannot be completed after all retry attempts.
// This conversion ensures the application layer treats these errors as abortable,
// preventing duplicate message delivery.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not a change we need now, but it isn't totally clear from the method name that this should only be called from the sender thread. Maybe we should refactor this in the future.

Copy link
Contributor Author

@k-raina k-raina Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this confused me too.
Thanks for the review.

if (exception instanceof RetriableException ||
exception instanceof InvalidTxnStateException) {
exception = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", exception);
}

if (needToTriggerEpochBumpFromClient() && !isCompleting()) {
clientSideEpochBumpRequired = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
package org.apache.kafka.common.errors;

public class TransactionAbortableException extends ApiException {

private static final long serialVersionUID = 1L;

public TransactionAbortableException(String message, Throwable cause) {
super(message, cause);
}

public TransactionAbortableException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.InOrder;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -205,7 +207,7 @@ private static Map<TopicPartition, MemoryRecords> partitionRecords(ProduceReques
}));
return Collections.unmodifiableMap(partitionRecords);
}

@Test
public void testSimple() throws Exception {
long offset = 0;
Expand Down Expand Up @@ -3031,8 +3033,62 @@ public void testCustomErrorMessage() throws Exception {
verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1, errorMessage), errorMessage);
}

/**
 * Verifies that a produce error which is still returned after the single configured retry
 * leaves the transaction manager in an abortable error state, and that later sends and
 * {@code beginCommit()} then fail with {@link TransactionAbortableException}.
 *
 * <p>Runs once for each of {@code COORDINATOR_LOAD_IN_PROGRESS} and {@code INVALID_TXN_STATE}.
 *
 * @param error the error code returned in the produce response for both send attempts
 */
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"COORDINATOR_LOAD_IN_PROGRESS", "INVALID_TXN_STATE"})
public void testTransactionShouldTransitionToAbortableForSenderAPI(Errors error) throws InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TransactionManager txnManager = new TransactionManager(
logContext,
"testRetriableException",
60000,
RETRY_BACKOFF_MS,
apiVersions,
false
);

// Set up the sender with retries = 1, so the second failed attempt below exhausts the retry budget,
// then initialize transactions.
setupWithTransactionState(txnManager, false, null, 1);
doInitTransactions(txnManager, producerIdAndEpoch);

// Begin a transaction and register tp0 with it; the AddPartitionsToTxn response reports no error.
txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();

// First produce attempt: the queued response carries the parameterized error.
appendToAccumulator(tp0);
client.prepareResponse(produceResponse(tp0, -1, error, -1));
sender.runOnce();

// Sleep for the retry backoff so the record can be attempted again.
time.sleep(RETRY_BACKOFF_MS);

// Second attempt for the same record: queue the same error response before running the sender.
client.prepareResponse(produceResponse(tp0, -1, error, -1));
sender.runOnce();

// Now transaction should be in abortable state after retry is exhausted
assertTrue(txnManager.hasAbortableError());

// A new record appended while in the abortable state should fail with TransactionAbortableException,
// not with the parameterized broker error, even though a successful (Errors.NONE) response is queued.
Future<RecordMetadata> future2 = appendToAccumulator(tp0);
client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
sender.runOnce();
assertFutureFailure(future2, TransactionAbortableException.class);

// Transaction API calls should also fail: beginCommit() is expected to throw a KafkaException
// whose cause is a TransactionAbortableException.
try {
txnManager.beginCommit();
fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
} catch (KafkaException e) {
assertEquals(TransactionAbortableException.class, e.getCause().getClass());
}
}

@Test
public void testSenderShouldRetryWithBackoffOnRetriableError() {
public void testSenderShouldRetryWithBackoffOnRetriableError() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = createTransactionManager();
setupWithTransactionState(transactionManager);
Expand Down Expand Up @@ -3674,6 +3730,10 @@ private void setupWithTransactionState(TransactionManager transactionManager, bo
setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0);
}

/**
 * Convenience overload that lets a test choose the retry count.
 *
 * <p>Delegates to the six-argument overload, forwarding the given arguments and passing
 * {@code true} and {@code 0} for the two arguments this overload does not expose.
 *
 * @param transactionManager the transaction manager to set the test up with
 * @param guaranteeOrder forwarded unchanged to the six-argument overload
 * @param customPool forwarded unchanged to the six-argument overload; callers in this file may pass {@code null}
 * @param retries the retry count forwarded to the six-argument overload
 */
private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, int retries) {
setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, retries, 0);
}

private void setupWithTransactionState(
TransactionManager transactionManager,
boolean guaranteeOrder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2958,7 +2958,7 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
"Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
runUntil(commitResult::isCompleted); // the commit shouldn't be completed without being sent since the produce request failed.
assertFalse(commitResult.isSuccessful()); // the commit shouldn't succeed since the produce request failed.
assertThrows(TimeoutException.class, commitResult::await);
assertInstanceOf(TimeoutException.class, assertThrows(TransactionAbortableException.class, commitResult::await).getCause());

assertTrue(transactionManager.hasAbortableError());
assertTrue(transactionManager.hasOngoingTransaction());
Expand Down