Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #301] Adding GET_CONSUMER_RUNNING_INFO #311

Merged
merged 3 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,13 @@ type defaultConsumer struct {
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
consumerGroup string
model MessageModel
allocate func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
unitMode bool
consumeOrderly bool
fromWhere ConsumeFromWhere
consumerGroup string
model MessageModel
allocate func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
unitMode bool
consumeOrderly bool
fromWhere ConsumeFromWhere
consumerStartTimestamp int64

cType ConsumeType
client internal.RMQClient
Expand All @@ -250,7 +251,7 @@ type defaultConsumer struct {
pause bool
once sync.Once
option consumerOptions
// key: int, hash(*primitive.MessageQueue)
// key: primitive.MessageQueue
// value: *processQueue
processQueueTable sync.Map

Expand Down Expand Up @@ -287,7 +288,7 @@ func (dc *defaultConsumer) start() error {
dc.client.UpdateTopicRouteInfo()
dc.client.Start()
dc.state = internal.StateRunning

dc.consumerStartTimestamp = time.Now().UnixNano() / 1e6
wenfengwang marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
51 changes: 51 additions & 0 deletions consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
gods_util "github.com/emirpasic/gods/utils"
uatomic "go.uber.org/atomic"

"github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
)
Expand Down Expand Up @@ -277,6 +278,28 @@ func (pq *processQueue) Max() int64 {
return -1
}

func (pq *processQueue) MinOrderlyCache() int64 {
if pq.consumingMsgOrderlyTreeMap.Empty() {
return -1
}
k, _ := pq.consumingMsgOrderlyTreeMap.Min()
if k != nil {
return k.(int64)
}
return -1
}

func (pq *processQueue) MaxOrderlyCache() int64 {
if pq.consumingMsgOrderlyTreeMap.Empty() {
return -1
}
k, _ := pq.consumingMsgOrderlyTreeMap.Max()
if k != nil {
return k.(int64)
}
return -1
}

func (pq *processQueue) clear() {
pq.mutex.Lock()
pq.msgCache.Clear()
Expand All @@ -302,3 +325,31 @@ func (pq *processQueue) commit() int64 {
pq.consumingMsgOrderlyTreeMap.Clear()
return offset + 1
}

func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
pq.mutex.RLock()
defer pq.mutex.RUnlock()
info := internal.ProcessQueueInfo{
Locked: pq.locked.Load(),
TryUnlockTimes: pq.tryUnlockTimes,
LastLockTimestamp: pq.lastLockTime.UnixNano() / 10e6,
Dropped: pq.dropped.Load(),
LastPullTimestamp: pq.lastPullTime.UnixNano() / 10e6,
LastConsumeTimestamp: pq.lastConsumeTime.UnixNano() / 10e6,
}

if !pq.msgCache.Empty() {
info.CachedMsgMinOffset = pq.Min()
info.CachedMsgMaxOffset = pq.Max()
info.CachedMsgCount = pq.msgCache.Size()
info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
}

if !pq.consumingMsgOrderlyTreeMap.Empty() {
info.TransactionMsgMinOffset = pq.MinOrderlyCache()
info.TransactionMsgMaxOffset = pq.MaxOrderlyCache()
info.TransactionMsgCount = pq.consumingMsgOrderlyTreeMap.Size()
}

return info
}
39 changes: 39 additions & 0 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,45 @@ func (pc *pushConsumer) IsUnitMode() bool {
return pc.unitMode
}

func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
info := internal.NewConsumerRunningInfo()

pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
topic := key.(string)
info.SubscriptionData[value.(*internal.SubscriptionData)] = true
status := internal.ConsumeStatus{
PullRT: getPullRT(topic, pc.consumerGroup).avgpt,
PullTPS: getPullTPS(topic, pc.consumerGroup).tps,
ConsumeRT: getConsumeRT(topic, pc.consumerGroup).avgpt,
ConsumeOKTPS: getConsumeOKTPS(topic, pc.consumerGroup).tps,
ConsumeFailedTPS: getConsumeFailedTPS(topic, pc.consumerGroup).tps,
ConsumeFailedMsgs: topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
}
info.StatusTable[topic] = status
return true
})

pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
pInfo := pq.currentInfo()
pInfo.CommitOffset = pc.storage.read(&mq, _ReadMemoryThenStore)
info.MQTable[mq] = pInfo
return true
})

nsAddr := ""
for _, value := range pc.namesrv.AddrList() {
nsAddr += fmt.Sprintf("%s;", value)
}
info.Properties[internal.PropNameServerAddr] = nsAddr
info.Properties[internal.PropConsumeType] = string(pc.cType)
info.Properties[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly)
info.Properties[internal.PropThreadPoolCoreSize] = "-1"
info.Properties[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10)
return info
}

func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
v, exit := pc.subscriptionDataTable.Load(topic)
if !exit {
Expand Down
49 changes: 37 additions & 12 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ limitations under the License.
package internal

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -36,6 +35,7 @@ import (
)

const (
clientVersion = "v2.0.0-alpha3"
defaultTraceRegionID = "DefaultRegion"

// tracing message switch
Expand Down Expand Up @@ -83,6 +83,7 @@ type InnerConsumer interface {
SubscriptionDataList() []*SubscriptionData
Rebalance()
IsUnitMode() bool
GetConsumerRunningInfo() *ConsumerRunningInfo
}

func DefaultClientOptions() ClientOptions {
Expand Down Expand Up @@ -220,8 +221,30 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R

client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
rlog.Info("receive get consumer running info request...", nil)
header := new(GetConsumerRunningInfoHeader)
header.Decode(req.ExtFields)
val, exist := clientMap.Load(header.clientID)
res := remote.NewRemotingCommand(ResError, nil, nil)
res.Remark = "the go client has not supported consumer running info"
if !exist {
res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
} else {
cli, ok := val.(*rmqClient)
var runningInfo *ConsumerRunningInfo
if ok {
runningInfo = cli.getConsumerRunningInfo(header.consumerGroup)
}
if runningInfo != nil {
res.Code = ResSuccess
data, err := runningInfo.Encode()
if err != nil {
res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
} else {
res.Body = data
}
} else {
res.Remark = "there is unexpected error when get running info, please check log"
}
}
return res
})
}
Expand Down Expand Up @@ -659,6 +682,18 @@ func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool {
return result
}

func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
consumer, exist := c.consumerMap.Load(group)
if !exist {
return nil
}
info := consumer.(InnerConsumer).GetConsumerRunningInfo()
if info != nil {
info.Properties[PropClientVersion] = clientVersion
}
return info
}

func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
list := make([]*primitive.MessageQueue, 0)
for idx := range data.QueueDataList {
Expand All @@ -676,16 +711,6 @@ func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.Me
return list
}

func encodeMessages(message []*primitive.Message) []byte {
var buffer bytes.Buffer
index := 0
for index < len(message) {
buffer.Write(message[index].Body)
index++
}
return buffer.Bytes()
}

func brokerVIPChannel(brokerAddr string) string {
if !_VIPChannelEnable {
return brokerAddr
Expand Down
30 changes: 30 additions & 0 deletions internal/mock_namesrv.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading