Skip to content

Commit

Permalink
[ISSUE apache#7205] support batch ack for pop orderly (apache#7206)
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk authored Aug 18, 2023
1 parent 05e7cde commit 72d796f
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.BitSet;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.KeyBuilder;
Expand Down Expand Up @@ -186,46 +187,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);

if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
// order
String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
if (ackOffset < oldOffset) {
return;
}
while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
}
try {
oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
if (ackOffset < oldOffset) {
return;
}
long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
topic, consumeGroup,
qId, ackOffset,
popTime);
if (nextOffset > -1) {
if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
topic, consumeGroup, qId)) {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
consumeGroup, topic, qId, nextOffset);
}
if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
consumeGroup, qId, invisibleTime)) {
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
topic, consumeGroup, qId);
}
} else if (nextOffset == -1) {
String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
POP_LOGGER.warn(errorInfo);
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(errorInfo);
return;
}
} finally {
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
}
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
return;
}

Expand All @@ -250,17 +212,22 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
}

BatchAckMsg batchAckMsg = new BatchAckMsg();
for (int i = 0; batchAck.getBitSet() != null && i < batchAck.getBitSet().length(); i++) {
if (!batchAck.getBitSet().get(i)) {
continue;
BitSet bitSet = batchAck.getBitSet();
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
if (i == Integer.MAX_VALUE) {
break;
}
long offset = startOffset + i;
if (offset < minOffset || offset > maxOffset) {
continue;
}
batchAckMsg.getAckOffsetList().add(offset);
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response);
} else {
batchAckMsg.getAckOffsetList().add(offset);
}
}
if (batchAckMsg.getAckOffsetList().isEmpty()) {
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
return;
}

Expand Down Expand Up @@ -311,4 +278,46 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
}

protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand response) {
String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
if (ackOffset < oldOffset) {
return;
}
while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
}
try {
oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
if (ackOffset < oldOffset) {
return;
}
long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
topic, consumeGroup,
qId, ackOffset,
popTime);
if (nextOffset > -1) {
if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
topic, consumeGroup, qId)) {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
consumeGroup, topic, qId, nextOffset);
}
if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
consumeGroup, qId, invisibleTime)) {
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
topic, consumeGroup, qId);
}
} else if (nextOffset == -1) {
String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
POP_LOGGER.warn(errorInfo);
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(errorInfo);
return;
}
} finally {
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
}
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
Expand All @@ -76,7 +78,8 @@
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
Expand All @@ -101,7 +104,10 @@
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.BatchAck;
import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
Expand All @@ -114,7 +120,6 @@
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
Expand Down Expand Up @@ -196,6 +201,10 @@
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
Expand All @@ -207,10 +216,6 @@
import org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
Expand All @@ -221,8 +226,6 @@
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook;
import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;

Expand Down Expand Up @@ -885,9 +888,77 @@ public void ackMessageAsync(
final String addr,
final long timeOut,
final AckCallback ackCallback,
final AckMessageRequestHeader requestHeader //
final AckMessageRequestHeader requestHeader
) throws RemotingException, MQBrokerException, InterruptedException {
ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null);
}

public void batchAckMessageAsync(
final String addr,
final long timeOut,
final AckCallback ackCallback,
final String topic,
final String consumerGroup,
final List<String> extraInfoList
) throws RemotingException, MQBrokerException, InterruptedException {
String brokerName = null;
Map<String, BatchAck> batchAckMap = new HashMap<>();
for (String extraInfo : extraInfoList) {
String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
if (brokerName == null) {
brokerName = ExtraInfoUtil.getBrokerName(extraInfoData);
}
String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" +
ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
ExtraInfoUtil.getPopTime(extraInfoData);
BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
BatchAck newBatchAck = new BatchAck();
newBatchAck.setConsumerGroup(consumerGroup);
newBatchAck.setTopic(topic);
newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
newBatchAck.setBitSet(new BitSet());
return newBatchAck;
});
bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
}

BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody();
requestBody.setBrokerName(brokerName);
requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
batchAckMessageAsync(addr, timeOut, ackCallback, requestBody);
}

public void batchAckMessageAsync(
final String addr,
final long timeOut,
final AckCallback ackCallback,
final BatchAckMessageRequestBody requestBody
) throws RemotingException, MQBrokerException, InterruptedException {
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
ackMessageAsync(addr, timeOut, ackCallback, null, requestBody);
}

protected void ackMessageAsync(
final String addr,
final long timeOut,
final AckCallback ackCallback,
final AckMessageRequestHeader requestHeader,
final BatchAckMessageRequestBody requestBody
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request;
if (requestHeader != null) {
request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
if (requestBody != null) {
request.setBody(requestBody.encode());
}
}
this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.test.client.rmq;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.AckCallback;
Expand Down Expand Up @@ -140,6 +141,27 @@ public void onException(Throwable e) {
return future;
}

public CompletableFuture<AckResult> batchAckMessageAsync(String brokerAddr, String topic, String consumerGroup,
List<String> extraInfoList) {
CompletableFuture<AckResult> future = new CompletableFuture<>();
try {
this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT, new AckCallback() {
@Override
public void onSuccess(AckResult ackResult) {
future.complete(ackResult);
}

@Override
public void onException(Throwable e) {
future.completeExceptionally(e);
}
}, topic, consumerGroup, extraInfoList);
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
}

public CompletableFuture<AckResult> changeInvisibleTimeAsync(String brokerAddr, String brokerName, String topic,
String consumerGroup, String extraInfo, long invisibleTime) {
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,10 @@ protected CompletableFuture<PopResult> popMessageAsync(long invisibleTime, int m
brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true,
ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
}

protected CompletableFuture<PopResult> popMessageAsync(long invisibleTime, int maxNums) {
return client.popMessageAsync(
brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, false,
ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
}
}
Loading

0 comments on commit 72d796f

Please sign in to comment.