Skip to content

Commit

Permalink
fix configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Feb 7, 2024
1 parent 1580478 commit f1f37c0
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public SendResult doSendMessageInTransaction(final Message msg, long timeout, St
MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId));
try {
sendResult = this.send(msg, timeout);
sendResult = super.send(msg, timeout);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
Expand All @@ -141,18 +141,6 @@ public SendResult doSendMessageInTransaction(final Message msg, long timeout, St
}


// public static SeataMQProducer create(String groupName, String ak, String sk, boolean isEnableMsgTrace, String customizedTraceTopic) {
// boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
// if (isEnableAcl) {
// LOGGER.warn("ACL is not supported yet in SeataMQProducer");
// }
// if (isEnableMsgTrace) {
// LOGGER.warn("MessageTrace is not supported yet in SeataMQProducer");
// }
// return new SeataMQProducer(groupName);
// }


@Override
public TransactionListener getTransactionListener() {
return transactionListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,69 @@
*/
package org.apache.seata.integration.rocketmq;

import org.apache.commons.lang.ObjectUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.seata.common.exception.NotSupportYetException;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
* SeataMQProducer Factory
**/
public class SeataMQProducerFactory {
public class SeataMQProducerFactory implements ApplicationContextAware, InitializingBean {

public static final String ROCKET_TCC_NAME = "tccRocketMQ";
private static TCCRocketMQ tccRocketMQ;

private static Map<String, SeataMQProducer> PRODUCER_MAP = new ConcurrentHashMap<>();
private volatile static String SINGLE_PRODUCER_ID;
/**
* Default Producer, it can be replaced to Map after multi-resource is supported
*/
private static SeataMQProducer defaultProducer;
private ApplicationContext applicationContext;

@Override
public void afterPropertiesSet() throws Exception {
tccRocketMQ = (TCCRocketMQ) applicationContext.getBean(ROCKET_TCC_NAME);
tccRocketMQ.setProducer(defaultProducer);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public static SeataMQProducer create(String producerId, String nameServer, String producerGroup) {
return create(producerId, nameServer, null, producerGroup, null);
public static SeataMQProducer createSingle(String nameServer, String producerGroup) throws MQClientException {
return createSingle(nameServer, null, producerGroup, null);
}

public static SeataMQProducer create(String producerId, String nameServer, String namespace,
String groupName, RPCHook rpcHook) {
if (SINGLE_PRODUCER_ID == null) {
public static SeataMQProducer createSingle(String nameServer, String namespace,
String groupName, RPCHook rpcHook) throws MQClientException {
if (defaultProducer == null) {
synchronized (SeataMQProducerFactory.class) {
if (SINGLE_PRODUCER_ID == null) {
SINGLE_PRODUCER_ID = producerId;
SeataMQProducer producer = new SeataMQProducer(namespace, groupName, rpcHook);
producer.setNamesrvAddr(nameServer);
tccRocketMQ.setProducer(producer);
PRODUCER_MAP.put(producerId, producer);
if (defaultProducer == null) {
defaultProducer = new SeataMQProducer(namespace, groupName, rpcHook);
defaultProducer.setNamesrvAddr(nameServer);
if (tccRocketMQ != null) {
tccRocketMQ.setProducer(defaultProducer);
}
defaultProducer.start();
}
}
}

if (!SINGLE_PRODUCER_ID.equals(producerId)) {
throw new NotSupportYetException("only one producer is allowed");
if (!ObjectUtils.equals(nameServer, defaultProducer.getNamesrvAddr())
|| !ObjectUtils.equals(namespace, defaultProducer.getNamespace())
|| !ObjectUtils.equals(groupName, defaultProducer.getProducerGroup())
) {
throw new NotSupportYetException("only one seata producer is permitted");
}
return getProducer();
return defaultProducer;
}

public static SeataMQProducer getProducer() {
return PRODUCER_MAP.get(SINGLE_PRODUCER_ID);
}

public static void setTccRocketMQ(TCCRocketMQ tccRocket) {
tccRocketMQ = tccRocket;
return defaultProducer;
}

public static TCCRocketMQ getTccRocketMQ() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public interface TCCRocketMQ {
* @param timeout the timeout
* @return SendResult
*/
@TwoPhaseBusinessAction(name = "tccRocketMQ")
@TwoPhaseBusinessAction(name = SeataMQProducerFactory.ROCKET_TCC_NAME)
SendResult prepare(Message message, long timeout) throws MQClientException;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.rocketmq;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;

/**
* the type rocket mq tcc definition registry
*/
public class TccRocketDefinitionRegistry implements BeanDefinitionRegistryPostProcessor {

@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
BeanDefinition beanDefinition = BeanDefinitionBuilder
.genericBeanDefinition(TCCRocketMQImpl.class)
.getBeanDefinition();

registry.registerBeanDefinition(SeataMQProducerFactory.ROCKET_TCC_NAME, beanDefinition);
}

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,33 @@
package org.apache.seata.spring.boot.autoconfigure;

import org.apache.seata.integration.rocketmq.SeataMQProducerFactory;
import org.apache.seata.integration.rocketmq.TCCRocketMQ;
import org.apache.seata.integration.rocketmq.TCCRocketMQImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.seata.integration.rocketmq.TccRocketDefinitionRegistry;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
* SeataRocketMQAutoConfiguration
*/
@ConditionalOnClass(name = "org.apache.rocketmq.client.producer.DefaultMQProducer")
@ConditionalOnExpression("${seata.enabled:true} && ${seata.rocketmq-enabled:false}")
@ConditionalOnExpression("${seata.enabled:true}")
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(SeataAutoConfiguration.class)
public class SeataRocketMQAutoConfiguration {

@Autowired
TCCRocketMQ tccRocketMQ;

// @Bean(name = SeataMQProducerFactory.ROCKET_TCC_NAME)
// public TCCRocketMQ tccRocketMQ() {
// return new TCCRocketMQImpl();
// }
@Bean
@ConditionalOnMissingBean
public TCCRocketMQ tccRocketMQ() {
return new TCCRocketMQImpl();
public TccRocketDefinitionRegistry tccRocketDefinitionRegistry() {
return new TccRocketDefinitionRegistry();
}

@PostConstruct
public void init() {
SeataMQProducerFactory.setTccRocketMQ(tccRocketMQ);
@Bean
public SeataMQProducerFactory seataMQProducerFactory() {
return new SeataMQProducerFactory();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
org.apache.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
org.apache.seata.spring.boot.autoconfigure.SeataHttpAutoConfiguration,\
org.apache.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration
org.apache.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration,\
org.apache.seata.spring.boot.autoconfigure.SeataRocketMQAutoConfiguration

0 comments on commit f1f37c0

Please sign in to comment.