From ca5d7de40115ee7803850efbc56b194982761a12 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Tue, 27 Aug 2024 17:31:10 +0800 Subject: [PATCH 1/5] fix missing brokerName in sendMessageBack request --- .../apache/rocketmq/client/consumer/DefaultMQPullConsumer.java | 2 +- .../apache/rocketmq/client/consumer/DefaultMQPushConsumer.java | 2 +- .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 2 +- .../client/impl/consumer/DefaultMQPushConsumerImplTest.java | 3 ++- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 089fd39b3e9..7c9a65ecdbf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -262,7 +262,7 @@ public void setRegisterTopics(Set registerTopics) { public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { msg.setTopic(withNamespace(msg.getTopic())); - this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null); + this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, msg.getBrokerName()); } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 94785c69708..5df5cc8fa1a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -688,7 +688,7 @@ public void setSubscription(Map subscription) { public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { msg.setTopic(withNamespace(msg.getTopic())); - this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, (String) null); + this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, msg.getBrokerName()); } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 0fef8666cb5..c92cadf5057 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -752,7 +752,7 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN public void sendMessageBack(MessageExt msg, int delayLevel, final MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - sendMessageBack(msg, delayLevel, null, mq); + sendMessageBack(msg, delayLevel, msg.getBrokerName(), mq); } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java index 68563c02562..e5aebeba12b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java @@ -652,9 +652,10 @@ public void testQueryMessageByUniqKey() throws InterruptedException, MQClientExc @Test public void testSendMessageBack() throws InterruptedException, MQClientException, MQBrokerException, RemotingException { defaultMQPushConsumerImpl.sendMessageBack(createMessageExt(), 1, createMessageQueue()); + when(mQClientFactory.findBrokerAddressInPublish(anyString())).thenReturn(defaultBrokerAddr); verify(mqClientAPIImpl).consumerSendMessageBack( eq(defaultBrokerAddr), - any(), + eq(defaultBroker), any(MessageExt.class), any(), eq(1), From 60d7fc3859f3ea944c09a07d1671bf4bffac1b82 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Tue, 27 Aug 2024 18:00:00 +0800 Subject: [PATCH 2/5] fix --- .../client/impl/consumer/DefaultMQPushConsumerImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java index e5aebeba12b..2bc9c5a18db 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java @@ -651,8 +651,8 @@ public void testQueryMessageByUniqKey() throws InterruptedException, MQClientExc @Test public void testSendMessageBack() throws InterruptedException, MQClientException, MQBrokerException, RemotingException { - defaultMQPushConsumerImpl.sendMessageBack(createMessageExt(), 1, createMessageQueue()); when(mQClientFactory.findBrokerAddressInPublish(anyString())).thenReturn(defaultBrokerAddr); + defaultMQPushConsumerImpl.sendMessageBack(createMessageExt(), 1, createMessageQueue()); verify(mqClientAPIImpl).consumerSendMessageBack( eq(defaultBrokerAddr), eq(defaultBroker), From dc421affd5cf34e296545a662e58b0ffd5fcc888 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 11 Sep 2024 16:23:39 +0800 Subject: [PATCH 3/5] fix trace topic --- .../apache/rocketmq/client/trace/AsyncTraceDispatcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 6d62617eb8e..0c675103189 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -332,14 +332,14 @@ private void flushData(List transBeanList, String topic, Stri buffer.append(bean.getTransData()); count++; if (buffer.length() >= traceProducer.getMaxMessageSize()) { - sendTraceDataByMQ(keySet, buffer.toString(), TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId); + sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName); buffer.delete(0, buffer.length()); keySet.clear(); count = 0; } } if (count > 0) { - sendTraceDataByMQ(keySet, buffer.toString(), TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId); + sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName); } transBeanList.clear(); } From 08b03d9ccec53cfc6b62597657d1e7097b4e3648 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 11 Sep 2024 16:42:54 +0800 Subject: [PATCH 4/5] fix --- .../client/trace/AsyncTraceDispatcher.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 0c675103189..be1442a811b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -302,14 +302,24 @@ public void run() { public void sendTraceData(List contextList) { Map> transBeanMap = new HashMap<>(16); - String currentRegionId; + String traceTopic; for (TraceContext context : contextList) { - currentRegionId = context.getRegionId(); + AccessChannel accessChannel = context.getAccessChannel(); + if (accessChannel == null) { + accessChannel = AsyncTraceDispatcher.this.accessChannel; + } + String currentRegionId = context.getRegionId(); if (currentRegionId == null || context.getTraceBeans().isEmpty()) { continue; } + if (AccessChannel.CLOUD == accessChannel) { + traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + currentRegionId; + } else { + traceTopic = traceTopicName; + } + String topic = context.getTraceBeans().get(0).getTopic(); - String key = topic + TraceConstants.CONTENT_SPLITOR + currentRegionId; + String key = topic + TraceConstants.CONTENT_SPLITOR + traceTopic; List transBeanList = transBeanMap.computeIfAbsent(key, k -> new ArrayList<>()); TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); transBeanList.add(traceData); @@ -320,7 +330,7 @@ public void sendTraceData(List contextList) { } } - private void flushData(List transBeanList, String topic, String currentRegionId) { + private void flushData(List transBeanList, String topic, String traceTopic) { if (transBeanList.size() == 0) { return; } @@ -332,14 +342,14 @@ private void flushData(List transBeanList, String topic, Stri buffer.append(bean.getTransData()); count++; if (buffer.length() >= traceProducer.getMaxMessageSize()) { - sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName); + sendTraceDataByMQ(keySet, buffer.toString(), traceTopic); buffer.delete(0, buffer.length()); keySet.clear(); count = 0; } } if (count > 0) { - sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName); + sendTraceDataByMQ(keySet, buffer.toString(), traceTopic); } transBeanList.clear(); } @@ -411,4 +421,7 @@ private Set tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, } } + public boolean isStarted() { + return isStarted.get(); + } } \ No newline at end of file From f7d48086bcb5b88b61c8423c892e307c1aa59ddf Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 11 Sep 2024 16:43:25 +0800 Subject: [PATCH 5/5] fix --- .../org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index be1442a811b..e321e1583d2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -421,7 +421,4 @@ private Set tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, } } - public boolean isStarted() { - return isStarted.get(); - } } \ No newline at end of file