Skip to content

Commit

Permalink
GH-2981: Suppress abortTransaction in KafkaTemplate
Browse files Browse the repository at this point in the history
Fixes: #2981

When `producer.abortTransaction()` fails, the original exception is lost

* Catch an exception on `producer.abortTransaction()` and `ex.addSuppressed(abortException)`

**Cherry-pick to `3.0.x`**
  • Loading branch information
artembilan committed Jan 11, 2024
1 parent c683f31 commit bd5b131
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2023 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -657,9 +657,14 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
catch (SkipAbortException e) { // NOSONAR - exception flow control
throw ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace
}
catch (Exception e) {
producer.abortTransaction();
throw e;
catch (Exception ex) {
try {
producer.abortTransaction();
}
catch (Exception abortException) {
ex.addSuppressed(abortException);
}
throw ex;
}
finally {
this.producers.remove(currentThread);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -506,6 +506,31 @@ public void testAbort() {
verify(producer, never()).commitTransaction();
}

@Test
public void abortFiledOriginalExceptionRethrown() {
MockProducer<String, String> producer = spy(new MockProducer<>());
producer.initTransactions();
producer.abortTransactionException = new RuntimeException("abort failed");

ProducerFactory<String, String> pf = new MockProducerFactory<>((tx, id) -> producer, null);

KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);

assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() ->
template.executeInTransaction(t -> {
throw new RuntimeException("intentional");
}))
.withMessage("intentional")
.withStackTraceContaining("abort failed");

assertThat(producer.transactionCommitted()).isFalse();
assertThat(producer.transactionAborted()).isFalse();
assertThat(producer.closed()).isTrue();
verify(producer, never()).commitTransaction();
}

@Test
public void testExecuteInTransactionNewInnerTx() {
@SuppressWarnings("unchecked")
Expand Down

0 comments on commit bd5b131

Please sign in to comment.