Skip to content

Commit

Permalink
[ISSUE #953] fix limiter with goroutine cover (#952)
Browse files Browse the repository at this point in the history
* fix limiter with goroutine cover

* fix limiter with goroutine cover

Co-authored-by: 鲁扬 <qingxiang.mqx@alibaba-inc.com>
  • Loading branch information
maqingxiang and 鲁扬 authored Nov 2, 2022
1 parent 8afd69f commit 96f00c4
Showing 1 changed file with 4 additions and 9 deletions.
13 changes: 4 additions & 9 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,10 +1046,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti

limiter := pc.option.Limiter
limiterOn := limiter != nil
if !limiterOn {
if _, ok := pc.crCh[mq.Topic]; !ok {
pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
}
if _, ok := pc.crCh[mq.Topic]; !ok {
pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
}

for count := 0; count < len(msgs); count++ {
Expand All @@ -1065,9 +1063,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti

if limiterOn {
limiter(utils.WithoutNamespace(mq.Topic))
} else {
pc.crCh[mq.Topic] <- struct{}{}
}
pc.crCh[mq.Topic] <- struct{}{}

go primitive.WithRecover(func() {
defer func() {
Expand All @@ -1077,9 +1074,7 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
}
if !limiterOn {
<-pc.crCh[mq.Topic]
}
<-pc.crCh[mq.Topic]
}()
RETRY:
if pq.IsDroppd() {
Expand Down

0 comments on commit 96f00c4

Please sign in to comment.