Skip to content

Commit 8c494e5

Browse files
committed
Defer triggerAfterCompletion invocation in doRollbackOnCommitException
Closes gh-34595 (cherry picked from commit cc986cd)
1 parent 62d7396 commit 8c494e5

File tree

2 files changed

+35
-12
lines changed

2 files changed

+35
-12
lines changed

spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerTests.java

+27
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.r2dbc.spi.ConnectionFactory;
2424
import io.r2dbc.spi.IsolationLevel;
2525
import io.r2dbc.spi.R2dbcBadGrammarException;
26+
import io.r2dbc.spi.R2dbcTransientResourceException;
2627
import io.r2dbc.spi.Statement;
2728
import org.junit.jupiter.api.BeforeEach;
2829
import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@
3233
import reactor.core.publisher.Mono;
3334
import reactor.test.StepVerifier;
3435

36+
import org.springframework.dao.TransientDataAccessResourceException;
3537
import org.springframework.r2dbc.BadSqlGrammarException;
3638
import org.springframework.transaction.CannotCreateTransactionException;
3739
import org.springframework.transaction.IllegalTransactionStateException;
@@ -315,6 +317,31 @@ void testConnectionReleasedWhenRollbackFails() {
315317
verify(connectionMock).close();
316318
}
317319

320+
@Test
321+
void testCommitAndRollbackFails() {
322+
when(connectionMock.isAutoCommit()).thenReturn(false);
323+
when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() ->
324+
Mono.error(new R2dbcBadGrammarException("Commit should fail"))));
325+
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() ->
326+
Mono.error(new R2dbcTransientResourceException("Rollback should also fail"))));
327+
328+
TransactionalOperator operator = TransactionalOperator.create(tm);
329+
330+
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
331+
.doOnNext(connection -> connection.createStatement("foo")).then()
332+
.as(operator::transactional)
333+
.as(StepVerifier::create)
334+
.verifyError(TransientDataAccessResourceException.class);
335+
336+
verify(connectionMock).isAutoCommit();
337+
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
338+
verify(connectionMock).createStatement("foo");
339+
verify(connectionMock).commitTransaction();
340+
verify(connectionMock).rollbackTransaction();
341+
verify(connectionMock).close();
342+
verifyNoMoreInteractions(connectionMock);
343+
}
344+
318345
@Test
319346
void testTransactionSetRollbackOnly() {
320347
when(connectionMock.isAutoCommit()).thenReturn(false);

spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java

+8-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -494,21 +494,17 @@ else if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) {
494494
}));
495495
}
496496
else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
497-
Mono<Void> mono;
497+
Mono<Void> mono = Mono.empty();
498498
if (!beforeCompletionInvoked.get()) {
499499
mono = triggerBeforeCompletion(synchronizationManager, status);
500500
}
501-
else {
502-
mono = Mono.empty();
503-
}
504501
result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex))
505502
.then(propagateException);
506503
}
507-
508504
return result;
509505
})
510506
.then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
511-
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
507+
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
512508
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))
513509
.then(Mono.defer(() -> {
514510
if (status.isNewTransaction()) {
@@ -518,8 +514,8 @@ else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
518514
}))));
519515

520516
return commit
521-
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status)
522-
.then(Mono.error(ex))).then(cleanupAfterCompletion(synchronizationManager, status));
517+
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex)))
518+
.then(cleanupAfterCompletion(synchronizationManager, status));
523519
}
524520

525521
/**
@@ -571,8 +567,8 @@ private Mono<Void> processRollback(TransactionSynchronizationManager synchroniza
571567
}
572568
return beforeCompletion;
573569
}
574-
})).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion(
575-
synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
570+
})).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex ->
571+
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
576572
.then(Mono.defer(() -> {
577573
if (status.isNewTransaction()) {
578574
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, ex));
@@ -623,7 +619,7 @@ else if (status.hasTransaction()) {
623619
return Mono.empty();
624620
}))
625621
.then(Mono.error(rbex));
626-
}).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK))
622+
}).then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)))
627623
.then(Mono.defer(() -> {
628624
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
629625
return Mono.empty();

0 commit comments

Comments
 (0)