diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 8f9c69ce..da7ba921 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -1046,10 +1046,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti limiter := pc.option.Limiter limiterOn := limiter != nil - if !limiterOn { - if _, ok := pc.crCh[mq.Topic]; !ok { - pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) - } + if _, ok := pc.crCh[mq.Topic]; !ok { + pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) } for count := 0; count < len(msgs); count++ { @@ -1065,9 +1063,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti if limiterOn { limiter(utils.WithoutNamespace(mq.Topic)) - } else { - pc.crCh[mq.Topic] <- struct{}{} } + pc.crCh[mq.Topic] <- struct{}{} go primitive.WithRecover(func() { defer func() { @@ -1077,9 +1074,7 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti rlog.LogKeyConsumerGroup: pc.consumerGroup, }) } - if !limiterOn { - <-pc.crCh[mq.Topic] - } + <-pc.crCh[mq.Topic] }() RETRY: if pq.IsDroppd() {