
Update Transactional producer to translate retriable into abortable exceptions #3


Open · wants to merge 2 commits into base: trunk
@@ -37,6 +37,7 @@
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortableException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -1073,6 +1074,11 @@ private void transitionTo(State target, RuntimeException error) {
} else if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
if (error == null)
throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");

if (error instanceof RetriableException) {
error = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", error);
}
Comment on lines +1078 to +1080
💡 Verification agent

🧩 Analysis chain

Verify test coverage for exception conversion

The wrapping of RetriableException in TransactionAbortableException is a behavior change that should be thoroughly tested. Ensure test cases cover scenarios where retries are exhausted and confirm the exception type received by the application layer.


🏁 Script executed:

#!/bin/bash
# Searching for tests covering the TransactionAbortableException scenario

echo "Checking for tests that verify exception conversion:"
rg -l "TransactionAbortableException.*RetriableException|RetriableException.*TransactionAbortableException" --type java clients/src/test/

echo -e "\nChecking for tests with 'exhausted' retries:"
rg -l "exhausted.*retries|retries.*exhausted" --type java clients/src/test/

Length of output: 525


Add test for exception conversion in TransactionManager

The new wrapping logic in TransactionManager (clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:1078‑1080) is a behavior change and needs explicit coverage. Although SenderTest.java already checks for “exhausted retries” (clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java), it does not verify that a RetriableException is wrapped as a TransactionAbortableException.

Please add or update a unit test (e.g. in a new TransactionManagerTest) to:

  • Simulate a failed transactional request that throws a RetriableException on the final retry.
  • Assert that the thrown exception is a TransactionAbortableException.
  • Verify that the original RetriableException is preserved as the cause.
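Until that coverage exists, the contract in question can be sketched in isolation. The sketch below is framework-free: both exception classes are local stand-ins (hypothetical, not the real `org.apache.kafka.common.errors` types), and `toAbortable` mirrors only the wrapping branch added in `transitionTo`, not the full state machine:

```java
// Hypothetical stand-ins for the Kafka exception types, so this sketch
// compiles without the Kafka client on the classpath.
class RetriableException extends RuntimeException {
    RetriableException(String message) { super(message); }
}

class TransactionAbortableException extends RuntimeException {
    TransactionAbortableException(String message, Throwable cause) { super(message, cause); }
}

public class WrapOnRetryExhaustionSketch {
    // Mirrors the branch under review: once retries are exhausted, a
    // retriable error is wrapped as an abortable one; other errors pass through.
    public static RuntimeException toAbortable(RuntimeException error) {
        if (error instanceof RetriableException) {
            return new TransactionAbortableException(
                "Transaction Request was aborted after exhausting retries.", error);
        }
        return error;
    }

    public static void main(String[] args) {
        RetriableException original = new RetriableException("COORDINATOR_LOAD_IN_PROGRESS");
        RuntimeException seen = toAbortable(original);

        // The application layer sees an abortable exception...
        if (!(seen instanceof TransactionAbortableException))
            throw new AssertionError("expected TransactionAbortableException, got " + seen.getClass());
        // ...with the original retriable error preserved as the cause.
        if (seen.getCause() != original)
            throw new AssertionError("original retriable error should be preserved as the cause");
        System.out.println("wrapped: " + seen.getMessage());
    }
}
```

A real TransactionManagerTest would exercise this through the Sender/MockClient harness rather than calling the branch directly; this only pins down the wrap-and-preserve-cause behavior the reviewer asks to assert.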


lastError = error;
} else {
lastError = null;
@@ -17,6 +17,13 @@
package org.apache.kafka.common.errors;

public class TransactionAbortableException extends ApiException {

private static final long serialVersionUID = 1L;

public TransactionAbortableException(String message, Throwable cause) {
super(message, cause);
}

public TransactionAbortableException(String message) {
super(message);
}
@@ -186,7 +186,7 @@ private static Map<TopicPartition, MemoryRecords> partitionRecords(ProduceReques
}));
return Collections.unmodifiableMap(partitionRecords);
}

@Test
public void testSimple() throws Exception {
long offset = 0;
@@ -3001,7 +3001,61 @@ public void testCustomErrorMessage() throws Exception {
}

@Test
public void testSenderShouldRetryWithBackoffOnRetriableError() {
public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TransactionManager txnManager = new TransactionManager(
logContext,
"testRetriableException",
60000,
RETRY_BACKOFF_MS,
apiVersions
);

// Setup with transaction state and initialize transactions with single retry
setupWithTransactionState(txnManager, false, null, 1);
doInitTransactions(txnManager, producerIdAndEpoch);

// Begin transaction and add partition
txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();

// First produce request
appendToAccumulator(tp0);
client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
sender.runOnce();

// Sleep for retry backoff
time.sleep(RETRY_BACKOFF_MS);

// Second attempt to process record - PREPARE the response before sending
client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
sender.runOnce();

// Now transaction should be in abortable state after retry is exhausted
assertTrue(txnManager.hasAbortableError());

// Second produce request - should fail with TransactionAbortableException
Future<RecordMetadata> future2 = appendToAccumulator(tp0);
client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
// Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state
sender.runOnce();
assertFutureFailure(future2, TransactionAbortableException.class);


// Verify transaction API requests also fail with TransactionAbortableException
try {
txnManager.beginCommit();
fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
} catch (KafkaException e) {
assertEquals(TransactionAbortableException.class, e.getCause().getClass());
}

}
Comment on lines +3004 to +3055
💡 Verification agent

🧩 Analysis chain

Harden the new abort‑on‑retry‑exhaustion test

The test does the job functionally, but a few tweaks will make it more deterministic and easier to maintain:

@@
-    // No timeout guard – a hang here stalls the whole suite
+    @Timeout(10)   // keep the suite moving even if the sender hangs

@@
-        // Sleep for retry backoff
-        time.sleep(RETRY_BACKOFF_MS);
+        // Advance mock time instead of real sleeping (keeps the test instant)
+        time.sleep(RETRY_BACKOFF_MS);
@@
-        // Second produce request - should fail with TransactionAbortableException
-        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
-        client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
-        // Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state
-        sender.runOnce();
-        assertFutureFailure(future2, TransactionAbortableException.class);
+        // Second produce request – should be failed immediately
+        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
+        sender.runOnce(); // no response required – batch is aborted locally
+        assertFutureFailure(future2, TransactionAbortableException.class);
@@
-        try {
-            txnManager.beginCommit();
-            fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
-        } catch (KafkaException e) {
-            assertEquals(TransactionAbortableException.class, e.getCause().getClass());
-        }
+        assertThrows(TransactionAbortableException.class, txnManager::beginCommit,
+            "beginCommit() should fail once the manager is in the abortable state");

Rationale:

  1. @Timeout prevents the build hanging if the state machine changes.
  2. Advancing the same MockTime is already correct; explicitly noting this avoids confusion about real sleeps.
  3. client.prepareResponse(... Errors.NONE ...) is never consumed (no request is sent once the manager is abortable) and may leave a stray response in the mock queue—better to drop it.
  4. Use assertThrows instead of try/catch for readability and to avoid accidental swallowing of the wrong exception.

To confirm JUnit versions and existing imports before adjusting annotations or assertions, let’s inspect the top of SenderTest.java and search for existing @Test, assertThrows, and any usage of @Timeout across the repo.


🏁 Script executed:

#!/bin/bash
# Show the first 200 lines of SenderTest.java to inspect imports and annotations
head -n 200 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

# Find all @Test annotations in this file (to see JUnit4 vs JUnit5)
rg "@Test" -n clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java || true

# Check for any assertThrows usage in this test
rg "assertThrows" -n clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java || true

# Scan the repository for @Timeout (to see if JUnit5’s Timeout is already used elsewhere)
rg "@Timeout" -n .

Length of output: 49834


Harden testSenderShouldTransitionToAbortableAfterRetriesExhausted

Apply a timeout guard, clarify mock‐time usage, drop the unused mock response, and switch to assertThrows for readability:

@@ clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
-    @Test
+    @Timeout(10L)   // prevent suite hang if state machine changes
+    @Test
     public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException {
@@
-        // Sleep for retry backoff
-        time.sleep(RETRY_BACKOFF_MS);
+        // Advance mock time for retry backoff (instant)
+        time.sleep(RETRY_BACKOFF_MS);
@@
-        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
-        client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
-        // Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state
-        sender.runOnce();
-        assertFutureFailure(future2, TransactionAbortableException.class);
+        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
+        // Batch is aborted locally in abortable state; no mock response needed
+        sender.runOnce();
+        assertFutureFailure(future2, TransactionAbortableException.class);
@@
-        try {
-            txnManager.beginCommit();
-            fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
-        } catch (KafkaException e) {
-            assertEquals(TransactionAbortableException.class, e.getCause().getClass());
-        }
+        assertThrows(TransactionAbortableException.class, txnManager::beginCommit,
+            "beginCommit() should fail once the manager is in the abortable state");

Rationale:

  • @Timeout guards against potential hangs.
  • The comment now reflects that MockTime.sleep is instantaneous.
  • Removing the unused prepareResponse prevents stray responses in the mock queue.
  • assertThrows improves clarity and avoids silencing unexpected exceptions.
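The last point generalizes beyond JUnit. As a dependency-free illustration (the `assertThrows` below is a simplified local stand-in, not the real `org.junit.jupiter.api.Assertions.assertThrows`), a type-checked throw assertion fails loudly both when nothing is thrown and when the wrong type is thrown, which a broad `catch (KafkaException e)` block cannot do:

```java
public class AssertThrowsSketch {
    public interface Executable { void execute() throws Throwable; }

    // Simplified stand-in for JUnit 5's assertThrows: fails if nothing is
    // thrown, and surfaces (rather than swallowing) a wrong exception type.
    public static <T extends Throwable> T assertThrows(Class<T> expected, Executable body) {
        try {
            body.execute();
        } catch (Throwable t) {
            if (expected.isInstance(t)) return expected.cast(t);
            throw new AssertionError("expected " + expected.getSimpleName()
                + " but got " + t.getClass().getSimpleName(), t);
        }
        throw new AssertionError("expected " + expected.getSimpleName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        // Passes: the body throws the expected type, and we get it back for
        // further assertions (e.g. on the cause).
        IllegalStateException e = assertThrows(IllegalStateException.class,
            () -> { throw new IllegalStateException("abortable state"); });
        System.out.println("caught: " + e.getMessage());

        // A try/catch on a broad supertype would silently accept any subtype;
        // this helper surfaces the mismatch instead.
        try {
            assertThrows(IllegalStateException.class,
                () -> { throw new IllegalArgumentException("wrong type"); });
        } catch (AssertionError expectedFailure) {
            System.out.println("mismatch surfaced: " + expectedFailure.getMessage());
        }
    }
}
```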
📝 Committable suggestion


Suggested change
@Timeout(10L) // prevent suite hang if state machine changes
@Test
public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TransactionManager txnManager = new TransactionManager(
logContext,
"testRetriableException",
60000,
RETRY_BACKOFF_MS,
apiVersions
);
// Setup with transaction state and initialize transactions with single retry
setupWithTransactionState(txnManager, false, null, 1);
doInitTransactions(txnManager, producerIdAndEpoch);
// Begin transaction and add partition
txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();
// First produce request
appendToAccumulator(tp0);
client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
sender.runOnce();
// Advance mock time for retry backoff (instant)
time.sleep(RETRY_BACKOFF_MS);
// Second attempt to process record - PREPARE the response before sending
client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
sender.runOnce();
// Now transaction should be in abortable state after retry is exhausted
assertTrue(txnManager.hasAbortableError());
Future<RecordMetadata> future2 = appendToAccumulator(tp0);
// Batch is aborted locally in abortable state; no mock response needed
sender.runOnce();
assertFutureFailure(future2, TransactionAbortableException.class);
assertThrows(TransactionAbortableException.class, txnManager::beginCommit,
"beginCommit() should fail once the manager is in the abortable state");
}


@Test
public void testSenderShouldRetryWithBackoffOnRetriableError() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = createTransactionManager();
setupWithTransactionState(transactionManager);
@@ -3635,6 +3689,10 @@ private void setupWithTransactionState(TransactionManager transactionManager, bo
setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0);
}

private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, int retries) {
setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, retries, 0);
}

private void setupWithTransactionState(
TransactionManager transactionManager,
boolean guaranteeOrder,