targetList;
+
+ private CountDownLatch countDownLatch;
+
+ public AsyncShutdownHelper() {
+ this.targetList = new ArrayList<>();
+ this.shutdown = new AtomicBoolean(false);
+ }
+
+ public void addTarget(Shutdown target) {
+ if (shutdown.get()) {
+ return;
+ }
+ targetList.add(target);
+ }
+
+ public AsyncShutdownHelper shutdown() {
+ if (shutdown.get()) {
+ return this;
+ }
+ if (targetList.isEmpty()) {
+ return this;
+ }
+ this.countDownLatch = new CountDownLatch(targetList.size());
+ for (Shutdown target : targetList) {
+ Runnable runnable = () -> {
+ try {
+ target.shutdown();
+ } catch (Exception ignored) {
+
+ } finally {
+ countDownLatch.countDown();
+ }
+ };
+ new Thread(runnable).start();
+ }
+ return this;
+ }
+
+ public boolean await(long time, TimeUnit unit) throws InterruptedException {
+ if (shutdown.get()) {
+ return false;
+ }
+ try {
+ return this.countDownLatch.await(time, unit);
+ } finally {
+ shutdown.compareAndSet(false, true);
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ServiceProvider.java b/common/src/main/java/org/apache/rocketmq/common/utils/ServiceProvider.java
index 65dea47b5ea..49e2c442b23 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ServiceProvider.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ServiceProvider.java
@@ -50,7 +50,7 @@ public class ServiceProvider {
* Returns a string that uniquely identifies the specified object, including its class.
*
* The returned string is of form "classname@hashcode", ie is the same as the return value of the Object.toString()
- * method, but works even when the specified object's class has overidden the toString method.
+ * method, but works even when the specified object's class has overridden the toString method.
*
* @param o may be null.
* @return a string of form classname@hashcode, or "null" if param o is null.
diff --git a/container/pom.xml b/container/pom.xml
index b9514defdb8..cc177abeea9 100644
--- a/container/pom.xml
+++ b/container/pom.xml
@@ -18,7 +18,7 @@
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/controller/pom.xml b/controller/pom.xml
index 82b6fc7d969..7092ca2b3cd 100644
--- a/controller/pom.xml
+++ b/controller/pom.xml
@@ -19,7 +19,7 @@
rocketmq-all
org.apache.rocketmq
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
jar
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index be487849ce5..3421010340a 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -101,7 +101,7 @@ public class DLedgerController implements Controller {
private final List brokerLifecycleListeners;
- // Usr for checking whether the broker is alive
+ // use for checking whether the broker is alive
private BrokerValidPredicate brokerAlivePredicate;
// use for elect a master
private ElectPolicy electPolicy;
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 60fc6170bbe..88521fbede7 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,7 +20,7 @@
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
rocketmq-distribution
rocketmq-distribution ${project.version}
diff --git a/docs/cn/best_practice.md b/docs/cn/best_practice.md
index 5cc5b37643f..36d6acff6bd 100755
--- a/docs/cn/best_practice.md
+++ b/docs/cn/best_practice.md
@@ -253,7 +253,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
| clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
| instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
| clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
-| pollNameServerInteval | 30000 | 轮询Name Server间隔时间,单位毫秒 |
+| pollNameServerInterval | 30000 | 轮询Name Server间隔时间,单位毫秒 |
| heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
| persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
diff --git a/docs/cn/controller/design.md b/docs/cn/controller/design.md
index 563a624eddc..13eba7764a6 100644
--- a/docs/cn/controller/design.md
+++ b/docs/cn/controller/design.md
@@ -121,13 +121,13 @@ nextTransferFromWhere + size > currentTransferEpochEndOffset,则将 selectMapp
![示意图](../image/controller/controller_design_3.png)
-`current state(4byte) + Two flags(4byte) + slaveAddressLength(4byte) + slaveAddress(50byte)`
+`current state(4byte) + Two flags(4byte) + slaveBrokerId(8byte)`
- Current state 代表当前的 HAConnectionState,也即 HANDSHAKE。
- Two flags 是两个状态标志位,其中,isSyncFromLastFile 代表是否要从 Master 的最后一个文件开始复制,isAsyncLearner 代表该 Slave 是否是异步复制,并以 Learner 的形式接入 Master。
-- slaveAddressLength 与 slaveAddress 代表了该 Slave 的地址,用于后续加入 SyncStateSet 。
+- slaveBrokerId 代表了该 Slave 的 brokerId,用于后续加入 SyncStateSet 。
2.AutoSwitchHaConnection (Master) 会向 Slave 回送 HandShake 包,如下:
diff --git a/docs/cn/image/controller/controller_design_3.png b/docs/cn/image/controller/controller_design_3.png
index 8c475bcecf1..0379c231d46 100644
Binary files a/docs/cn/image/controller/controller_design_3.png and b/docs/cn/image/controller/controller_design_3.png differ
diff --git a/docs/en/Configuration_Client.md b/docs/en/Configuration_Client.md
index 4d999b2feda..4679957af5a 100644
--- a/docs/en/Configuration_Client.md
+++ b/docs/en/Configuration_Client.md
@@ -48,7 +48,7 @@ HTTP static server addressing is recommended, because it is simple client deploy
| clientIP | local IP | Client local ip address, some machines will fail to recognize the client IP address, which needs to be enforced in the code |
| instanceName | DEFAULT | Name of the client instance, Multiple producers and consumers created by the client actually share one internal instance (this instance contains network connection, thread resources, etc.). |
| clientCallbackExecutorThreads | 4 | Number of communication layer asynchronous callback threads |
-| pollNameServerInteval | 30000 | Polling the Name Server interval in milliseconds |
+| pollNameServerInterval | 30000 | Polling the Name Server interval in milliseconds |
| heartbeatBrokerInterval | 30000 | The heartbeat interval, in milliseconds, is sent to the Broker |
| persistConsumerOffsetInterval | 5000 | The persistent Consumer consumes the progress interval in milliseconds |
diff --git a/docs/en/controller/design.md b/docs/en/controller/design.md
index ba2de58af14..af4958a4d3e 100644
--- a/docs/en/controller/design.md
+++ b/docs/en/controller/design.md
@@ -112,13 +112,13 @@ According to the above, we can know the AutoSwitchHaService protocol divides log
![示意图](../image/controller/controller_design_3.png)
-`current state(4byte) + Two flags(4byte) + slaveAddressLength(4byte) + slaveAddress(50byte)`
+`current state(4byte) + Two flags(4byte) + slaveBrokerId(8byte)`
- `Current state` represents the current HAConnectionState, which is HANDSHAKE.
- Two flags are two status flags, where `isSyncFromLastFile` indicates whether to start copying from the Master's last file, and `isAsyncLearner` indicates whether the Slave is an asynchronous copy and joins the Master as a Learner.
-- `slaveAddressLength` and `slaveAddress` represent the address of the Slave, which will be used later to join the SyncStateSet.
+- `slaveBrokerId` represent the brokerId of the Slave, which will be used later to join the SyncStateSet.
2.AutoSwitchHaConnection (Master) will send a HandShake packet back to the Slave as follows:
diff --git a/docs/en/image/controller/controller_design_3.png b/docs/en/image/controller/controller_design_3.png
index 8c475bcecf1..0379c231d46 100644
Binary files a/docs/en/image/controller/controller_design_3.png and b/docs/en/image/controller/controller_design_3.png differ
diff --git a/example/pom.xml b/example/pom.xml
index 7685a811690..19047c2f552 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -19,7 +19,7 @@
rocketmq-all
org.apache.rocketmq
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/filter/pom.xml b/filter/pom.xml
index 0acaa73f8ae..262177b61c2 100644
--- a/filter/pom.xml
+++ b/filter/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index d53540601e6..012ebafe064 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index 09ab5ed2586..8ea4745b25d 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
index 36ac27f417a..46e607a5802 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -82,7 +82,7 @@ public V get(final long timeout) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
- LOG.error("promise get value interrupted,excepiton:{}", e.getMessage());
+ LOG.error("promise get value interrupted,exception:{}", e.getMessage());
}
if (!isDoing()) {
diff --git a/pom.xml b/pom.xml
index 8449bd6fb88..b18d9bbb439 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@
2012
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
pom
Apache RocketMQ ${project.version}
http://rocketmq.apache.org/
@@ -108,7 +108,7 @@
3.20.0-GA
4.2.2
3.12.0
- 2.7
+ 2.14.0
32.0.1-jre
2.9.0
0.3.1-alpha
diff --git a/proxy/pom.xml b/proxy/pom.xml
index 41e6fa95f55..e608d9f587f 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java
new file mode 100644
index 00000000000..5c50de4426e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common.utils;
+
+import io.grpc.Attributes;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+
+public class GrpcUtils {
+
+ private GrpcUtils() {
+ }
+
+ public static void putHeaderIfNotExist(Metadata headers, Metadata.Key key, T value) {
+ if (headers == null) {
+ return;
+ }
+ if (!headers.containsKey(key) && value != null) {
+ headers.put(key, value);
+ }
+ }
+
+ public static T getAttribute(ServerCall call, Attributes.Key key) {
+ Attributes attributes = call.getAttributes();
+ if (attributes == null) {
+ return null;
+ }
+ return attributes.get(key);
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
index 28ee019fae7..e082ba6e28c 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
@@ -33,6 +33,7 @@
import org.apache.rocketmq.acl.common.AuthenticationHeader;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.apache.rocketmq.common.constant.GrpcConstants;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
public class AuthenticationInterceptor implements ServerInterceptor {
@@ -49,8 +50,8 @@ public ServerCall.Listener interceptCall(ServerCall call, Metada
@Override
public void onMessage(R message) {
GeneratedMessageV3 messageV3 = (GeneratedMessageV3) message;
- headers.put(GrpcConstants.RPC_NAME, messageV3.getDescriptorForType().getFullName());
- headers.put(GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName());
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.RPC_NAME, messageV3.getDescriptorForType().getFullName());
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName());
if (ConfigurationManager.getProxyConfig().isEnableACL()) {
try {
AuthenticationHeader authenticationHeader = AuthenticationHeader.builder()
@@ -85,7 +86,7 @@ protected void validate(AuthenticationHeader authenticationHeader, Metadata head
if (accessResource instanceof PlainAccessResource) {
PlainAccessResource plainAccessResource = (PlainAccessResource) accessResource;
- headers.put(GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey());
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey());
}
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
index 1de2ce4f986..e3e78841559 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
@@ -27,6 +27,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.GrpcConstants;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
import java.net.InetSocketAddress;
@@ -44,11 +45,11 @@ public ServerCall.Listener interceptCall(
SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
remoteAddress = parseSocketAddress(remoteSocketAddress);
}
- headers.put(GrpcConstants.REMOTE_ADDRESS, remoteAddress);
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.REMOTE_ADDRESS, remoteAddress);
SocketAddress localSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
String localAddress = parseSocketAddress(localSocketAddress);
- headers.put(GrpcConstants.LOCAL_ADDRESS, localAddress);
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.LOCAL_ADDRESS, localAddress);
for (Attributes.Key> key : call.getAttributes().keys()) {
if (!StringUtils.startsWith(key.toString(), HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
@@ -57,12 +58,12 @@ public ServerCall.Listener interceptCall(
Metadata.Key headerKey
= Metadata.Key.of(key.toString(), Metadata.ASCII_STRING_MARSHALLER);
String headerValue = String.valueOf(call.getAttributes().get(key));
- headers.put(headerKey, headerValue);
+ GrpcUtils.putHeaderIfNotExist(headers, headerKey, headerValue);
}
String channelId = call.getAttributes().get(AttributeKeys.CHANNEL_ID);
if (StringUtils.isNotBlank(channelId)) {
- headers.put(GrpcConstants.CHANNEL_ID, channelId);
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.CHANNEL_ID, channelId);
}
return next.startCall(call, headers);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
index 58eed91c9fa..e317b48f1ed 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
@@ -31,6 +31,7 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
public class AuthenticationPipeline implements RequestPipeline {
@@ -73,7 +74,7 @@ protected AuthenticationContext newContext(ProxyContext context, Metadata header
if (result instanceof DefaultAuthenticationContext) {
DefaultAuthenticationContext defaultAuthenticationContext = (DefaultAuthenticationContext) result;
if (StringUtils.isNotBlank(defaultAuthenticationContext.getUsername())) {
- headers.put(GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername());
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername());
}
}
return result;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
index 39d7057bddd..518868831f4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
@@ -118,11 +118,11 @@ protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) throws Ille
}
} else {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(inboundChannel);
- sourceAddress = StringUtils.substringBefore(remoteAddr, CommonConstants.COLON);
+ sourceAddress = StringUtils.substringBeforeLast(remoteAddr, CommonConstants.COLON);
sourcePort = Integer.parseInt(StringUtils.substringAfterLast(remoteAddr, CommonConstants.COLON));
String localAddr = RemotingHelper.parseChannelLocalAddr(inboundChannel);
- destinationAddress = StringUtils.substringBefore(localAddr, CommonConstants.COLON);
+ destinationAddress = StringUtils.substringBeforeLast(localAddr, CommonConstants.COLON);
destinationPort = Integer.parseInt(StringUtils.substringAfterLast(localAddr, CommonConstants.COLON));
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 6b2ba02f7c9..a8088a95d0a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -176,7 +176,8 @@ public CompletableFuture sendMessageBack(ProxyContext ctx, Rece
}
@Override
- public CompletableFuture endTransactionOneway(ProxyContext ctx, String brokerName, EndTransactionRequestHeader requestHeader,
+ public CompletableFuture endTransactionOneway(ProxyContext ctx, String brokerName,
+ EndTransactionRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createChannel(ctx);
@@ -310,9 +311,8 @@ public CompletableFuture changeInvisibleTime(ProxyContext ctx, Receip
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage());
CompletableFuture future = new CompletableFuture<>();
try {
- RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor()
- .processRequest(channelHandlerContext, command);
- future.complete(response);
+ future = brokerController.getChangeInvisibleTimeProcessor()
+ .processRequestAsync(channelHandlerContext.channel(), command, true);
} catch (Exception e) {
log.error("Fail to process changeInvisibleTime command", e);
future.completeExceptionally(e);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
index 3e3d37086b5..f7a656d7682 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.proxy.service.message;
+import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -370,11 +371,11 @@ public void testChangeInvisibleTime() throws Exception {
responseHeader.setReviveQid(newReviveQueueId);
responseHeader.setInvisibleTime(newInvisibleTime);
responseHeader.setPopTime(newPopTime);
- Mockito.when(changeInvisibleTimeProcessorMock.processRequest(Mockito.any(SimpleChannelHandlerContext.class), Mockito.argThat(argument -> {
+ Mockito.when(changeInvisibleTimeProcessorMock.processRequestAsync(Mockito.any(Channel.class), Mockito.argThat(argument -> {
boolean first = argument.getCode() == RequestCode.CHANGE_MESSAGE_INVISIBLETIME;
boolean second = argument.readCustomHeader() instanceof ChangeInvisibleTimeRequestHeader;
return first && second;
- }))).thenReturn(remotingCommand);
+ }), Mockito.any(Boolean.class))).thenReturn(CompletableFuture.completedFuture(remotingCommand));
ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
CompletableFuture future = localMessageService.changeInvisibleTime(proxyContext, handle, messageId,
requestHeader, 1000L);
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 566c983ea98..65e9a852fcc 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 9f3136195b3..ffa37260594 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -39,8 +39,8 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.AbortProcessException;
@@ -393,7 +393,7 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
responseFuture.release();
}
} else {
- log.warn("receive response, cmd={}, but not matched any request, address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ log.warn("receive response, cmd={}, but not matched any request, address={}, channelId={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), ctx.channel().id());
}
}
@@ -560,13 +560,13 @@ public void operationFail(Throwable throwable) {
return;
}
requestFail(opaque);
- log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+ log.warn("send a request command to channel <{}>, channelId={}, failed.", RemotingHelper.parseChannelRemoteAddr(channel), channel.id());
});
return future;
} catch (Exception e) {
responseTable.remove(opaque);
responseFuture.release();
- log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
+ log.warn("send a request command to channel <{}> channelId={} Exception", RemotingHelper.parseChannelRemoteAddr(channel), channel.id(), e);
future.completeExceptionally(new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e));
return future;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 41976122b2f..ae82b09edaf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -49,7 +49,6 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -416,14 +415,14 @@ public void closeChannel(final String addr, final Channel channel) {
boolean removeItemFromTable = true;
final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
- LOGGER.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
+ LOGGER.info("closeChannel: begin close the channel[addr={}, id={}] Found: {}", addrRemote, channel.id(), prevCW != null);
if (null == prevCW) {
- LOGGER.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+ LOGGER.info("closeChannel: the channel[addr={}, id={}] has been removed from the channel table before", addrRemote, channel.id());
removeItemFromTable = false;
} else if (prevCW.isWrapperOf(channel)) {
- LOGGER.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
- addrRemote);
+ LOGGER.info("closeChannel: the channel[addr={}, id={}] has been closed before, and has been created again, nothing to do.",
+ addrRemote, channel.id());
removeItemFromTable = false;
}
@@ -432,7 +431,7 @@ public void closeChannel(final String addr, final Channel channel) {
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
- LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+ LOGGER.info("closeChannel: the channel[addr={}, id={}] was removed from channel table", addrRemote, channel.id());
}
RemotingHelper.closeChannel(channel);
@@ -471,7 +470,7 @@ public void closeChannel(final Channel channel) {
}
if (null == prevCW) {
- LOGGER.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+ LOGGER.info("eventCloseChannel: the channel[addr={}, id={}] has been removed from the channel table before", RemotingHelper.parseChannelRemoteAddr(channel), channel.id());
removeItemFromTable = false;
}
@@ -480,11 +479,11 @@ public void closeChannel(final Channel channel) {
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
- LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+ LOGGER.info("closeChannel: the channel[addr={}, id={}] was removed from channel table", addrRemote, channel.id());
RemotingHelper.closeChannel(channel);
}
} catch (Exception e) {
- LOGGER.error("closeChannel: close the channel exception", e);
+ LOGGER.error("closeChannel: close the channel[id={}] exception", channel.id(), e);
} finally {
this.lockChannelTables.unlock();
}
@@ -521,10 +520,11 @@ public void updateNameServerAddressList(List addrs) {
this.namesrvAddrList.set(addrs);
// should close the channel if choosed addr is not exist.
- if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) {
- String namesrvAddr = this.namesrvAddrChoosed.get();
+ String chosenNameServerAddr = this.namesrvAddrChoosed.get();
+ if (chosenNameServerAddr != null && !addrs.contains(chosenNameServerAddr)) {
+ namesrvAddrChoosed.compareAndSet(chosenNameServerAddr, null);
for (String addr : this.channelTables.keySet()) {
- if (addr.contains(namesrvAddr)) {
+ if (addr.contains(chosenNameServerAddr)) {
ChannelWrapper channelWrapper = this.channelTables.get(addr);
if (channelWrapper != null) {
channelWrapper.close();
@@ -562,9 +562,9 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo
boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4;
if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) {
this.closeChannel(addr, channel);
- LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr);
+ LOGGER.warn("invokeSync: close socket because of timeout, {}ms, channel[addr={}, id={}]", timeoutMillis, channelRemoteAddr, channel.id());
}
- LOGGER.warn("invokeSync: wait response timeout exception, the channel[{}]", channelRemoteAddr);
+ LOGGER.warn("invokeSync: wait response timeout exception, the channel[addr={}, id={}]", channelRemoteAddr, channel.id());
throw e;
}
} else {
@@ -819,10 +819,11 @@ public CompletableFuture invokeImpl(final Channel channel, final
RemotingCommand response = responseFuture.getResponseCommand();
if (response.getCode() == ResponseCode.GO_AWAY) {
if (nettyClientConfig.isEnableReconnectForGoAway()) {
+ LOGGER.info("Receive go away from channelId={}, channel={}", channel.id(), channel);
ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
try {
- if (channelWrapper0.reconnect()) {
- LOGGER.info("Receive go away from channel {}, recreate the channel", channel0);
+ if (channelWrapper0.reconnect(channel0)) {
+ LOGGER.info("Receive go away from channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0, channelWrapper0.getChannel().id());
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
}
} catch (Throwable t) {
@@ -830,10 +831,11 @@ public CompletableFuture invokeImpl(final Channel channel, final
}
return channelWrapper0;
});
- if (channelWrapper != null) {
+ if (channelWrapper != null && !channelWrapper.isWrapperOf(channel)) {
if (nettyClientConfig.isEnableTransparentRetry()) {
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
retryRequest.setBody(request.getBody());
+ retryRequest.setExtFields(request.getExtFields());
if (channelWrapper.isOK()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
@@ -865,6 +867,8 @@ public CompletableFuture invokeImpl(final Channel channel, final
return future;
}
}
+ } else {
+ LOGGER.warn("invokeImpl receive GO_AWAY, channelWrapper is null or channel is the same in wrapper, channelId={}", channel.id());
}
}
}
@@ -1002,7 +1006,6 @@ class ChannelWrapper {
// only affected by sync or async request, oneway is not included.
private ChannelFuture channelToClose;
private long lastResponseTime;
- private volatile long lastReconnectTimestamp = 0L;
private final String channelAddress;
public ChannelWrapper(String address, ChannelFuture channelFuture) {
@@ -1021,10 +1024,7 @@ public boolean isWritable() {
}
public boolean isWrapperOf(Channel channel) {
- if (this.channelFuture.channel() != null && this.channelFuture.channel() == channel) {
- return true;
- }
- return false;
+ return this.channelFuture.channel() != null && this.channelFuture.channel() == channel;
}
private Channel getChannel() {
@@ -1052,20 +1052,27 @@ public String getChannelAddress() {
return channelAddress;
}
- public boolean reconnect() {
+ public boolean reconnect(Channel channel) {
+ if (!isWrapperOf(channel)) {
+ LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
+ return false;
+ }
if (lock.writeLock().tryLock()) {
try {
- if (lastReconnectTimestamp == 0L || System.currentTimeMillis() - lastReconnectTimestamp > Duration.ofSeconds(nettyClientConfig.getMaxReconnectIntervalTimeSeconds()).toMillis()) {
+ if (isWrapperOf(channel)) {
channelToClose = channelFuture;
String[] hostAndPort = getHostAndPort(channelAddress);
channelFuture = fetchBootstrap(channelAddress)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
- lastReconnectTimestamp = System.currentTimeMillis();
return true;
+ } else {
+ LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
}
} finally {
lock.writeLock().unlock();
}
+ } else {
+ LOGGER.warn("channelWrapper reconnect try lock fail, now channelId={}", getChannel().id());
}
return false;
}
@@ -1152,7 +1159,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress);
+ LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}, channelId={}", remoteAddress, ctx.channel().id());
super.channelActive(ctx);
if (NettyRemotingClient.this.channelEventListener != null) {
@@ -1175,7 +1182,7 @@ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+ LOGGER.info("NETTY CLIENT PIPELINE: CLOSE channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
super.close(ctx, promise);
NettyRemotingClient.this.failFast(ctx.channel());
@@ -1187,7 +1194,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the channel[{}]", remoteAddress);
+ LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
super.channelInactive(ctx);
}
@@ -1198,7 +1205,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
+ LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
@@ -1213,8 +1220,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
- LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
+ LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught channel[addr={}, id={}]", remoteAddress, ctx.channel().id(), cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 51f8b85009e..cbf25c23c60 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -270,8 +270,9 @@ public void run(Timeout timeout) {
*/
protected ChannelPipeline configChannel(SocketChannel ch) {
return ch.pipeline()
- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
- .addLast(defaultEventExecutorGroup,
+ .addLast(nettyServerConfig.isServerNettyWorkerGroupEnable() ? defaultEventExecutorGroup : null,
+ HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
+ .addLast(nettyServerConfig.isServerNettyWorkerGroupEnable() ? defaultEventExecutorGroup : null,
encoder,
new NettyDecoder(),
distributionHandler,
@@ -782,16 +783,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
private void handleWithMessage(HAProxyMessage msg, Channel channel) {
try {
if (StringUtils.isNotBlank(msg.sourceAddress())) {
- channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
}
if (msg.sourcePort() > 0) {
- channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
}
if (StringUtils.isNotBlank(msg.destinationAddress())) {
- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
}
if (msg.destinationPort() > 0) {
- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
}
if (CollectionUtils.isNotEmpty(msg.tlvs())) {
msg.tlvs().forEach(tlv -> {
@@ -811,6 +812,6 @@ protected void handleHAProxyTLV(HAProxyTLV tlv, Channel channel) {
}
AttributeKey key = AttributeKeys.valueOf(
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
- channel.attr(key).set(new String(valueBytes, CharsetUtil.UTF_8));
+ RemotingHelper.setPropertyToAttr(channel, key, new String(valueBytes, CharsetUtil.UTF_8));
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index 6564404b920..664dee8371c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -36,6 +36,7 @@ public class NettyServerConfig implements Cloneable {
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
+ private boolean serverNettyWorkerGroupEnable = true;
private boolean serverPooledByteBufAllocatorEnable = true;
private boolean enableShutdownGracefully = false;
@@ -175,6 +176,14 @@ public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
this.writeBufferHighWaterMark = writeBufferHighWaterMark;
}
+ public boolean isServerNettyWorkerGroupEnable() {
+ return serverNettyWorkerGroupEnable;
+ }
+
+ public void setServerNettyWorkerGroupEnable(boolean serverNettyWorkerGroupEnable) {
+ this.serverNettyWorkerGroupEnable = serverNettyWorkerGroupEnable;
+ }
+
public boolean isEnableShutdownGracefully() {
return enableShutdownGracefully;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ForbiddenType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ForbiddenType.java
index 0701dc57fc5..7c561f5721a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ForbiddenType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ForbiddenType.java
@@ -37,11 +37,11 @@ public interface ForbiddenType {
*/
int TOPIC_FORBIDDEN = 3;
/**
- * 4=forbidden by brocasting mode
+ * 4=forbidden by broadcasting mode
*/
int BROADCASTING_DISABLE_FORBIDDEN = 4;
/**
- * 5=forbidden for a substription(group with a topic)
+ * 5=forbidden for a subscription(group with a topic)
*/
int SUBSCRIPTION_FORBIDDEN = 5;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index f45ff6fa484..cfc5cc22785 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -217,6 +217,7 @@ public class RequestCode {
public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352;
public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353;
+ public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354;
public static final int LITE_PULL_MESSAGE = 361;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
new file mode 100644
index 00000000000..76719ac1a24
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class CheckRocksdbCqWriteProgressResponseBody extends RemotingSerializable {
+
+ String diffResult;
+
+ public String getDiffResult() {
+ return diffResult;
+ }
+
+ public void setDiffResult(String diffResult) {
+ this.diffResult = diffResult;
+ }
+
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
new file mode 100644
index 00000000000..fee158b4976
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.common.resource.ResourceType;
+import org.apache.rocketmq.common.resource.RocketMQResource;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+
+@RocketMQAction(value = RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, action = Action.GET)
+public class CheckRocksdbCqWriteProgressRequestHeader implements CommandCustomHeader {
+
+ @CFNotNull
+ @RocketMQResource(ResourceType.TOPIC)
+ private String topic;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ResetOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ResetOffsetRequestHeader.java
index de9432ca515..f72fe57136c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ResetOffsetRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ResetOffsetRequestHeader.java
@@ -31,11 +31,11 @@ public class ResetOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
@RocketMQResource(ResourceType.GROUP)
- private String topic;
+ private String group;
@CFNotNull
@RocketMQResource(ResourceType.TOPIC)
- private String group;
+ private String topic;
private int queueId = -1;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
index bca2d79d995..c8b404dd696 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
@@ -174,6 +174,7 @@ public void operationSucceed(RemotingCommand response) {
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
rpcResponsePromise.setSuccess(new RpcResponse(response.getCode(), responseHeader, response.getBody()));
+ break;
default:
RpcResponse rpcResponse = new RpcResponse(new RpcException(response.getCode(), "unexpected remote response code"));
rpcResponsePromise.setSuccess(rpcResponse);
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 562a5ea2a33..f6c5b3f54d6 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/store/pom.xml b/store/pom.xml
index 6de01626772..d49de5ae267 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index 3dbc274ef00..d9cd602a65c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -132,7 +132,7 @@ public void shutdown() {
super.shutdown(true);
for (AllocateRequest req : this.requestTable.values()) {
if (req.mappedFile != null) {
- log.info("delete pre allocated maped file, {}", req.mappedFile.getFileName());
+ log.info("delete pre allocated mapped file, {}", req.mappedFile.getFileName());
req.mappedFile.destroy(1000);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index f707d8fbd87..972e71aadd8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -61,6 +61,7 @@
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;
@@ -1834,12 +1835,13 @@ class DefaultAppendMessageCallback implements AppendMessageCallback {
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
// Store the message content
private final ByteBuffer msgStoreItemMemory;
- private final int crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0;
+ private final int crc32ReservedLength;
private final MessageStoreConfig messageStoreConfig;
DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) {
this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
this.messageStoreConfig = messageStoreConfig;
+ this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0;
}
public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer,
@@ -1902,7 +1904,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
- boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner);
+ final boolean isMultiDispatchMsg = CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner);
if (isMultiDispatchMsg) {
AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
if (appendMessageResult != null) {
@@ -2243,8 +2245,9 @@ public FlushManager getFlushManager() {
return flushManager;
}
- public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
- return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+ public static boolean isMultiDispatchMsg(MessageStoreConfig messageStoreConfig, MessageExtBrokerInner msg) {
+ return StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) &&
+ MultiDispatchUtils.isNeedHandleMultiDispatch(messageStoreConfig, msg.getTopic());
}
private boolean isCloseReadAhead() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index f159c31a7be..8b46c7f5ce4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -163,11 +163,13 @@ public class DefaultMessageStore implements MessageStore {
private volatile boolean shutdown = true;
protected boolean notifyMessageArriveInBatch = false;
- private StoreCheckpoint storeCheckpoint;
+ protected StoreCheckpoint storeCheckpoint;
private TimerMessageStore timerMessageStore;
private final LinkedList dispatcherList;
+ private RocksDBMessageStore rocksDBMessageStore;
+
private RandomAccessFile lockFile;
private FileLock lock;
@@ -354,12 +356,7 @@ public boolean load() {
}
if (result) {
- this.storeCheckpoint =
- new StoreCheckpoint(
- StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
- this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset();
- setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset());
-
+ loadCheckPoint();
result = this.indexService.load(lastExitOK);
this.recover(lastExitOK);
LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset());
@@ -381,6 +378,14 @@ public boolean load() {
return result;
}
+ public void loadCheckPoint() throws IOException {
+ this.storeCheckpoint =
+ new StoreCheckpoint(
+ StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+ this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset();
+ setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset());
+ }
+
/**
* @throws Exception
*/
@@ -511,6 +516,10 @@ public void shutdown() {
this.compactionService.shutdown();
}
+ if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
+ this.rocksDBMessageStore.consumeQueueStore.shutdown();
+ }
+
this.flushConsumeQueueService.shutdown();
this.allocateMappedFileService.shutdown();
this.storeCheckpoint.flush();
@@ -985,7 +994,7 @@ public long getMaxOffsetInQueue(String topic, int queueId) {
@Override
public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
if (committed) {
- ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface logic = this.getConsumeQueue(topic, queueId);
if (logic != null) {
return logic.getMaxOffsetInQueue();
}
@@ -1021,7 +1030,7 @@ public void setTimerMessageStore(TimerMessageStore timerMessageStore) {
@Override
public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
- ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
if (consumeQueue != null) {
CqUnit cqUnit = consumeQueue.get(consumeQueueOffset);
if (cqUnit != null) {
@@ -1157,7 +1166,7 @@ public boolean getLastMappedFile(long startOffset) {
@Override
public long getEarliestMessageTime(String topic, int queueId) {
- ConsumeQueueInterface logicQueue = this.findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface logicQueue = this.getConsumeQueue(topic, queueId);
if (logicQueue != null) {
Pair pair = logicQueue.getEarliestUnitAndStoreTime();
if (pair != null && pair.getObject2() != null) {
@@ -1189,7 +1198,7 @@ public long getEarliestMessageTime() {
@Override
public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
- ConsumeQueueInterface logicQueue = this.findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface logicQueue = this.getConsumeQueue(topic, queueId);
if (logicQueue != null) {
Pair pair = logicQueue.getCqUnitAndStoreTime(consumeQueueOffset);
if (pair != null && pair.getObject2() != null) {
@@ -1207,12 +1216,12 @@ public CompletableFuture getMessageStoreTimeStampAsync(String topic, int q
@Override
public long getMessageTotalInQueue(String topic, int queueId) {
- ConsumeQueueInterface logicQueue = this.findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface logicQueue = this.getConsumeQueue(topic, queueId);
if (logicQueue != null) {
return logicQueue.getMessageTotalInQueue();
}
- return -1;
+ return 0;
}
@Override
@@ -1496,7 +1505,7 @@ public boolean checkInDiskByConsumeOffset(final String topic, final int queueId,
final long maxOffsetPy = this.commitLog.getMaxOffset();
- ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
if (consumeQueue != null) {
CqUnit cqUnit = consumeQueue.get(consumeOffset);
@@ -1512,7 +1521,7 @@ public boolean checkInDiskByConsumeOffset(final String topic, final int queueId,
@Override
public boolean checkInMemByConsumeOffset(final String topic, final int queueId, long consumeOffset, int batchSize) {
- ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
if (consumeQueue != null) {
CqUnit firstCQItem = consumeQueue.get(consumeOffset);
if (firstCQItem == null) {
@@ -3251,6 +3260,17 @@ public HARuntimeInfo getHARuntimeInfo() {
}
}
+ public void enableRocksdbCQWrite() {
+ try {
+ RocksDBMessageStore store = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, this.topicConfigTable);
+ this.rocksDBMessageStore = store;
+ store.loadAndStartConsumerServiceOnly();
+ addDispatcher(store.getDispatcherBuildRocksdbConsumeQueue());
+ } catch (Exception e) {
+ LOGGER.error("enableRocksdbCqWrite error", e);
+ }
+ }
+
public int getMaxDelayLevel() {
return maxDelayLevel;
}
@@ -3338,4 +3358,12 @@ public boolean isTransientStorePoolEnable() {
public long getReputFromOffset() {
return this.reputMessageService.getReputFromOffset();
}
+
+ public RocksDBMessageStore getRocksDBMessageStore() {
+ return this.rocksDBMessageStore;
+ }
+
+ public ConsumeQueueStoreInterface getConsumeQueueStore() {
+ return consumeQueueStore;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index 20e9a652b7e..5c74918d9e6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -175,7 +175,7 @@ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner)
public PutMessageResult encode(MessageExtBrokerInner msgInner) {
this.byteBuf.clear();
- if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) {
+ if (CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner)) {
return encodeWithoutProperties(msgInner);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 6141b778bf7..21f8d45c9d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -16,16 +16,16 @@
*/
package org.apache.rocketmq.store;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
-
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.metrics.Meter;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
@@ -39,6 +39,8 @@
public class RocksDBMessageStore extends DefaultMessageStore {
+ private CommitLogDispatcherBuildRocksdbConsumeQueue dispatcherBuildRocksdbConsumeQueue;
+
public RocksDBMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap topicConfigTable) throws
IOException {
@@ -166,16 +168,46 @@ public void run() {
}
}
- @Override
- public long estimateMessageCount(String topic, int queueId, long from, long to, MessageFilter filter) {
- // todo
- return 0;
- }
-
@Override
public void initMetrics(Meter meter, Supplier attributesBuilderSupplier) {
DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this);
// Also add some metrics for rocksdb's monitoring.
RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, this);
}
+
+ public CommitLogDispatcherBuildRocksdbConsumeQueue getDispatcherBuildRocksdbConsumeQueue() {
+ return dispatcherBuildRocksdbConsumeQueue;
+ }
+
+ class CommitLogDispatcherBuildRocksdbConsumeQueue implements CommitLogDispatcher {
+ @Override
+ public void dispatch(DispatchRequest request) throws RocksDBException {
+ final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
+ switch (tranType) {
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ putMessagePositionInfo(request);
+ break;
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ break;
+ }
+ }
+ }
+
+ public void loadAndStartConsumerServiceOnly() {
+ try {
+ this.dispatcherBuildRocksdbConsumeQueue = new CommitLogDispatcherBuildRocksdbConsumeQueue();
+ boolean loadResult = this.consumeQueueStore.load();
+ if (!loadResult) {
+ throw new RuntimeException("load consume queue failed");
+ }
+ super.loadCheckPoint();
+ this.consumeQueueStore.start();
+ } catch (Exception e) {
+ ERROR_LOG.error("loadAndStartConsumerServiceOnly error", e);
+ throw new RuntimeException(e);
+ }
+ }
+
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 0b45d92418e..5195868e0f1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -424,6 +424,53 @@ public class MessageStoreConfig {
private boolean putConsumeQueueDataByFileChannel = true;
+ private boolean transferOffsetJsonToRocksdb = false;
+
+ private boolean rocksdbCQDoubleWriteEnable = false;
+
+ private int batchWriteKvCqSize = 16;
+
+ /**
+ * If ConsumeQueueStore is RocksDB based, this option is to configure bottom-most tier compression type.
+ * The following values are valid:
+ *
+ * - snappy
+ * - z
+ * - bzip2
+ * - lz4
+ * - lz4hc
+ * - xpress
+ * - zstd
+ *
+ *
+ * LZ4 is the recommended one.
+ */
+ private String bottomMostCompressionTypeForConsumeQueueStore = "zstd";
+
+ public int getBatchWriteKvCqSize() {
+ return batchWriteKvCqSize;
+ }
+
+ public void setBatchWriteKvCqSize(int batchWriteKvCqSize) {
+ this.batchWriteKvCqSize = batchWriteKvCqSize;
+ }
+
+ public boolean isRocksdbCQDoubleWriteEnable() {
+ return rocksdbCQDoubleWriteEnable;
+ }
+
+ public void setRocksdbCQDoubleWriteEnable(boolean rocksdbWriteEnable) {
+ this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable;
+ }
+
+ public boolean isTransferOffsetJsonToRocksdb() {
+ return transferOffsetJsonToRocksdb;
+ }
+
+ public void setTransferOffsetJsonToRocksdb(boolean transferOffsetJsonToRocksdb) {
+ this.transferOffsetJsonToRocksdb = transferOffsetJsonToRocksdb;
+ }
+
public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
}
@@ -1854,4 +1901,11 @@ public void setTransferMetadataJsonToRocksdb(boolean transferMetadataJsonToRocks
this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb;
}
+ public String getBottomMostCompressionTypeForConsumeQueueStore() {
+ return bottomMostCompressionTypeForConsumeQueueStore;
+ }
+
+ public void setBottomMostCompressionTypeForConsumeQueueStore(String bottomMostCompressionTypeForConsumeQueueStore) {
+ this.bottomMostCompressionTypeForConsumeQueueStore = bottomMostCompressionTypeForConsumeQueueStore;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 2f2ce981257..2401257c306 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -661,4 +661,8 @@ public void recoverTopicQueueTable() {
public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
next.notifyMessageArriveIfNecessary(dispatchRequest);
}
+
+ public MessageStore getNext() {
+ return next;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
index b8865fd9195..34f5cb142b6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java
@@ -109,6 +109,7 @@ public String toString() {
", size=" + size +
", pos=" + pos +
", batchNum=" + batchNum +
+ ", tagsCode=" + tagsCode +
", compactedOffset=" + compactedOffset +
'}';
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 5a981bb4df1..83ba7bebad0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -18,7 +18,6 @@
import java.nio.ByteBuffer;
import java.util.List;
-
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.attribute.CQType;
@@ -223,10 +222,47 @@ public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, Message
@Override
public long estimateMessageCount(long from, long to, MessageFilter filter) {
- // todo
- return 0;
+ // Check from and to offset validity
+ Pair fromUnit = getCqUnitAndStoreTime(from);
+ if (fromUnit == null) {
+ return -1;
+ }
+
+ if (from >= to) {
+ return -1;
+ }
+
+ if (to > getMaxOffsetInQueue()) {
+ to = getMaxOffsetInQueue();
+ }
+
+ int maxSampleSize = messageStore.getMessageStoreConfig().getMaxConsumeQueueScan();
+ int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to - from);
+
+ int matchThreshold = messageStore.getMessageStoreConfig().getSampleCountThreshold();
+ int matchSize = 0;
+
+ for (int i = 0; i < sampleSize; i++) {
+ long index = from + i;
+ Pair pair = getCqUnitAndStoreTime(index);
+ if (pair == null) {
+ continue;
+ }
+ CqUnit cqUnit = pair.getObject1();
+ if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
+ matchSize++;
+ // if matchSize is plenty, early exit estimate
+ if (matchSize > matchThreshold) {
+ sampleSize = i;
+ break;
+ }
+ }
+ }
+ // Make sure the second half is a floating point number, otherwise it will be truncated to 0
+ return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize / (sampleSize * 1.0)));
}
+
@Override
public long getMinOffsetInQueue() {
return this.messageStore.getMinOffsetInQueue(this.topic, this.queueId);
@@ -311,7 +347,7 @@ public CqUnit getEarliestUnit() {
public CqUnit getLatestUnit() {
try {
long maxOffset = this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId);
- return get(maxOffset);
+ return get(maxOffset > 0 ? maxOffset - 1 : maxOffset);
} catch (RocksDBException e) {
ERROR_LOG.error("getLatestUnit Failed. topic: {}, queueId: {}, {}", topic, queueId, e.getMessage());
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 3c6b91ec018..17b845d8176 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -28,7 +28,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
@@ -56,7 +55,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
public static final byte CTRL_1 = '\u0001';
public static final byte CTRL_2 = '\u0002';
- private static final int BATCH_SIZE = 16;
+ private final int batchSize;
public static final int MAX_KEY_LEN = 300;
private final ScheduledExecutorService scheduledExecutorService;
@@ -82,15 +81,16 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
super(messageStore);
this.storePath = StorePathConfigHelper.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
- this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, storePath, 4);
+ this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, storePath);
this.rocksDBConsumeQueueTable = new RocksDBConsumeQueueTable(rocksDBStorage, messageStore);
this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore);
this.writeBatch = new WriteBatch();
- this.bufferDRList = new ArrayList(BATCH_SIZE);
- this.cqBBPairList = new ArrayList(BATCH_SIZE);
- this.offsetBBPairList = new ArrayList(BATCH_SIZE);
- for (int i = 0; i < BATCH_SIZE; i++) {
+ this.batchSize = messageStoreConfig.getBatchWriteKvCqSize();
+ this.bufferDRList = new ArrayList<>(batchSize);
+ this.cqBBPairList = new ArrayList<>(batchSize);
+ this.offsetBBPairList = new ArrayList<>(batchSize);
+ for (int i = 0; i < batchSize; i++) {
this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
}
@@ -164,9 +164,10 @@ private boolean shutdownInner() {
@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException {
- if (request == null || this.bufferDRList.size() >= BATCH_SIZE) {
+ if (request == null || this.bufferDRList.size() >= batchSize) {
putMessagePosition();
}
+
if (request != null) {
this.bufferDRList.add(request);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java
index c7d35fa8c0c..194bd4cca5f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java
@@ -185,6 +185,39 @@ public long binarySearchInCQByTime(String topic, int queueId, long high, long lo
long result = -1L;
long targetOffset = -1L, leftOffset = -1L, rightOffset = -1L;
long ceiling = high, floor = low;
+ // Handle the following corner cases first:
+ // 1. store time of (high) < timestamp
+ ByteBuffer buffer = getCQInKV(topic, queueId, ceiling);
+ if (buffer != null) {
+ long storeTime = buffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
+ if (storeTime < timestamp) {
+ switch (boundaryType) {
+ case LOWER:
+ return ceiling + 1;
+ case UPPER:
+ return ceiling;
+ default:
+ log.warn("Unknown boundary type");
+ break;
+ }
+ }
+ }
+ // 2. store time of (low) > timestamp
+ buffer = getCQInKV(topic, queueId, floor);
+ if (buffer != null) {
+ long storeTime = buffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
+ if (storeTime > timestamp) {
+ switch (boundaryType) {
+ case LOWER:
+ return floor;
+ case UPPER:
+ return 0;
+ default:
+ log.warn("Unknown boundary type");
+ break;
+ }
+ }
+ }
while (high >= low) {
long midOffset = low + ((high - low) >>> 1);
ByteBuffer byteBuffer = getCQInKV(topic, queueId, midOffset);
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index 362684560c8..b343a5b4b50 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -16,53 +16,45 @@
*/
package org.apache.rocketmq.store.rocksdb;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
-import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.store.MessageStore;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactRangeOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
+
+ public static final byte[] OFFSET_COLUMN_FAMILY = "offset".getBytes(StandardCharsets.UTF_8);
+
private final MessageStore messageStore;
private volatile ColumnFamilyHandle offsetCFHandle;
- public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final String dbPath, final int prefixLen) {
+ public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final String dbPath) {
+ super(dbPath);
this.messageStore = messageStore;
- this.dbPath = dbPath;
this.readOnly = false;
}
- private void initOptions() {
+ protected void initOptions() {
this.options = RocksDBOptionsFactory.createDBOptions();
+ super.initOptions();
+ }
- this.writeOptions = new WriteOptions();
- this.writeOptions.setSync(false);
- this.writeOptions.setDisableWAL(true);
- this.writeOptions.setNoSlowdown(true);
-
+ @Override
+ protected void initTotalOrderReadOptions() {
this.totalOrderReadOptions = new ReadOptions();
this.totalOrderReadOptions.setPrefixSameAsStart(false);
this.totalOrderReadOptions.setTotalOrderSeek(false);
-
- this.compactRangeOptions = new CompactRangeOptions();
- this.compactRangeOptions.setBottommostLevelCompaction(CompactRangeOptions.BottommostLevelCompaction.kForce);
- this.compactRangeOptions.setAllowWriteStall(true);
- this.compactRangeOptions.setExclusiveManualCompaction(false);
- this.compactRangeOptions.setChangeLevel(true);
- this.compactRangeOptions.setTargetLevel(-1);
- this.compactRangeOptions.setMaxSubcompactions(4);
}
@Override
@@ -72,7 +64,7 @@ protected boolean postLoad() {
initOptions();
- final List cfDescriptors = new ArrayList();
+ final List cfDescriptors = new ArrayList<>();
ColumnFamilyOptions cqCfOptions = RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
this.cfOptions.add(cqCfOptions);
@@ -80,11 +72,8 @@ protected boolean postLoad() {
ColumnFamilyOptions offsetCfOptions = RocksDBOptionsFactory.createOffsetCFOptions();
this.cfOptions.add(offsetCfOptions);
- cfDescriptors.add(new ColumnFamilyDescriptor("offset".getBytes(DataConverter.CHARSET_UTF8), offsetCfOptions));
-
- final List cfHandles = new ArrayList();
- open(cfDescriptors, cfHandles);
-
+ cfDescriptors.add(new ColumnFamilyDescriptor(OFFSET_COLUMN_FAMILY, offsetCfOptions));
+ open(cfDescriptors);
this.defaultCFHandle = cfHandles.get(0);
this.offsetCFHandle = cfHandles.get(1);
} catch (final Exception e) {
@@ -130,4 +119,4 @@ public RocksIterator seekOffsetCF() {
public ColumnFamilyHandle getOffsetCFHandle() {
return this.offsetCFHandle;
}
-}
\ No newline at end of file
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index a3a99d3346c..d373ba6249c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.store.rocksdb;
-import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
+import org.apache.rocketmq.common.config.ConfigHelper;
import org.apache.rocketmq.store.MessageStore;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
@@ -65,13 +65,16 @@ public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageSt
setMaxMergeWidth(Integer.MAX_VALUE).
setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize).
setCompressionSizePercent(-1);
+ String bottomMostCompressionTypeOpt = messageStore.getMessageStoreConfig()
+ .getBottomMostCompressionTypeForConsumeQueueStore();
+ CompressionType bottomMostCompressionType = CompressionType.getCompressionType(bottomMostCompressionTypeOpt);
return columnFamilyOptions.setMaxWriteBufferNumber(4).
setWriteBufferSize(128 * SizeUnit.MB).
setMinWriteBufferNumberToMerge(1).
setTableFormatConfig(blockBasedTableConfig).
setMemTableConfig(new SkipListMemTableConfig()).
setCompressionType(CompressionType.LZ4_COMPRESSION).
- setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION).
+ setBottommostCompressionType(bottomMostCompressionType).
setNumLevels(7).
setCompactionStyle(CompactionStyle.UNIVERSAL).
setCompactionOptionsUniversal(compactionOption).
@@ -134,7 +137,7 @@ public static DBOptions createDBOptions() {
Statistics statistics = new Statistics();
statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
return options.
- setDbLogDir(ConfigRocksDBStorage.getDBLogDir()).
+ setDbLogDir(ConfigHelper.getDBLogDir()).
setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
setManualWalFlush(true).
@@ -144,9 +147,9 @@ public static DBOptions createDBOptions() {
setCreateIfMissing(true).
setCreateMissingColumnFamilies(true).
setMaxOpenFiles(-1).
- setMaxLogFileSize(1 * SizeUnit.GB).
+ setMaxLogFileSize(SizeUnit.GB).
setKeepLogFileNum(5).
- setMaxManifestFileSize(1 * SizeUnit.GB).
+ setMaxManifestFileSize(SizeUnit.GB).
setAllowConcurrentMemtableWrite(false).
setStatistics(statistics).
setAtomicFlush(true).
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/PerfCounter.java b/store/src/main/java/org/apache/rocketmq/store/util/PerfCounter.java
index e2a55d63994..99649398a83 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/PerfCounter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/util/PerfCounter.java
@@ -356,7 +356,7 @@ public void run() {
}
} catch (Exception e) {
- logger.error("{} get unknown errror", getServiceName(), e);
+ logger.error("{} get unknown error", getServiceName(), e);
try {
Thread.sleep(1000);
} catch (Throwable ignored) {
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index c3c8be52ddd..bf3b1eeca83 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -22,6 +22,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -31,6 +32,7 @@
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
@@ -84,7 +86,26 @@ messageStoreConfig, new BrokerStatsManager(brokerConfig),
return master;
}
- protected void putMsg(DefaultMessageStore messageStore) throws Exception {
+ protected RocksDBMessageStore genRocksdbMessageStore() throws Exception {
+ MessageStoreConfig messageStoreConfig = buildStoreConfig(
+ COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE
+ );
+
+ BrokerConfig brokerConfig = new BrokerConfig();
+
+ RocksDBMessageStore master = new RocksDBMessageStore(
+ messageStoreConfig, new BrokerStatsManager(brokerConfig),
+ (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
+ }, brokerConfig, new ConcurrentHashMap<>());
+
+ assertThat(master.load()).isTrue();
+
+ master.start();
+
+ return master;
+ }
+
+ protected void putMsg(MessageStore messageStore) {
int totalMsgs = 200;
for (int i = 0; i < totalMsgs; i++) {
MessageExtBrokerInner message = buildMessage();
@@ -184,9 +205,33 @@ public void testIterator() throws Exception {
@Test
public void testEstimateMessageCountInEmptyConsumeQueue() {
- DefaultMessageStore master = null;
+ DefaultMessageStore messageStore = null;
+ try {
+ messageStore = gen();
+ doTestEstimateMessageCountInEmptyConsumeQueue(messageStore);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+ }
+
+ @Test
+ public void testEstimateRocksdbMessageCountInEmptyConsumeQueue() {
+ if (notExecuted()) {
+ return;
+ }
+ DefaultMessageStore messageStore = null;
+ try {
+ messageStore = genRocksdbMessageStore();
+ doTestEstimateMessageCountInEmptyConsumeQueue(messageStore);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+ }
+
+ public void doTestEstimateMessageCountInEmptyConsumeQueue(MessageStore master) {
try {
- master = gen();
ConsumeQueueInterface consumeQueue = master.findConsumeQueue(TOPIC, QUEUE_ID);
MessageFilter filter = new MessageFilter() {
@Override
@@ -219,16 +264,34 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map pr
}
}
+ @Test
+ public void testEstimateRocksdbMessageCount() {
+ if (notExecuted()) {
+ return;
+ }
+ DefaultMessageStore messageStore = null;
+ try {
+ messageStore = genRocksdbMessageStore();
+ doTestEstimateMessageCount(messageStore);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+ }
+
@Test
public void testEstimateMessageCount() {
DefaultMessageStore messageStore = null;
try {
messageStore = gen();
+ doTestEstimateMessageCount(messageStore);
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
+ }
+ public void doTestEstimateMessageCount(MessageStore messageStore) {
try {
try {
putMsg(messageStore);
@@ -265,15 +328,34 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map pr
}
}
+ @Test
+ public void testEstimateRocksdbMessageCountSample() {
+ if (notExecuted()) {
+ return;
+ }
+ DefaultMessageStore messageStore = null;
+ try {
+ messageStore = genRocksdbMessageStore();
+ doTestEstimateMessageCountSample(messageStore);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+ }
+
@Test
public void testEstimateMessageCountSample() {
DefaultMessageStore messageStore = null;
try {
messageStore = gen();
+ doTestEstimateMessageCountSample(messageStore);
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
+ }
+
+ public void doTestEstimateMessageCountSample(MessageStore messageStore) {
try {
try {
@@ -303,4 +385,8 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map pr
UtilAll.deleteFile(new File(STORE_PATH));
}
}
+
+ private boolean notExecuted() {
+ return MixAll.isMac();
+ }
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTableTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTableTest.java
new file mode 100644
index 00000000000..d06b6da2fbd
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTableTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.queue;
+
+import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+import org.rocksdb.RocksDBException;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.rocketmq.store.queue.RocksDBConsumeQueueTable.CQ_UNIT_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+public class RocksDBConsumeQueueTableTest {
+
+ @Test
+ public void testBinarySearchInCQByTime() throws RocksDBException {
+ if (MixAll.isMac()) {
+ return;
+ }
+ ConsumeQueueRocksDBStorage rocksDBStorage = mock(ConsumeQueueRocksDBStorage.class);
+ DefaultMessageStore store = mock(DefaultMessageStore.class);
+ RocksDBConsumeQueueTable table = new RocksDBConsumeQueueTable(rocksDBStorage, store);
+ doAnswer((Answer) mock -> {
+ /*
+ * queueOffset timestamp
+ * 100 1000
+ * 200 2000
+ * 201 2010
+ * 1000 10000
+ */
+ byte[] keyBytes = mock.getArgument(0);
+ ByteBuffer keyBuffer = ByteBuffer.wrap(keyBytes);
+ int len = keyBuffer.getInt(0);
+ long offset = keyBuffer.getLong(4 + 1 + len + 1 + 4 + 1);
+ long phyOffset = offset;
+ long timestamp = offset * 10;
+ final ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_UNIT_SIZE);
+ byteBuffer.putLong(phyOffset);
+ byteBuffer.putInt(1);
+ byteBuffer.putLong(0);
+ byteBuffer.putLong(timestamp);
+ return byteBuffer.array();
+ }).when(rocksDBStorage).getCQ(any());
+ assertEquals(1001, table.binarySearchInCQByTime("topic", 0, 1000, 100, 20000, 0, BoundaryType.LOWER));
+ assertEquals(1000, table.binarySearchInCQByTime("topic", 0, 1000, 100, 20000, 0, BoundaryType.UPPER));
+ assertEquals(100, table.binarySearchInCQByTime("topic", 0, 1000, 100, 1, 0, BoundaryType.LOWER));
+ assertEquals(0, table.binarySearchInCQByTime("topic", 0, 1000, 100, 1, 0, BoundaryType.UPPER));
+ assertEquals(201, table.binarySearchInCQByTime("topic", 0, 1000, 100, 2001, 0, BoundaryType.LOWER));
+ assertEquals(200, table.binarySearchInCQByTime("topic", 0, 1000, 100, 2001, 0, BoundaryType.UPPER));
+ assertEquals(200, table.binarySearchInCQByTime("topic", 0, 1000, 100, 2000, 0, BoundaryType.LOWER));
+ assertEquals(200, table.binarySearchInCQByTime("topic", 0, 1000, 100, 2000, 0, BoundaryType.UPPER));
+ }
+}
\ No newline at end of file
diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
new file mode 100644
index 00000000000..1d7273968f6
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.rocksdb;
+
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.CompressionType;
+
+public class RocksDBOptionsFactoryTest {
+
+ @Test
+ public void testBottomMostCompressionType() {
+ MessageStoreConfig config = new MessageStoreConfig();
+ Assert.assertEquals(CompressionType.ZSTD_COMPRESSION,
+ CompressionType.getCompressionType(config.getBottomMostCompressionTypeForConsumeQueueStore()));
+ Assert.assertEquals(CompressionType.LZ4_COMPRESSION, CompressionType.getCompressionType("lz4"));
+ }
+}
diff --git a/test/pom.xml b/test/pom.xml
index df380a0b604..801a10301eb 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
index 2a596197441..7ac5ec39786 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
@@ -26,8 +26,8 @@ public class RMQBroadCastConsumer extends RMQNormalConsumer {
private static Logger logger = LoggerFactory.getLogger(RMQBroadCastConsumer.class);
public RMQBroadCastConsumer(String nsAddr, String topic, String subExpression,
- String consumerGroup, AbstractListener listner) {
- super(nsAddr, topic, subExpression, consumerGroup, listner);
+ String consumerGroup, AbstractListener listener) {
+ super(nsAddr, topic, subExpression, consumerGroup, listener);
}
@Override
diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
index 5681ecc841a..22193bb4ba9 100644
--- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
@@ -69,8 +69,8 @@ public AbstractListener getListener() {
return listener;
}
- public void setListener(AbstractListener listner) {
- this.listener = listner;
+ public void setListener(AbstractListener listener) {
+ this.listener = listener;
}
public String getNsAddr() {
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index b64cda33420..472e106ce35 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -100,8 +100,8 @@ public class BaseConf {
brokerController2.getBrokerConfig().getListenPort());
brokerController3 = IntegrationTestBase.createAndStartBroker(NAMESRV_ADDR);
- log.debug("Broker {} started, listening: {}", brokerController2.getBrokerConfig().getBrokerName(),
- brokerController2.getBrokerConfig().getListenPort());
+ log.debug("Broker {} started, listening: {}", brokerController3.getBrokerConfig().getBrokerName(),
+ brokerController3.getBrokerConfig().getListenPort());
CLUSTER_NAME = brokerController1.getBrokerConfig().getBrokerClusterName();
BROKER1_NAME = brokerController1.getBrokerConfig().getBrokerName();
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 2217936929c..fde991ad13d 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -136,6 +136,8 @@ public static BrokerController createAndStartBroker(String nsAddr) {
brokerConfig.setNamesrvAddr(nsAddr);
brokerConfig.setEnablePropertyFilter(true);
brokerConfig.setEnableCalcFilterBitMap(true);
+ brokerConfig.setAppendAckAsync(true);
+ brokerConfig.setAppendCkAsync(true);
storeConfig.setEnableConsumeQueueExt(true);
brokerConfig.setLoadBalancePollNameServerInterval(500);
storeConfig.setStorePathRootDir(baseDir);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
index 684b718ae5d..7408a092c4b 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -96,6 +96,8 @@ public void test3ConsumerAndCrashOne() {
MQWait.waitConsumeAll(CONSUME_TIME, producer.getAllMsgBody(), consumer1.getListener(),
consumer2.getListener(), consumer3.getListener());
consumer3.shutdown();
+ TestUtils.waitForSeconds(WAIT_TIME);
+
producer.clearMsg();
consumer1.clearMsg();
consumer2.clearMsg();
diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
index 9004b91db39..9e9afb1ed2c 100644
--- a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
@@ -17,13 +17,16 @@
package org.apache.rocketmq.test.route;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
+import org.junit.Ignore;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
public class CreateAndUpdateTopicIT extends BaseConf {
@@ -47,6 +50,8 @@ public void testCreateOrUpdateTopic_EnableSingleTopicRegistration() {
}
+ // Temporarily ignore the fact that this test cannot pass in the integration test pipeline due to unknown reasons
+ @Ignore
@Test
public void testDeleteTopicFromNameSrvWithBrokerRegistration() {
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
@@ -60,11 +65,9 @@ public void testDeleteTopicFromNameSrvWithBrokerRegistration() {
boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic1, 8, null);
assertThat(createResult).isTrue();
-
createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic2, 8, null);
assertThat(createResult).isTrue();
-
TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
assertThat(route.getBrokerDatas()).hasSize(3);
@@ -73,11 +76,13 @@ public void testDeleteTopicFromNameSrvWithBrokerRegistration() {
// Deletion is lazy, trigger broker registration
brokerController1.registerBrokerAll(false, false, true);
- // The route info of testTopic2 will be removed from broker1 after the registration
- route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
- assertThat(route.getBrokerDatas()).hasSize(2);
- assertThat(route.getQueueDatas().get(0).getBrokerName()).isEqualTo(BROKER2_NAME);
- assertThat(route.getQueueDatas().get(1).getBrokerName()).isEqualTo(BROKER3_NAME);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ // The route info of testTopic2 will be removed from broker1 after the registration
+ TopicRouteData finalRoute = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
+ return finalRoute.getBrokerDatas().size() == 2
+ && finalRoute.getQueueDatas().get(0).getBrokerName().equals(BROKER2_NAME)
+ && finalRoute.getQueueDatas().get(1).getBrokerName().equals(BROKER3_NAME);
+ });
brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
index 96f042da21b..4d9af208187 100644
--- a/tieredstore/pom.xml
+++ b/tieredstore/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 7b63e16696e..0e3ede871c3 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -180,9 +180,15 @@ public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int
}
// determine whether tiered storage path conditions are met
- if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)
- && !next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
- return true;
+ if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)) {
+ // return true to read from tiered storage if the CommitLog is empty
+ if (next != null && next.getCommitLog() != null &&
+ next.getCommitLog().getMinOffset() < 0L) {
+ return true;
+ }
+ if (!next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
+ return true;
+ }
}
if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_MEM)
@@ -208,10 +214,10 @@ public CompletableFuture getMessageAsync(String group, String
}
if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
- log.trace("GetMessageAsync from current store, " +
+ log.trace("GetMessageAsync from remote store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums);
} else {
- log.trace("GetMessageAsync from remote store, " +
+ log.trace("GetMessageAsync from next store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums);
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 020b9f3b068..0db5dc5c4c5 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -42,8 +42,6 @@
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
-import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
@@ -271,23 +269,23 @@ public CompletableFuture> queryAsync(
public void forceUpload() {
try {
readWriteLock.writeLock().lock();
- if (this.currentWriteFile == null) {
- log.warn("IndexStoreService no need force upload current write file");
- return;
- }
- // note: current file has been shutdown before
- IndexStoreFile lastFile = new IndexStoreFile(storeConfig, currentWriteFile.getTimestamp());
- if (this.doCompactThenUploadFile(lastFile)) {
- this.setCompactTimestamp(lastFile.getTimestamp());
- } else {
- throw new TieredStoreException(
- TieredStoreErrorCode.UNKNOWN, "IndexStoreService force compact current file error");
+ while (true) {
+ Map.Entry entry =
+ this.timeStoreTable.higherEntry(this.compactTimestamp.get());
+ if (entry == null) {
+ break;
+ }
+ if (this.doCompactThenUploadFile(entry.getValue())) {
+ this.setCompactTimestamp(entry.getValue().getTimestamp());
+ // The total number of files will not too much, prevent io too fast.
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
}
} catch (Exception e) {
log.error("IndexStoreService force upload error", e);
throw new RuntimeException(e);
} finally {
- readWriteLock.writeLock().lock();
+ readWriteLock.writeLock().unlock();
}
}
@@ -393,19 +391,13 @@ protected IndexFile getNextSealedFile() {
@Override
public void shutdown() {
super.shutdown();
- readWriteLock.writeLock().lock();
- try {
- for (Map.Entry entry : timeStoreTable.entrySet()) {
- entry.getValue().shutdown();
- }
- if (!autoCreateNewFile) {
- this.forceUpload();
+ // Wait index service upload then clear time store table
+ while (!this.timeStoreTable.isEmpty()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
- this.timeStoreTable.clear();
- } catch (Exception e) {
- log.error("IndexStoreService shutdown error", e);
- } finally {
- readWriteLock.writeLock().unlock();
}
}
@@ -424,6 +416,18 @@ public void run() {
}
this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
}
+ readWriteLock.writeLock().lock();
+ try {
+ if (autoCreateNewFile) {
+ this.forceUpload();
+ }
+ this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
+ this.timeStoreTable.clear();
+ } catch (Exception e) {
+ log.error("IndexStoreService shutdown error", e);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
log.info(this.getServiceName() + " service shutdown");
}
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
index fb563f7c6c2..83b407e73ba 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -120,7 +120,7 @@ public void doConvertOldFormatTest() throws IOException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable();
- Assert.assertEquals(1, timeStoreTable.size());
+ Assert.assertEquals(2, timeStoreTable.size());
Assert.assertEquals(Long.valueOf(timestamp), timeStoreTable.firstKey());
mappedFile.destroy(10 * 1000);
}
@@ -232,7 +232,7 @@ public void restartServiceTest() throws InterruptedException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
- Assert.assertEquals(2, indexService.getTimeStoreTable().size());
+ Assert.assertEquals(4, indexService.getTimeStoreTable().size());
Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
}
diff --git a/tools/pom.xml b/tools/pom.xml
index ee459dfd95a..ab740bd8a70 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 5.3.1-SNAPSHOT
+ 5.3.2-SNAPSHOT
4.0.0
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6ebee1d0dd1..3686bf2644b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -52,6 +52,7 @@
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -771,6 +772,12 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String
);
}
+ @Override
+ public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+ return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic);
+ }
+
@Override
public boolean resumeCheckHalfMessage(String topic,
String msgId)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index dc4d35e7049..883dcbe41d7 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -90,6 +90,7 @@
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -1817,6 +1818,12 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String
return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis);
}
+ @Override
+ public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+ return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, topic, timeoutMillis);
+ }
+
@Override
public boolean resumeCheckHalfMessage(final String topic,
final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index ff78f22c704..09204ab7be2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -48,6 +48,7 @@
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -148,6 +149,8 @@ ConsumeStats examineConsumeStats(
final String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
+ CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
+
ConsumeStats examineConsumeStats(final String consumerGroup,
final String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 43e4259c4e1..313a777ce4f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -104,6 +104,7 @@
import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
+import org.apache.rocketmq.tools.command.queue.CheckRocksdbCqWriteProgressCommand;
import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
@@ -304,6 +305,7 @@ public static void initCommand() {
initCommand(new ListAclSubCommand());
initCommand(new CopyAclsSubCommand());
initCommand(new RocksDBConfigToJsonCommand());
+ initCommand(new CheckRocksdbCqWriteProgressCommand());
}
private static void printHelp() {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
index 1ecb1fa2cd9..c466490b8a8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.tools.command.export;
import com.alibaba.fastjson.JSONObject;
@@ -77,6 +78,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
}
String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
+ path += "/" + configType;
boolean jsonEnable = false;
if (commandLine.hasOption("jsonEnable")) {
@@ -86,7 +88,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */);
if (!kvStore.start()) {
- System.out.print("RocksDB load error, path=" + path + "\n");
+ System.out.printf("RocksDB load error, path=%s\n" , path);
return;
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
index bb82f5079e5..97e101d813c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
@@ -24,6 +24,7 @@
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageExt;
@@ -97,6 +98,12 @@ public Options buildCommandlineOptions(Options options) {
opt.setRequired(false);
options.addOption(opt);
+ opt =
+ new Option("l", "lmqParentTopic", true,
+ "Lmq parent topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -113,11 +120,20 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
String subExpression =
!commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
+ String lmqParentTopic =
+ !commandLine.hasOption('l') ? null : commandLine.getOptionValue('l').trim();
+
boolean printBody = !commandLine.hasOption('d') || Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
consumer.start();
- Set mqs = consumer.fetchSubscribeMessageQueues(topic);
+ Set mqs;
+ if (lmqParentTopic != null) {
+ mqs = consumer.fetchSubscribeMessageQueues(lmqParentTopic);
+ mqs.forEach(mq -> mq.setTopic(topic));
+ } else {
+ mqs = consumer.fetchSubscribeMessageQueues(topic);
+ }
for (MessageQueue mq : mqs) {
long minOffset = consumer.minOffset(mq);
long maxOffset = consumer.maxOffset(mq);
@@ -139,6 +155,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
READQ:
for (long offset = minOffset; offset < maxOffset; ) {
try {
+ fillBrokerAddrIfNotExist(consumer, mq, lmqParentTopic);
PullResult pullResult = consumer.pull(mq, subExpression, offset, 32);
offset = pullResult.getNextBeginOffset();
switch (pullResult.getPullStatus()) {
@@ -167,4 +184,17 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
consumer.shutdown();
}
}
+
+ public void fillBrokerAddrIfNotExist(DefaultMQPullConsumer defaultMQPullConsumer, MessageQueue messageQueue,
+ String routeTopic) {
+
+ FindBrokerResult findBrokerResult = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+ .findBrokerAddressInSubscribe(messageQueue.getBrokerName(), 0, false);
+ if (findBrokerResult == null) {
+ // use lmq parent topic to fill up broker addr table
+ defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+ .updateTopicRouteInfoFromNameServer(routeTopic);
+ }
+
+ }
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index 1d81287ac7d..f2803b0cbb3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.tools.command.metadata;
-import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -33,10 +32,13 @@
import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class RocksDBConfigToJsonCommand implements SubCommand {
private static final String TOPICS_JSON_CONFIG = "topics";
private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups";
+ private static final String CONSUMER_OFFSETS_JSON_CONFIG = "consumerOffsets";
@Override
public String commandName() {
@@ -45,7 +47,7 @@ public String commandName() {
@Override
public String commandDesc() {
- return "Convert RocksDB kv config (topics/subscriptionGroups) to json";
+ return "Convert RocksDB kv config (topics/subscriptionGroups/consumerOffsets) to json";
}
@Override
@@ -56,7 +58,7 @@ public Options buildCommandlineOptions(Options options) {
options.addOption(pathOption);
Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
- "topics/subscriptionGroups");
+ "topics/subscriptionGroups/consumerOffsets");
configTypeOption.setRequired(true);
options.addOption(configTypeOption);
@@ -71,19 +73,21 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
return;
}
- String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
+ String configType = commandLine.getOptionValue("configType").trim();
if (!path.endsWith("/")) {
path += "/";
}
path += configType;
-
+ if (CONSUMER_OFFSETS_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ printConsumerOffsets(path);
+ return;
+ }
ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
configRocksDBStorage.start();
RocksIterator iterator = configRocksDBStorage.iterator();
-
try {
final Map configMap = new HashMap<>();
- final Map configTable = new HashMap<>();
+ final JSONObject configTable = new JSONObject();
iterator.seekToFirst();
while (iterator.isValid()) {
final byte[] key = iterator.key();
@@ -95,14 +99,16 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
iterator.next();
}
byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion();
- configMap.put("dataVersion",
- JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8)));
+ if (kvDataVersion != null) {
+ configMap.put("dataVersion",
+ JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8)));
+ }
- if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
- configMap.put("topicConfigTable", JSON.parseObject(JSONObject.toJSONString(configTable)));
+ if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ configMap.put("topicConfigTable", configTable);
}
- if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
- configMap.put("subscriptionGroupTable", JSON.parseObject(JSONObject.toJSONString(configTable)));
+ if (SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ configMap.put("subscriptionGroupTable", configTable);
}
System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
} catch (Exception e) {
@@ -111,4 +117,42 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
configRocksDBStorage.shutdown();
}
}
+
+ private void printConsumerOffsets(String path) {
+ ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
+ configRocksDBStorage.start();
+ RocksIterator iterator = configRocksDBStorage.iterator();
+ try {
+ final Map configMap = new HashMap<>();
+ final JSONObject configTable = new JSONObject();
+ iterator.seekToFirst();
+ while (iterator.isValid()) {
+ final byte[] key = iterator.key();
+ final byte[] value = iterator.value();
+ final String name = new String(key, DataConverter.CHARSET_UTF8);
+ final String config = new String(value, DataConverter.CHARSET_UTF8);
+ final RocksDBOffsetSerializeWrapper jsonObject = JSONObject.parseObject(config, RocksDBOffsetSerializeWrapper.class);
+ configTable.put(name, jsonObject.getOffsetTable());
+ iterator.next();
+ }
+ configMap.put("offsetTable", configTable);
+ System.out.print(JSONObject.toJSONString(configMap, true) + "\n");
+ } catch (Exception e) {
+ System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=consumerOffsets, " + e.getMessage() + "\n");
+ } finally {
+ configRocksDBStorage.shutdown();
+ }
+ }
+
+ static class RocksDBOffsetSerializeWrapper {
+ private ConcurrentMap offsetTable = new ConcurrentHashMap<>(16);
+
+ public ConcurrentMap getOffsetTable() {
+ return offsetTable;
+ }
+
+ public void setOffsetTable(ConcurrentMap offsetTable) {
+ this.offsetTable = offsetTable;
+ }
+ }
}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
new file mode 100644
index 00000000000..d18a24ee1dc
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.queue;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+
+public class CheckRocksdbCqWriteProgressCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "checkRocksdbCqWriteProgress";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "check if rocksdb cq is same as file cq";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "cluster", true, "cluster name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("n", "nameserverAddr", true, "nameserverAddr");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("t", "topic", true, "topic name");
+ opt.setRequired(false);
+ options.addOption(opt);
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n')));
+ String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : "";
+ String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "";
+
+ try {
+ defaultMQAdminExt.start();
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Map brokerAddrTable = clusterInfo.getBrokerAddrTable();
+ if (clusterAddrTable.get(clusterName) == null) {
+ System.out.print("clusterAddrTable is empty");
+ return;
+ }
+ for (Map.Entry entry : brokerAddrTable.entrySet()) {
+ String brokerName = entry.getKey();
+ BrokerData brokerData = entry.getValue();
+ String brokerAddr = brokerData.getBrokerAddrs().get(0L);
+ CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
+ if (StringUtils.isNotBlank(topic)) {
+ System.out.print(body.getDiffResult());
+ } else {
+ System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult());
+ }
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}