Skip to content

Commit

Permalink
tcc api
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Feb 20, 2024
1 parent bb41a20 commit b8cc88f
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 74 deletions.
4 changes: 0 additions & 4 deletions rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@
<artifactId>rocketmq-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class SeataMQProducer extends TransactionMQProducer {
public static String PROPERTY_SEATA_BRANCHID = RootContext.KEY_BRANCHID;
private TransactionListener transactionListener;

private TCCRocketMQ tccRocketMQ;

SeataMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}
Expand Down Expand Up @@ -94,10 +96,10 @@ public SendResult send(Message msg) throws MQClientException, MQBrokerException,
@Override
public SendResult send(Message msg, long timeout) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
if (RootContext.inGlobalTransaction()) {
if (SeataMQProducerFactory.getTccRocketMQ() == null) {
if (tccRocketMQ == null) {
throw new RuntimeException("TCCRocketMQ is not initialized");
}
return SeataMQProducerFactory.getTccRocketMQ().prepare(msg, timeout);
return tccRocketMQ.prepare(msg, timeout);
} else {
return super.send(msg, timeout);
}
Expand Down Expand Up @@ -140,4 +142,8 @@ public SendResult doSendMessageInTransaction(final Message msg, long timeout, St
public TransactionListener getTransactionListener() {
return transactionListener;
}

public void setTccRocketMQ(TCCRocketMQ tccRocketMQ){
this.tccRocketMQ = tccRocketMQ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,21 @@
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.seata.common.exception.NotSupportYetException;
import org.apache.seata.core.model.BranchType;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.apache.seata.integration.tx.api.util.ProxyUtil;

/**
* SeataMQProducer Factory
**/
public class SeataMQProducerFactory implements ApplicationContextAware, InitializingBean {
public class SeataMQProducerFactory{

public static final String ROCKET_TCC_NAME = "tccRocketMQ";
public static final BranchType ROCKET_BRANCH_TYPE = BranchType.TCC;
private static TCCRocketMQ tccRocketMQ;

/**
* Default Producer, it can be replaced to Map after multi-resource is supported
*/
private static SeataMQProducer defaultProducer;
private ApplicationContext applicationContext;

@Override
public void afterPropertiesSet() throws Exception {
tccRocketMQ = (TCCRocketMQ) applicationContext.getBean(ROCKET_TCC_NAME);
tccRocketMQ.setProducer(defaultProducer);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public static SeataMQProducer createSingle(String nameServer, String producerGroup) throws MQClientException {
return createSingle(nameServer, null, producerGroup, null);
Expand All @@ -63,9 +48,9 @@ public static SeataMQProducer createSingle(String nameServer, String namespace,
if (defaultProducer == null) {
defaultProducer = new SeataMQProducer(namespace, groupName, rpcHook);
defaultProducer.setNamesrvAddr(nameServer);
if (tccRocketMQ != null) {
tccRocketMQ.setProducer(defaultProducer);
}
TCCRocketMQ tccRocketMQProxy = ProxyUtil.createProxy(new TCCRocketMQImpl());
tccRocketMQProxy.setProducer(defaultProducer);
defaultProducer.setTccRocketMQ(tccRocketMQProxy);
defaultProducer.start();
}
}
Expand All @@ -83,7 +68,4 @@ public static SeataMQProducer getProducer() {
return defaultProducer;
}

public static TCCRocketMQ getTccRocketMQ() {
return tccRocketMQ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
/**
* The interface Tcc rocket mq.
*/
@LocalTCC
public interface TCCRocketMQ {


Expand All @@ -51,7 +50,6 @@ public interface TCCRocketMQ {
* @param timeout the timeout
* @return SendResult
*/
@TwoPhaseBusinessAction(name = SeataMQProducerFactory.ROCKET_TCC_NAME)
SendResult prepare(Message message, long timeout) throws MQClientException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.seata.rm.tcc.api.LocalTCC;
import org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +38,7 @@
/**
* the type TCCRocketMQImpl
*/
@LocalTCC
public class TCCRocketMQImpl implements TCCRocketMQ {
private static final Logger LOGGER = LoggerFactory.getLogger(TCCRocketMQImpl.class);

Expand All @@ -50,6 +53,7 @@ public void setProducer(SeataMQProducer producer) {
}

@Override
@TwoPhaseBusinessAction(name = SeataMQProducerFactory.ROCKET_TCC_NAME)
public SendResult prepare(Message message, long timeout) throws MQClientException {
BusinessActionContext context = BusinessActionContextUtil.getContext();
LOGGER.info("RocketMQ message send prepare, xid = {}", context.getXid());
Expand Down

This file was deleted.

0 comments on commit b8cc88f

Please sign in to comment.