diff --git a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java index 991a2d8e..5ce21ff2 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java +++ b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java @@ -43,6 +43,8 @@ public class RMQConfigure { //use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); + private volatile String proxyAddr; + private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"); @@ -62,6 +64,8 @@ public class RMQConfigure { private List namesrvAddrs = new ArrayList<>(); + private List proxyAddrs = new ArrayList<>(); + public String getAccessKey() { return accessKey; } @@ -86,6 +90,25 @@ public List getNamesrvAddrs() { return namesrvAddrs; } + public List getProxyAddrs() { + return this.proxyAddrs; + } + + public void setProxyAddrs(List proxyAddrs) { + this.proxyAddrs = proxyAddrs; + if (CollectionUtils.isNotEmpty(proxyAddrs)) { + this.setProxyAddr(proxyAddrs.get(0)); + } + } + + public String getProxyAddr() { + return proxyAddr; + } + + public void setProxyAddr(String proxyAddr) { + this.proxyAddr = proxyAddr; + } + public void setNamesrvAddrs(List namesrvAddrs) { this.namesrvAddrs = namesrvAddrs; if (CollectionUtils.isNotEmpty(namesrvAddrs)) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java index d9f22e4a..96fc0569 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java @@ -47,14 +47,14 @@ public class ConsumerController { @RequestMapping(value = "/groupList.query") @ResponseBody - public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup) { - return consumerService.queryGroupList(skipSysGroup); + public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup, String address) { + return consumerService.queryGroupList(skipSysGroup, address); } @RequestMapping(value = "/group.query") @ResponseBody - public Object groupQuery(@RequestParam String consumerGroup) { - return consumerService.queryGroup(consumerGroup); + public Object groupQuery(@RequestParam String consumerGroup, String address) { + return consumerService.queryGroup(consumerGroup, address); } @RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST}) @@ -99,14 +99,14 @@ public Object fetchBrokerNameList(@RequestParam String consumerGroup) { @RequestMapping(value = "/queryTopicByConsumer.query") @ResponseBody - public Object queryConsumerByTopic(@RequestParam String consumerGroup) { - return consumerService.queryConsumeStatsListByGroupName(consumerGroup); + public Object queryConsumerByTopic(@RequestParam String consumerGroup, String address) { + return consumerService.queryConsumeStatsListByGroupName(consumerGroup, address); } @RequestMapping(value = "/consumerConnection.query") @ResponseBody - public Object consumerConnection(@RequestParam(required = false) String consumerGroup) { - ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup); + public Object consumerConnection(@RequestParam(required = false) String consumerGroup, String address) { + ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup, address); consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet())); return consumerConnection; } diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java new file mode 100644 index 00000000..27aa59db --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.controller; + +import org.apache.rocketmq.dashboard.permisssion.Permission; +import org.apache.rocketmq.dashboard.service.ProxyService; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; + +import javax.annotation.Resource; + +@Controller +@RequestMapping("/proxy") +@Permission +public class ProxyController { + @Resource + private ProxyService proxyService; + @RequestMapping(value = "/homePage.query", method = RequestMethod.GET) + @ResponseBody + public Object homePage() { + return proxyService.getProxyHomePage(); + } + + @RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST) + @ResponseBody + public Object addProxyAddr(@RequestParam String newProxyAddr) { + proxyService.addProxyAddrList(newProxyAddr); + return true; + } + + @RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST) + @ResponseBody + public Object updateProxyAddr(@RequestParam String proxyAddr) { + proxyService.updateProxyAddrList(proxyAddr); + return true; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java index 0d19af9e..db11c418 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java @@ -19,12 +19,15 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import java.util.List; + public class GroupConsumeInfo implements Comparable { private String group; private String version; private int count; private ConsumeType consumeType; private MessageModel messageModel; + private List address; private int consumeTps; private long diffTotal = -1; private String subGroupType = "NORMAL"; @@ -70,6 +73,22 @@ public void setDiffTotal(long diffTotal) { this.diffTotal = diffTotal; } + public List getAddress() { + return address; + } + + public void setAddress(List address) { + this.address = address; + } + + public String getSubGroupType() { + return subGroupType; + } + + public void setSubGroupType(String subGroupType) { + this.subGroupType = subGroupType; + } + @Override public int compareTo(GroupConsumeInfo o) { if (this.count != o.count) { @@ -93,12 +112,4 @@ public String getVersion() { public void setVersion(String version) { this.version = version; } - - public String getSubGroupType() { - return subGroupType; - } - - public void setSubGroupType(String subGroupType) { - this.subGroupType = subGroupType; - } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java index c475931f..e284c442 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java @@ -31,12 +31,12 @@ import java.util.Set; public interface ConsumerService { - List queryGroupList(boolean skipSysGroup); + List queryGroupList(boolean skipSysGroup,String address); - GroupConsumeInfo queryGroup(String consumerGroup); + GroupConsumeInfo queryGroup(String consumerGroup, String address); - List queryConsumeStatsListByGroupName(String groupName); + List queryConsumeStatsListByGroupName(String groupName, String address); List queryConsumeStatsList(String topic, String groupName); @@ -52,7 +52,7 @@ public interface ConsumerService { Set fetchBrokerNameSetBySubscriptionGroup(String group); - ConsumerConnection getConsumerConnection(String consumerGroup); + ConsumerConnection getConsumerConnection(String consumerGroup, String address); ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java new file mode 100644 index 00000000..2a64680f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service; + +import java.util.Map; + +public interface ProxyService { + + void addProxyAddrList(String proxyAddr); + + void updateProxyAddrList(String proxyAddr); + + Map getProxyHomePage(); +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java index 360c0e31..0281c5cb 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java @@ -627,7 +627,7 @@ public ConsumeStats examineConsumeStats(String brokerAddr, String consumerGroup, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStats'"); + return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis); } @Override @@ -639,8 +639,7 @@ public AdminToolResult examineConsumeStatsConcurrent(String consum @Override public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'examineConsumerConnectionInfo'"); + return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup, brokerAddr); } @Override diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java new file mode 100644 index 00000000..4344c7ca --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.client; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; + +public interface ProxyAdmin { + + ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException; +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java new file mode 100644 index 00000000..eadae12f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.client; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST; + +@Slf4j +@Service +public class ProxyAdminImpl implements ProxyAdmin { + @Autowired + private GenericObjectPool mqAdminExtPool; + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + try { + MQAdminInstance.createMQAdmin(mqAdminExtPool); + RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); + GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + RemotingCommand request = RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, requestHeader); + RemotingCommand response = remotingClient.invokeSync(addr, request, 3000); + switch (response.getCode()) { + case 0: + return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); + default: + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); + } + } finally { + MQAdminInstance.returnMQAdmin(mqAdminExtPool); + } + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index a1cf9ff6..9bc37ab8 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -23,8 +23,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -44,6 +46,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.dashboard.service.client.ProxyAdmin; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.common.message.MessageQueue; @@ -77,6 +80,8 @@ public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean { private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); + @Resource + protected ProxyAdmin proxyAdmin; @Resource private RMQConfigure configure; @@ -119,25 +124,33 @@ public void destroy() { } @Override - public List queryGroupList(boolean skipSysGroup) { - Set consumerGroupSet = Sets.newHashSet(); + public List queryGroupList(boolean skipSysGroup, String address) { + HashMap> consumerGroupMap = Maps.newHashMap(); try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); - consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); + for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) { + if (!consumerGroupMap.containsKey(groupName)) { + consumerGroupMap.putIfAbsent(groupName, new ArrayList<>()); + } + List addresses = consumerGroupMap.get(groupName); + addresses.add(brokerData.selectBrokerAddr()); + consumerGroupMap.put(groupName, addresses); + } } - } - catch (Exception err) { + } catch (Exception err) { Throwables.throwIfUnchecked(err); throw new RuntimeException(err); } List groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); - CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size()); - for (String consumerGroup : consumerGroupSet) { + CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size()); + for (Map.Entry> entry : consumerGroupMap.entrySet()) { + String consumerGroup = entry.getKey(); executorService.submit(() -> { try { - GroupConsumeInfo consumeInfo = queryGroup(consumerGroup); + GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address); + consumeInfo.setAddress(entry.getValue()); groupConsumeInfoList.add(consumeInfo); } catch (Exception e) { logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); @@ -165,7 +178,7 @@ public List queryGroupList(boolean skipSysGroup) { } @Override - public GroupConsumeInfo queryGroup(String consumerGroup) { + public GroupConsumeInfo queryGroup(String consumerGroup, String address) { GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); try { ConsumeStats consumeStats = null; @@ -182,9 +195,12 @@ public GroupConsumeInfo queryGroup(String consumerGroup) { .allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly); try { - consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - } - catch (Exception e) { + if (StringUtils.isNotEmpty(address)) { + consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup); + } else { + consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); + } + } catch (Exception e) { logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); } @@ -217,8 +233,18 @@ public GroupConsumeInfo queryGroup(String consumerGroup) { } @Override - public List queryConsumeStatsListByGroupName(String groupName) { - return queryConsumeStatsList(null, groupName); + public List queryConsumeStatsListByGroupName(String groupName, String address) { + ConsumeStats consumeStats; + String topic = null; + try { + String[] addresses = address.split(","); + String addr = addresses[0]; + consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + return toTopicConsumerInfoList(topic, consumeStats, groupName); } @Override @@ -231,6 +257,10 @@ public List queryConsumeStatsList(final String topic, String Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } + return toTopicConsumerInfoList(topic, consumeStats, groupName); + } + + private List toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) { List mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate() { @Override public boolean apply(MessageQueue o) { @@ -431,11 +461,12 @@ public Set fetchBrokerNameSetBySubscriptionGroup(String group) { } @Override - public ConsumerConnection getConsumerConnection(String consumerGroup) { + public ConsumerConnection getConsumerConnection(String consumerGroup, String address) { try { - return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - } - catch (Exception e) { + String[] addresses = address.split(","); + String addr = addresses[0]; + return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr); + } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java new file mode 100644 index 00000000..07e63b34 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.impl; + +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.dashboard.config.RMQConfigure; +import org.apache.rocketmq.dashboard.service.ProxyService; +import org.apache.rocketmq.dashboard.service.client.ProxyAdmin; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; + +@Slf4j +@Service +public class ProxyServiceImpl implements ProxyService { + @Resource + protected ProxyAdmin proxyAdmin; + @Resource + private RMQConfigure configure; + + @Override + public void addProxyAddrList(String proxyAddr) { + List proxyAddrs = configure.getProxyAddrs(); + if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) { + proxyAddrs.add(proxyAddr); + } + configure.setProxyAddrs(proxyAddrs); + } + + @Override + public void updateProxyAddrList(String proxyAddr) { + configure.setProxyAddr(proxyAddr); + } + + @Override + public Map getProxyHomePage() { + Map homePageInfoMap = Maps.newHashMap(); + homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr()); + homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs()); + return homePageInfoMap; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java index 710929b0..3c8a77ee 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java @@ -40,7 +40,7 @@ public class MonitorTask { // @Scheduled(cron = "* * * * * ?") public void scanProblemConsumeGroup() { for (Map.Entry configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) { - GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey()); + GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey(), null); if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) { logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 090e4216..fe4d283f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -59,6 +59,9 @@ rocketmq: # must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required loginRequired: false useTLS: false + proxyAddr: 127.0.0.1:8080 + proxyAddrs: + - 127.0.0.1:8080 # set the accessKey and secretKey if you used acl # accessKey: rocketmq2 # secretKey: 12345678 diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html index c2bf349c..ee3c3fe7 100644 --- a/src/main/resources/static/index.html +++ b/src/main/resources/static/index.html @@ -104,6 +104,7 @@ + diff --git a/src/main/resources/static/src/app.js b/src/main/resources/static/src/app.js index a7ca1bef..1bbb6509 100644 --- a/src/main/resources/static/src/app.js +++ b/src/main/resources/static/src/app.js @@ -213,6 +213,9 @@ app.config(['$routeProvider', '$httpProvider','$cookiesProvider','getDictNamePro }).when('/ops', { templateUrl: 'view/pages/ops.html', controller:'opsController' + }).when('/proxy', { + templateUrl: 'view/pages/proxy.html', + controller:'proxyController' }).when('/acl', { templateUrl: 'view/pages/acl.html', controller: 'aclController' diff --git a/src/main/resources/static/src/consumer.js b/src/main/resources/static/src/consumer.js index 8c0833eb..d192334f 100644 --- a/src/main/resources/static/src/consumer.js +++ b/src/main/resources/static/src/consumer.js @@ -79,6 +79,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific url: "consumer/groupList.query", params: { skipSysGroup: false, + address: localStorage.getItem('isV5') ? localStorage.getItem('proxyAddr') : null } }).success(function (resp) { if (resp.status == 0) { @@ -243,11 +244,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific } }); }; - $scope.detail = function (consumerGroupName) { + $scope.detail = function (consumerGroupName, address) { $http({ method: "GET", url: "consumer/queryTopicByConsumer.query", - params: {consumerGroup: consumerGroupName} + params: {consumerGroup: consumerGroupName, address: address} }).success(function (resp) { if (resp.status == 0) { console.log(resp); @@ -262,11 +263,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific }); }; - $scope.client = function (consumerGroupName) { + $scope.client = function (consumerGroupName, address) { $http({ method: "GET", url: "consumer/consumerConnection.query", - params: {consumerGroup: consumerGroupName} + params: {consumerGroup: consumerGroupName, address: address} }).success(function (resp) { if (resp.status == 0) { console.log(resp); diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index 6bc16cd3..83083d71 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -100,6 +100,7 @@ var en = { "RESET_OFFSET":"resetOffset", "CLUSTER_NAME":"clusterName", "OPS":"OPS", + "PROXY":"Proxy", "AUTO_REFRESH":"AUTO_REFRESH", "REFRESH":"REFRESH", "LOGOUT":"Logout", diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index f71ae346..f8c3c1d3 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -101,6 +101,7 @@ var zh = { "RESET_OFFSET":"重置位点", "CLUSTER_NAME":"集群名", "OPS":"运维", + "PROXY":"代理", "AUTO_REFRESH":"自动刷新", "REFRESH":"刷新", "LOGOUT":"退出", diff --git a/src/main/resources/static/src/proxy.js b/src/main/resources/static/src/proxy.js new file mode 100644 index 00000000..4461b09a --- /dev/null +++ b/src/main/resources/static/src/proxy.js @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +var module = app; +module.controller('proxyController', ['$scope', '$location', '$http', 'Notification', 'remoteApi', 'tools', '$window', + function ($scope, $location, $http, Notification, remoteApi, tools, $window) { + $scope.proxyAddrList = []; + $scope.userRole = $window.sessionStorage.getItem("userrole"); + $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); + $scope.inputReadonly = !$scope.writeOperationEnabled; + $scope.newProxyAddr = ""; + $scope.allProxyConfig = {}; + + $http({ + method: "GET", + url: "proxy/homePage.query" + }).success(function (resp) { + if (resp.status == 0) { + $scope.proxyAddrList = resp.data.proxyAddrList; + $scope.selectedProxy = resp.data.currentProxyAddr; + $scope.showProxyDetailConfig($scope.selectedProxy); + localStorage.setItem('proxyAddr',$scope.selectedProxy); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + + $scope.eleChange = function (data) { + $scope.proxyAddrList = data; + } + $scope.showDetailConf = function () { + $(".proxyModal").modal(); + } + + + $scope.showProxyDetailConfig = function (proxyAddr) { + $http({ + method: "GET", + url: "proxy/proxyDetailConfig.query", + params: {proxyAddress: proxyAddr} + }).success(function (resp) { + if (resp.status == 0) { + $scope.allProxyConfig = resp.data; + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + + $scope.updateProxyAddr = function () { + $http({ + method: "POST", + url: "proxy/updateProxyAddr.do", + params: {proxyAddr: $scope.selectedProxy} + }).success(function (resp) { + if (resp.status == 0) { + localStorage.setItem('proxyAddr', $scope.selectedProxy); + Notification.info({message: "SUCCESS", delay: 2000}); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + $scope.showProxyDetailConfig($scope.selectedProxy); + }; + + $scope.addProxyAddr = function () { + $http({ + method: "POST", + url: "proxy/addProxyAddr.do", + params: {newProxyAddr: $scope.newProxyAddr} + }).success(function (resp) { + if (resp.status == 0) { + if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) == -1) { + $scope.proxyAddrList.push($scope.newProxyAddr); + } + $("#proxyAddr").val(""); + $scope.newProxyAddr = ""; + Notification.info({message: "SUCCESS", delay: 2000}); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + }]) diff --git a/src/main/resources/static/view/layout/_header.html b/src/main/resources/static/view/layout/_header.html index a78b9f29..81591386 100644 --- a/src/main/resources/static/view/layout/_header.html +++ b/src/main/resources/static/view/layout/_header.html @@ -28,6 +28,7 @@