diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 687811409e0..244b459d630 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.util.BitSet; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.common.KeyBuilder; @@ -186,46 +187,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo); if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { - // order - String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId; - long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); - if (ackOffset < oldOffset) { - return; - } - while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) { - } - try { - oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); - if (ackOffset < oldOffset) { - return; - } - long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( - topic, consumeGroup, - qId, ackOffset, - popTime); - if (nextOffset > -1) { - if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( - topic, consumeGroup, qId)) { - this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), - consumeGroup, topic, qId, nextOffset); - } - if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, - consumeGroup, qId, invisibleTime)) { - this.brokerController.getPopMessageProcessor().notifyMessageArriving( - topic, consumeGroup, qId); - } - } else if (nextOffset == -1) { - String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", - lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress()); - POP_LOGGER.warn(errorInfo); - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark(errorInfo); - return; - } - } finally { - this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey); - } - brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); + ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response); return; } @@ -250,17 +212,22 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA } BatchAckMsg batchAckMsg = new BatchAckMsg(); - for (int i = 0; batchAck.getBitSet() != null && i < batchAck.getBitSet().length(); i++) { - if (!batchAck.getBitSet().get(i)) { - continue; + BitSet bitSet = batchAck.getBitSet(); + for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { + if (i == Integer.MAX_VALUE) { + break; } long offset = startOffset + i; if (offset < minOffset || offset > maxOffset) { continue; } - batchAckMsg.getAckOffsetList().add(offset); + if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { + ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response); + } else { + batchAckMsg.getAckOffsetList().add(offset); + } } - if (batchAckMsg.getAckOffsetList().isEmpty()) { + if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) { return; } @@ -311,4 +278,46 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); } + + protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand response) { + String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId; + long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); + if (ackOffset < oldOffset) { + return; + } + while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) { + } + try { + oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); + if (ackOffset < oldOffset) { + return; + } + long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( + topic, consumeGroup, + qId, ackOffset, + popTime); + if (nextOffset > -1) { + if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( + topic, consumeGroup, qId)) { + this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), + consumeGroup, topic, qId, nextOffset); + } + if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, + consumeGroup, qId, invisibleTime)) { + this.brokerController.getPopMessageProcessor().notifyMessageArriving( + topic, consumeGroup, qId); + } + } else if (nextOffset == -1) { + String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", + lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress()); + POP_LOGGER.warn(errorInfo); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark(errorInfo); + return; + } + } finally { + this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey); + } + brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 5101ffc8e42..213c26fd66a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -54,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.AclConfig; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; @@ -76,7 +78,8 @@ import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.common.BoundaryType; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -101,7 +104,10 @@ import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.BatchAck; +import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; +import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; @@ -114,7 +120,6 @@ import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; -import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody; @@ -196,6 +201,10 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; @@ -207,10 +216,6 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -221,8 +226,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook; import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS; @@ -885,9 +888,77 @@ public void ackMessageAsync( final String addr, final long timeOut, final AckCallback ackCallback, - final AckMessageRequestHeader requestHeader // + final AckMessageRequestHeader requestHeader + ) throws RemotingException, MQBrokerException, InterruptedException { + ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null); + } + + public void batchAckMessageAsync( + final String addr, + final long timeOut, + final AckCallback ackCallback, + final String topic, + final String consumerGroup, + final List extraInfoList + ) throws RemotingException, MQBrokerException, InterruptedException { + String brokerName = null; + Map batchAckMap = new HashMap<>(); + for (String extraInfo : extraInfoList) { + String[] extraInfoData = ExtraInfoUtil.split(extraInfo); + if (brokerName == null) { + brokerName = ExtraInfoUtil.getBrokerName(extraInfoData); + } + String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" + + ExtraInfoUtil.getQueueId(extraInfoData) + "@" + + ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" + + ExtraInfoUtil.getPopTime(extraInfoData); + BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> { + BatchAck newBatchAck = new BatchAck(); + newBatchAck.setConsumerGroup(consumerGroup); + newBatchAck.setTopic(topic); + newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData)); + newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData)); + newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData)); + newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData)); + newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData)); + newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData)); + newBatchAck.setBitSet(new BitSet()); + return newBatchAck; + }); + bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData))); + } + + BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody(); + requestBody.setBrokerName(brokerName); + requestBody.setAcks(new ArrayList<>(batchAckMap.values())); + batchAckMessageAsync(addr, timeOut, ackCallback, requestBody); + } + + public void batchAckMessageAsync( + final String addr, + final long timeOut, + final AckCallback ackCallback, + final BatchAckMessageRequestBody requestBody ) throws RemotingException, MQBrokerException, InterruptedException { - final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + ackMessageAsync(addr, timeOut, ackCallback, null, requestBody); + } + + protected void ackMessageAsync( + final String addr, + final long timeOut, + final AckCallback ackCallback, + final AckMessageRequestHeader requestHeader, + final BatchAckMessageRequestBody requestBody + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request; + if (requestHeader != null) { + request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + } else { + request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); + if (requestBody != null) { + request.setBody(requestBody.encode()); + } + } this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) { @Override diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java index 496bd6da432..09c60c0b45f 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.client.rmq; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.AckCallback; @@ -140,6 +141,27 @@ public void onException(Throwable e) { return future; } + public CompletableFuture batchAckMessageAsync(String brokerAddr, String topic, String consumerGroup, + List extraInfoList) { + CompletableFuture future = new CompletableFuture<>(); + try { + this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT, new AckCallback() { + @Override + public void onSuccess(AckResult ackResult) { + future.complete(ackResult); + } + + @Override + public void onException(Throwable e) { + future.completeExceptionally(e); + } + }, topic, consumerGroup, extraInfoList); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + public CompletableFuture changeInvisibleTimeAsync(String brokerAddr, String brokerName, String topic, String consumerGroup, String extraInfo, long invisibleTime) { String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java index 952fbe3f5f5..2e29b95a5af 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java @@ -63,4 +63,10 @@ protected CompletableFuture popMessageAsync(long invisibleTime, int m brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true, ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); } + + protected CompletableFuture popMessageAsync(long invisibleTime, int maxNums) { + return client.popMessageAsync( + brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, false, + ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java new file mode 100644 index 00000000000..ec9153ccc98 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.pop; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.client.consumer.AckStatus; +import org.apache.rocketmq.client.consumer.PopResult; +import org.apache.rocketmq.client.consumer.PopStatus; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.constant.ConsumeInitMode; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.client.rmq.RMQPopClient; +import org.apache.rocketmq.test.util.MQRandomUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; + +public class BatchAckIT extends BasePop { + + protected String topic; + protected String group; + protected RMQNormalProducer producer = null; + protected RMQPopClient client = null; + protected String brokerAddr; + protected MessageQueue messageQueue; + + @Before + public void setUp() { + brokerAddr = brokerController1.getBrokerAddr(); + topic = MQRandomUtils.getRandomTopic(); + group = initConsumerGroup(); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL); + producer = getProducer(NAMESRV_ADDR, topic); + client = getRMQPopClient(); + messageQueue = new MessageQueue(topic, BROKER1_NAME, -1); + } + + @After + public void tearDown() { + shutdown(); + } + + @Test + public void testBatchAckNormallyWithPopBuffer() throws Throwable { + brokerController1.getBrokerConfig().setEnablePopBufferMerge(true); + brokerController2.getBrokerConfig().setEnablePopBufferMerge(true); + + testBatchAck(() -> { + try { + return popMessageAsync().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testBatchAckNormallyWithOutPopBuffer() throws Throwable { + brokerController1.getBrokerConfig().setEnablePopBufferMerge(false); + brokerController2.getBrokerConfig().setEnablePopBufferMerge(false); + + testBatchAck(() -> { + try { + return popMessageAsync().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testBatchAckOrderly() throws Throwable { + testBatchAck(() -> { + try { + return popMessageOrderlyAsync().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + public void testBatchAck(Supplier popResultSupplier) throws Throwable { + // Send 10 messages but do not ack, let them enter the retry topic + producer.send(10); + AtomicInteger firstMsgRcvNum = new AtomicInteger(); + await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { + PopResult popResult = popResultSupplier.get(); + if (popResult.getPopStatus().equals(PopStatus.FOUND)) { + firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size()); + } + assertEquals(10, firstMsgRcvNum.get()); + }); + // sleep 6s, expect messages to enter the retry topic + TimeUnit.SECONDS.sleep(6); + + producer.send(20); + List extraInfoList = new ArrayList<>(); + await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { + PopResult popResult = popResultSupplier.get(); + if (popResult.getPopStatus().equals(PopStatus.FOUND)) { + for (MessageExt messageExt : popResult.getMsgFoundList()) { + extraInfoList.add(messageExt.getProperty(MessageConst.PROPERTY_POP_CK)); + } + } + assertEquals(30, extraInfoList.size()); + }); + + AckResult ackResult = client.batchAckMessageAsync(brokerAddr, topic, group, extraInfoList).get(); + assertEquals(AckStatus.OK, ackResult.getStatus()); + + // sleep 6s, expected that messages that have been acked will not be re-consumed + TimeUnit.SECONDS.sleep(6); + PopResult popResult = popResultSupplier.get(); + assertEquals(PopStatus.POLLING_NOT_FOUND, popResult.getPopStatus()); + } + + private CompletableFuture popMessageAsync() { + return client.popMessageAsync( + brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false, + ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); + } + + private CompletableFuture popMessageOrderlyAsync() { + return client.popMessageAsync( + brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false, + ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", null); + } +}