Skip to content

Commit

Permalink
to many getXX
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Feb 26, 2024
1 parent e5a4790 commit 4faffc7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {

@Override
public SendResult send(Message msg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
return send(msg, this.defaultMQProducerImpl.getDefaultMQProducer().getSendMsgTimeout());
return send(msg, this.getSendMsgTimeout());
}

@Override
Expand All @@ -112,11 +112,11 @@ public SendResult doSendMessageInTransaction(final Message msg, long timeout, St
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

Validators.checkMessage(msg, this.defaultMQProducerImpl.getDefaultMQProducer());
Validators.checkMessage(msg, this);

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducerImpl.getDefaultMQProducer().getProducerGroup());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.getProducerGroup());
MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seata.integration.rocketmq;

import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.rm.tcc.api.BusinessActionContext;
import org.apache.seata.rm.tcc.api.BusinessActionContextUtil;
Expand Down Expand Up @@ -46,10 +47,12 @@ public class TCCRocketMQImpl implements TCCRocketMQ {
private static final String ROCKET_SEND_RESULT_KEY = "ROCKET_SEND_RESULT";

private SeataMQProducer producer;
private DefaultMQProducerImpl producerImpl;

@Override
public void setProducer(SeataMQProducer producer) {
this.producer = producer;
this.producerImpl = producer.getDefaultMQProducerImpl();
}

@Override
Expand All @@ -73,7 +76,7 @@ public boolean commit(BusinessActionContext context)
if (message == null || sendResult == null) {
throw new TransactionException("TCCRocketMQ commit but cannot find message and sendResult");
}
producer.getDefaultMQProducerImpl().endTransaction(message, sendResult, LocalTransactionState.COMMIT_MESSAGE, null);
this.producerImpl.endTransaction(message, sendResult, LocalTransactionState.COMMIT_MESSAGE, null);
LOGGER.info("RocketMQ message send commit, xid = {}, branchId = {}", context.getXid(), context.getBranchId());
return true;
}
Expand All @@ -86,7 +89,7 @@ public boolean rollback(BusinessActionContext context)
if (message == null || sendResult == null) {
LOGGER.error("TCCRocketMQ rollback but cannot find message and sendResult");
}
producer.getDefaultMQProducerImpl().endTransaction(message, sendResult, LocalTransactionState.ROLLBACK_MESSAGE, null);
this.producerImpl.endTransaction(message, sendResult, LocalTransactionState.ROLLBACK_MESSAGE, null);
LOGGER.info("RocketMQ message send rollback, xid = {}, branchId = {}", context.getXid(), context.getBranchId());
return true;
}
Expand Down

0 comments on commit 4faffc7

Please sign in to comment.