From 6e35917d91e7ce0d54f8af3116428894baab90e8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 15 Aug 2024 22:00:29 +0300 Subject: [PATCH 1/3] [fix][client] Copy orderingKey to retry letter topic and DLQ messages --- .../client/api/DeadLetterTopicTest.java | 60 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 9 ++- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 83320fffa1a7f..f5a74dcd1661b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -197,6 +197,66 @@ public void testDeadLetterTopicWithBinaryMessageKey() throws Exception { consumer.close(); } + @Test + public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 1; + + final int sendMessages = 100; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + byte[] key = new byte[]{1, 2, 3, 4}; + for (int i = 0; i < sendMessages; i++) { + producer.newMessage() + .orderingKey(key) + .value(String.format("Hello Pulsar [%d]", i).getBytes()) + .send(); + } + + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + assertEquals(message.getOrderingKey(), key); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + } + public void testDeadLetterTopicWithProducerName() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; final String subscription = "my-subscription"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 36cd52f955409..9b8da1fbdbcdc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -639,7 +639,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, A } } - private static void copyMessageKeyIfNeeded(Message message, TypedMessageBuilder typedMessageBuilderNew) { + private static void copyMessageKeysIfNeeded(Message message, TypedMessageBuilder typedMessageBuilderNew) { if (message.hasKey()) { if (message.hasBase64EncodedKey()) { typedMessageBuilderNew.keyBytes(message.getKeyBytes()); @@ -647,6 +647,9 @@ private static void copyMessageKeyIfNeeded(Message message, TypedMessageBuild typedMessageBuilderNew.key(message.getKey()); } } + if (message.hasOrderingKey()) { + typedMessageBuilderNew.orderingKey(message.getOrderingKey()); + } } @SuppressWarnings("unchecked") @@ -732,7 +735,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a if (delayTime > 0) { typedMessageBuilderNew.deliverAfter(delayTime, unit); } - copyMessageKeyIfNeeded(message, typedMessageBuilderNew); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) .thenAccept(v -> result.complete(null)) @@ -2196,7 +2199,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) .value(message.getData()) .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); - copyMessageKeyIfNeeded(message, typedMessageBuilderNew); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenAccept(messageIdInDLQ -> { possibleSendToDeadLetterTopicMessages.remove(messageId); From 979d2e2fc423a9101064df063fd7f6103c18311f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 15 Aug 2024 22:10:18 +0300 Subject: [PATCH 2/3] Add test for retry and dlq key and orderingKey --- .../pulsar/client/api/RetryTopicTest.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 2ccae72143443..9cb82fde04118 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -257,6 +257,9 @@ public void testAutoConsumeSchemaRetryLetter() throws Exception { public void testRetryTopicProperties() throws Exception { final String topic = "persistent://my-property/my-ns/retry-topic"; + byte[] key = "key".getBytes(); + byte[] orderingKey = "orderingKey".getBytes(); + final int maxRedeliveryCount = 3; final int sendMessages = 10; @@ -285,7 +288,11 @@ public void testRetryTopicProperties() throws Exception { Set originMessageIds = new HashSet<>(); for (int i = 0; i < sendMessages; i++) { - MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + MessageId msgId = producer.newMessage() + .value(String.format("Hello Pulsar [%d]", i).getBytes()) + .keyBytes(key) + .orderingKey(orderingKey) + .send(); originMessageIds.add(msgId.toString()); } @@ -298,6 +305,10 @@ public void testRetryTopicProperties() throws Exception { if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) { // check the REAL_TOPIC property assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic); + assertTrue(message.hasKey()); + assertEquals(message.getKeyBytes(), key); + assertTrue(message.hasOrderingKey()); + assertEquals(message.getOrderingKey(), orderingKey); retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)); } consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); @@ -317,6 +328,10 @@ public void testRetryTopicProperties() throws Exception { if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) { // check the REAL_TOPIC property assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic); + assertTrue(message.hasKey()); + assertEquals(message.getKeyBytes(), key); + assertTrue(message.hasOrderingKey()); + assertEquals(message.getOrderingKey(), orderingKey); deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)); } deadLetterConsumer.acknowledge(message); From aede45124e94d78d374e5c53148e3f6f115bc933 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 15 Aug 2024 22:10:41 +0300 Subject: [PATCH 3/3] Fix bug in ConsumerImpl where the keys weren't copied --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9b8da1fbdbcdc..596e65484d1b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -707,6 +707,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())) .value(retryMessage.getData()) .properties(propertiesMap); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { consumerDlqMessagesCounter.increment();