Skip to content

Commit

Permalink
[fix][client] Fix DLQ producer name conflicts when multiples consumer…
Browse files Browse the repository at this point in the history
…s send messages to DLQ (#21890)
  • Loading branch information
RobertIndie authored Jan 15, 2024
1 parent c834feb commit 252509e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public void testDeadLetterTopicWithMessageKey() throws Exception {
public void testDeadLetterTopicWithProducerName() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
final String subscription = "my-subscription";
String deadLetterProducerName = String.format("%s-%s-DLQ", topic, subscription);
final String consumerName = "my-consumer";
String deadLetterProducerName = String.format("%s-%s-%s-DLQ", topic, subscription, consumerName);

final int maxRedeliveryCount = 1;

Expand All @@ -149,6 +150,7 @@ public void testDeadLetterTopicWithProducerName() throws Exception {
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscription)
.consumerName(consumerName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
Expand Down Expand Up @@ -929,6 +931,9 @@ public void testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers() throws
assertTrue(admin.topics().getSubscriptions(deadLetterTopic).contains(dlqInitialSub));
});

// We should assert that all consumers are able to produce messages to DLQ
assertEquals(admin.topics().getStats(deadLetterTopic).getPublishers().size(), 2);

Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic(deadLetterTopic)
.subscriptionName(dlqInitialSub)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2172,7 +2172,8 @@ private void initDeadLetterProducerIfNeeded() {
((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription))
.producerName(String.format("%s-%s-%s-DLQ", this.topicName, this.subscription,
this.consumerName))
.blockIfQueueFull(false)
.enableBatching(false)
.enableChunking(true)
Expand Down

0 comments on commit 252509e

Please sign in to comment.