Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add topic message type #179

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class TopicConfigInfo {
private int perm;
private boolean order;

private String messageType;
public List<String> getClusterNameList() {
return clusterNameList;
}
Expand Down Expand Up @@ -91,6 +92,18 @@ public void setOrder(boolean order) {
this.order = order;
}


public String getMessageType() {
return messageType;
}

public void setMessageType(String messageType) {
this.messageType = messageType;
}




@Override
public boolean equals(Object o) {
if (this == o)
Expand All @@ -102,12 +115,13 @@ public boolean equals(Object o) {
readQueueNums == that.readQueueNums &&
perm == that.perm &&
order == that.order &&
Objects.equal(topicName, that.topicName);
Objects.equal(topicName, that.topicName) &&
Objects.equal(messageType, that.messageType);
}

@Override
public int hashCode() {
return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order);
return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order,messageType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.dashboard.service.impl;

import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
Expand All @@ -30,8 +31,10 @@
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

@Service
public class ClusterServiceImpl implements ClusterService {
Expand All @@ -56,6 +59,9 @@ public Map<String, Object> list() {
}
resultMap.put("clusterInfo", clusterInfo);
resultMap.put("brokerServer", brokerServer);
// add messageType
resultMap.put("messageTypes", Arrays.stream(TopicMessageType.values()).sorted()
.collect(Collectors.toMap(TopicMessageType::getValue, messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue()))));
return resultMap;
}
catch (Exception err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,37 @@
package org.apache.rocketmq.dashboard.service.impl;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.TopicService;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.joor.Reflect;
import org.springframework.beans.BeanUtils;
Expand All @@ -55,6 +60,11 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE;

@Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService {
Expand All @@ -68,18 +78,18 @@ public TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndD
TopicList allTopics = mqAdminExt.fetchAllTopicList();
TopicList sysTopics = getSystemTopicList();
Set<String> topics =
allTopics.getTopicList().stream().map(topic -> {
if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) {
topic = String.format("%s%s", "%SYS%", topic);
}
return topic;
}).filter(topic -> {
if (skipRetryAndDlq) {
return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
}
return true;
}).collect(Collectors.toSet());
allTopics.getTopicList().stream().map(topic -> {
if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) {
topic = String.format("%s%s", "%SYS%", topic);
}
return topic;
}).filter(topic -> {
if (skipRetryAndDlq) {
return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
}
return true;
}).collect(Collectors.toSet());
allTopics.getTopicList().clear();
allTopics.getTopicList().addAll(topics);
return allTopics;
Expand Down Expand Up @@ -123,10 +133,15 @@ public GroupList queryTopicConsumerInfo(String topic) {
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
TopicConfig topicConfig = new TopicConfig();
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
String messageType = topicCreateOrUpdateRequest.getMessageType();
if (StringUtils.isBlank(messageType)) {
messageType = TopicMessageType.NORMAL.name();
}
topicConfig.setAttributes(ImmutableMap.of("+".concat(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()), messageType));
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) {
topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) {
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig);
}
} catch (Exception err) {
Expand Down Expand Up @@ -156,6 +171,11 @@ public List<TopicConfigInfo> examineTopicConfig(String topic) {
TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName());
BeanUtils.copyProperties(topicConfig, topicConfigInfo);
topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
String messageType = topicConfig.getAttributes().get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());
if (StringUtils.isBlank(messageType)) {
messageType = TopicMessageType.UNSPECIFIED.name();
}
topicConfigInfo.setMessageType(messageType);
topicConfigInfoList.add(topicConfigInfo);
}
return topicConfigInfoList;
Expand Down Expand Up @@ -226,6 +246,12 @@ public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rp
return defaultMQProducer;
}

public TransactionMQProducer buildTransactionMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) {
TransactionMQProducer defaultMQProducer = new TransactionMQProducer(null, producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC);
defaultMQProducer.setUseTLS(configure.isUseTLS());
return defaultMQProducer;
}

private TopicList getSystemTopicList() {
RPCHook rpcHook = null;
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
Expand All @@ -249,32 +275,61 @@ private TopicList getSystemTopicList() {

@Override
public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
DefaultMQProducer producer = null;
List<TopicConfigInfo> topicConfigInfos = examineTopicConfig(sendTopicMessageRequest.getTopic());
String messageType = topicConfigInfos.get(0).getMessageType();
AclClientRPCHook rpcHook = null;
if (configure.isACLEnabled()) {
rpcHook = new AclClientRPCHook(new SessionCredentials(
configure.getAccessKey(),
configure.getSecretKey()
configure.getAccessKey(),
configure.getSecretKey()
));
}
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr());
try {
producer.start();
Message msg = new Message(sendTopicMessageRequest.getTopic(),
sendTopicMessageRequest.getTag(),
sendTopicMessageRequest.getKey(),
sendTopicMessageRequest.getMessageBody().getBytes()
);
return producer.send(msg);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown();
if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) {
// transaction message
TransactionListener transactionListener = new TransactionListenerImpl();

TransactionMQProducer producer = buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr());
producer.setTransactionListener(transactionListener);
try {
producer.start();
Message msg = new Message(sendTopicMessageRequest.getTopic(),
sendTopicMessageRequest.getTag(),
sendTopicMessageRequest.getKey(),
sendTopicMessageRequest.getMessageBody().getBytes()
);
return producer.sendMessageInTransaction(msg, null);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown();
}
} else {
// no transaction message
DefaultMQProducer producer = null;
producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(configure.getNamesrvAddr());
try {
producer.start();
Message msg = new Message(sendTopicMessageRequest.getTopic(),
sendTopicMessageRequest.getTag(),
sendTopicMessageRequest.getKey(),
sendTopicMessageRequest.getMessageBody().getBytes()
);
return producer.send(msg);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
producer.shutdown();
}
}

}

private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) {
Expand All @@ -296,4 +351,20 @@ private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnable
} catch (Exception ignore) {
}
}

static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);

private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
8 changes: 5 additions & 3 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ rocketmq:
# configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs:
- 127.0.0.1:9876
- 127.0.0.2:9876
# - 127.0.0.2:9876
# - 10.151.47.32:9876;10.151.47.33:9876;10.151.47.34:9876
# - 10.151.47.30:9876
# if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
isVIPChannel:
# timeout for mqadminExt, default 5000ms
Expand All @@ -58,8 +60,8 @@ rocketmq:
loginRequired: false
useTLS: false
# set the accessKey and secretKey if you used acl
accessKey: # if version > 4.4.0
secretKey: # if version > 4.4.0
# accessKey: rocketmq2
# secretKey: 12345678

threadpool:
config:
Expand Down
8 changes: 7 additions & 1 deletion src/main/resources/static/src/i18n/en.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,11 @@ var en = {
"GROUP_PERM":"Group Permission",
"SYNCHRONIZE":"Synchronize Data",
"SHOW":"Show",
"HIDE":"Hide"
"HIDE":"Hide",
"MESSAGE_TYPE":"messageType",
"MESSAGE_TYPE_UNSPECIFIED": "UNSPECIFIED, is NORMAL",
"MESSAGE_TYPE_NORMAL": "NORMAL",
"MESSAGE_TYPE_FIFO": "FIFO",
"MESSAGE_TYPE_DELAY": "DELAY",
"MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
}
8 changes: 7 additions & 1 deletion src/main/resources/static/src/i18n/zh.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,11 @@ var zh = {
"GROUP_PERM":"消费组权限",
"SYNCHRONIZE":"同步",
"SHOW":"显示",
"HIDE":"隐藏"
"HIDE":"隐藏",
"MESSAGE_TYPE":"消息类型",
"MESSAGE_TYPE_UNSPECIFIED": "未指定,为普通消息",
"MESSAGE_TYPE_NORMAL": "普通消息",
"MESSAGE_TYPE_FIFO": "顺序消息",
"MESSAGE_TYPE_DELAY": "定时/延时消息",
"MESSAGE_TYPE_TRANSACTION": "事务消息",
}
5 changes: 3 additions & 2 deletions src/main/resources/static/src/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
var bIsUpdate = true;
if (request == null) {
request = [{
writeQueueNums: 16,
readQueueNums: 16,
writeQueueNums: 8,
readQueueNums: 8,
perm: 6,
order: false,
topicName: "",
Expand All @@ -355,6 +355,7 @@ module.controller('topicController', ['$scope', 'ngDialog', '$http', 'Notificati
topicRequestList: request,
allClusterNameList: Object.keys(resp.data.clusterInfo.clusterAddrTable),
allBrokerNameList: Object.keys(resp.data.brokerServer),
allMessageTypeList: resp.data.messageTypes,
bIsUpdate: bIsUpdate,
writeOperationEnabled: $scope.writeOperationEnabled
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/resources/static/view/pages/topic.html
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="openUpdateDialog(topic, sysFlag)">topic {{'CONFIG' |translate}}
</button>
<!-- todo 发送消息,根据消息类型判断-->
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-show="{{!sysFlag}}"
ng-click="openSendTopicMessageDialog(topic)">{{'SEND_MSG' | translate}}
Expand Down Expand Up @@ -189,6 +190,18 @@ <h4 class="modal-title">{{'TOPIC_CHANGE'| translate }}</h4>
<span class="text-danger" ng-show="addAppForm.topicName.$error.required">{{'TOPIC_NAME'|translate}}不能为空.</span>
</div>
</div>
<!-- 设置topic 类型 -->
<div class="form-group">
<label class="control-label col-sm-2">{{'MESSAGE_TYPE'|translate}}:</label>
<div class="col-sm-10">
<select name="mySelectMessageType" chosen ng-disabled="ngDialogData.bIsUpdate"
ng-model="item.messageType"
ng-options="messageType as value | translate disable when messageType=='UNSPECIFIED' for (messageType , value) in ngDialogData.allMessageTypeList"
>
<option value=""></option>
</select>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-2">{{'WRITE_QUEUE_NUMS'|translate}}:</label>
<div class="col-sm-10">
Expand Down