Skip to content

Commit

Permalink
UserTransaction should fire context lifecycle events
Browse files Browse the repository at this point in the history
- such as @initialized(TransactionScoped.class)
- resolves quarkusio#28709
  • Loading branch information
mkouba committed Oct 21, 2022
1 parent 0cf278a commit 9b66eef
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
import io.quarkus.gizmo.ClassCreator;
import io.quarkus.narayana.jta.runtime.CDIDelegatingTransactionManager;
import io.quarkus.narayana.jta.runtime.NarayanaJtaProducers;
import io.quarkus.narayana.jta.runtime.NarayanaJtaRecorder;
import io.quarkus.narayana.jta.runtime.TransactionManagerConfiguration;
Expand Down Expand Up @@ -85,7 +84,6 @@ public void build(NarayanaJtaRecorder recorder,
recorder.handleShutdown(shutdownContextBuildItem, transactions);
feature.produce(new FeatureBuildItem(Feature.NARAYANA_JTA));
additionalBeans.produce(new AdditionalBeanBuildItem(NarayanaJtaProducers.class));
additionalBeans.produce(new AdditionalBeanBuildItem(CDIDelegatingTransactionManager.class));
additionalBeans.produce(AdditionalBeanBuildItem.unremovableOf("io.quarkus.narayana.jta.RequestScopedTransaction"));

runtimeInit.produce(new RuntimeInitializedClassBuildItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import javax.enterprise.inject.Produces;
import javax.inject.Singleton;
import javax.transaction.TransactionSynchronizationRegistry;
import javax.transaction.UserTransaction;

import org.jboss.tm.JBossXATerminator;
import org.jboss.tm.XAResourceRecoveryRegistry;
Expand All @@ -13,7 +14,6 @@
import com.arjuna.ats.internal.jbossatx.jta.jca.XATerminator;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple;
import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;
import com.arjuna.ats.jta.UserTransaction;

import io.quarkus.arc.Unremovable;

Expand All @@ -28,8 +28,15 @@ public UserTransactionRegistry userTransactionRegistry() {

@Produces
@ApplicationScoped
public javax.transaction.UserTransaction userTransaction() {
return UserTransaction.userTransaction();
public UserTransaction userTransaction() {
return new NotifyingUserTransaction(com.arjuna.ats.jta.UserTransaction.userTransaction());
}

@Produces
@Unremovable
@Singleton
public javax.transaction.TransactionManager transactionManager() {
return new NotifyingTransactionManager();
}

@Produces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
Expand All @@ -20,51 +18,20 @@

import org.jboss.logging.Logger;

import io.quarkus.arc.Unremovable;

/**
* A delegating transaction manager which receives an instance of Narayana transaction manager
* and delegates all calls to it.
* On top of it the implementation adds the CDI events processing for {@link TransactionScoped}.
*/
@Singleton
@Unremovable // used by Arc for transactional observers
public class CDIDelegatingTransactionManager implements TransactionManager, Serializable {

private static final Logger log = Logger.getLogger(CDIDelegatingTransactionManager.class);
public class NotifyingTransactionManager extends TransactionScopedNotifier implements TransactionManager, Serializable {

private static final long serialVersionUID = 1598L;

private final transient com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple delegate;

/**
* An {@link Event} that can {@linkplain Event#fire(Object) fire}
* {@link Transaction}s when the {@linkplain TransactionScoped transaction scope} is initialized.
*/
@Inject
@Initialized(TransactionScoped.class)
Event<Transaction> transactionScopeInitialized;
private static final Logger LOG = Logger.getLogger(NotifyingTransactionManager.class);

/**
* An {@link Event} that can {@linkplain Event#fire(Object) fire}
* {@link Object}s before the {@linkplain TransactionScoped transaction scope} is destroyed.
*/
@Inject
@BeforeDestroyed(TransactionScoped.class)
Event<Object> transactionScopeBeforeDestroyed;
private transient com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple delegate;

/**
* An {@link Event} that can {@linkplain Event#fire(Object) fire}
* {@link Object}s when the {@linkplain TransactionScoped transaction scope} is destroyed.
*/
@Inject
@Destroyed(TransactionScoped.class)
Event<Object> transactionScopeDestroyed;

/**
* Delegating transaction manager call to com.arjuna.ats.jta.{@link com.arjuna.ats.jta.TransactionManager}
*/
public CDIDelegatingTransactionManager() {
NotifyingTransactionManager() {
delegate = (com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple) com.arjuna.ats.jta.TransactionManager
.transactionManager();
}
Expand All @@ -80,9 +47,7 @@ public CDIDelegatingTransactionManager() {
@Override
public void begin() throws NotSupportedException, SystemException {
delegate.begin();
if (this.transactionScopeInitialized != null) {
this.transactionScopeInitialized.fire(this.getTransaction());
}
initialized(getTransaction());
}

/**
Expand All @@ -97,16 +62,11 @@ public void begin() throws NotSupportedException, SystemException {
@Override
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException,
IllegalStateException, SystemException {
if (this.transactionScopeBeforeDestroyed != null) {
this.transactionScopeBeforeDestroyed.fire(this.getTransaction());
}

beforeDestroyed(this.getTransaction());
try {
delegate.commit();
} finally {
if (this.transactionScopeDestroyed != null) {
this.transactionScopeDestroyed.fire(this.toString());
}
destroyed(toString());
}
}

Expand All @@ -122,20 +82,16 @@ public void commit() throws RollbackException, HeuristicMixedException, Heuristi
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
try {
if (this.transactionScopeBeforeDestroyed != null) {
this.transactionScopeBeforeDestroyed.fire(this.getTransaction());
}
beforeDestroyed(getTransaction());
} catch (Throwable t) {
log.error("Failed to fire @BeforeDestroyed(TransactionScoped.class)", t);
LOG.error("Failed to fire @BeforeDestroyed(TransactionScoped.class)", t);
}

try {
delegate.rollback();
} finally {
//we don't need a catch block here, if this one fails we just let the exception propagate
if (this.transactionScopeDestroyed != null) {
this.transactionScopeDestroyed.fire(this.toString());
}
destroyed(toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.quarkus.narayana.jta.runtime;

import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.jboss.logging.Logger;

public class NotifyingUserTransaction extends TransactionScopedNotifier implements UserTransaction {

private static final Logger LOG = Logger.getLogger(NotifyingUserTransaction.class);

private final UserTransaction delegate;

public NotifyingUserTransaction(UserTransaction delegate) {
this.delegate = delegate;
}

@Override
public void begin() throws NotSupportedException, SystemException {
delegate.begin();
initialized(delegate);
}

@Override
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException,
IllegalStateException, SystemException {
beforeDestroyed(delegate);
try {
delegate.commit();
} finally {
destroyed(toString());
}
}

@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
try {
beforeDestroyed(delegate);
} catch (Throwable t) {
LOG.error("Failed to fire @BeforeDestroyed(TransactionScoped.class)", t);
}
try {
delegate.rollback();
} finally {
destroyed(toString());
}
}

@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
delegate.setRollbackOnly();
}

@Override
public int getStatus() throws SystemException {
return delegate.getStatus();
}

@Override
public void setTransactionTimeout(int seconds) throws SystemException {
delegate.setTransactionTimeout(seconds);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.narayana.jta.runtime;

import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Event;
import javax.transaction.TransactionScoped;

import io.quarkus.arc.Arc;

abstract class TransactionScopedNotifier {

private transient Event<Object> initialized;
private transient Event<Object> beforeDestroyed;
private transient Event<Object> destroyed;

void initialized(Object payload) {
if (initialized == null) {
initialized = Arc.container().beanManager().getEvent()
.select(Initialized.Literal.of(TransactionScoped.class));
}
initialized.fire(payload);
}

void beforeDestroyed(Object payload) {
if (beforeDestroyed == null) {
beforeDestroyed = Arc.container().beanManager().getEvent()
.select(BeforeDestroyed.Literal.of(TransactionScoped.class));
}
beforeDestroyed.fire(payload);
}

void destroyed(Object payload) {
if (destroyed == null) {
destroyed = Arc.container().beanManager().getEvent()
.select(Destroyed.Literal.of(TransactionScoped.class));
}
destroyed.fire(payload);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.arjuna.ats.jta.logging.jtaLogger;

import io.quarkus.arc.runtime.InterceptorBindings;
import io.quarkus.narayana.jta.runtime.CDIDelegatingTransactionManager;
import io.quarkus.narayana.jta.runtime.NotifyingTransactionManager;
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import io.quarkus.transaction.annotations.Rollback;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -110,7 +110,7 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn

int timeoutConfiguredForMethod = getTransactionTimeoutFromAnnotation(ic);

int currentTmTimeout = ((CDIDelegatingTransactionManager) transactionManager).getTransactionTimeout();
int currentTmTimeout = ((NotifyingTransactionManager) transactionManager).getTransactionTimeout();

if (timeoutConfiguredForMethod > 0) {
tm.setTransactionTimeout(timeoutConfiguredForMethod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import javax.transaction.TransactionManager;
import javax.transaction.TransactionScoped;
import javax.transaction.Transactional;
import javax.transaction.UserTransaction;

import org.jboss.logging.Logger;

Expand Down Expand Up @@ -110,10 +111,13 @@ void transactionScopeActivated(@Observes @Initialized(TransactionScoped.class) f
log.error("Context on @Initialized has to be active");
throw new IllegalStateException("Context on @Initialized has to be active");
}
if (!(event instanceof Transaction)) {
log.error("@Initialized scope expects event payload being the " + Transaction.class.getName());
if (!(event instanceof Transaction) && !(event instanceof UserTransaction)) {
log.error(
"@Initialized scope expects event payload being the " + Transaction.class.getName() + " or "
+ UserTransaction.class.getName());
throw new IllegalStateException(
"@Initialized scope expects event payload being the " + Transaction.class.getName());
"@Initialized scope expects event payload being the " + Transaction.class.getName() + " or "
+ UserTransaction.class.getName());
}

initializedCount++;
Expand All @@ -130,17 +134,19 @@ void transactionScopePreDestroy(@Observes @BeforeDestroyed(TransactionScoped.cla
try {
ctx = beanManager.getContext(TransactionScoped.class);
} catch (Exception e) {
log.error("Context on @Initialized is not available");
log.error("Context on @BeforeDestroyed is not available");
throw e;
}
if (!ctx.isActive()) {
log.error("Context on @BeforeDestroyed has to be active");
throw new IllegalStateException("Context on @BeforeDestroyed has to be active");
}
if (!(event instanceof Transaction)) {
log.error("@Initialized scope expects event payload being the " + Transaction.class.getName());
if (!(event instanceof Transaction) && !(event instanceof UserTransaction)) {
log.error("@BeforeDestroyed scope expects event payload being the " + Transaction.class.getName() + " or "
+ UserTransaction.class.getName());
throw new IllegalStateException(
"@Initialized scope expects event payload being the " + Transaction.class.getName());
"@BeforeDestroyed scope expects event payload being the " + Transaction.class.getName() + " or "
+ UserTransaction.class.getName());
}

beforeDestroyedCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ void transactionScopedInTransaction() throws Exception {
}

@Test
void scopeEventsAreEmitted() {
beanEvents.cleanCounts();
void scopeEventsAreEmitted() throws Exception {
TransactionBeanWithEvents.cleanCounts();

beanEvents.doInTransaction(true);

Expand All @@ -70,11 +70,14 @@ void scopeEventsAreEmitted() {
// expect runtime exception to rollback the call
}

assertEquals(2, beanEvents.getInitialized(), "Expected @Initialized to be observed");
assertEquals(2, beanEvents.getBeforeDestroyed(), "Expected @BeforeDestroyed to be observer");
assertEquals(2, beanEvents.getDestroyed(), "Expected @Destroyed to be observer");
assertEquals(1, beanEvents.getCommited(), "Expected commit to be called once");
assertEquals(1, beanEvents.getRolledBack(), "Expected rollback to be called once");
tx.begin();
tx.commit();

assertEquals(3, TransactionBeanWithEvents.getInitialized(), "Expected @Initialized to be observed");
assertEquals(3, TransactionBeanWithEvents.getBeforeDestroyed(), "Expected @BeforeDestroyed to be observer");
assertEquals(3, TransactionBeanWithEvents.getDestroyed(), "Expected @Destroyed to be observer");
assertEquals(1, TransactionBeanWithEvents.getCommited(), "Expected commit to be called once");
assertEquals(1, TransactionBeanWithEvents.getRolledBack(), "Expected rollback to be called once");
}

}

0 comments on commit 9b66eef

Please sign in to comment.