[Bug] DLQ producer name conflicts when consumer sends messages to DLQ #21888

Closed
2 tasks done
RobertIndie opened this issue Jan 12, 2024 · 0 comments · Fixed by #21890
Labels
release/blocker: Indicate the PR or issue that should block the release until it gets resolved
type/bug: The PR fixed a bug or issue reported a bug

RobertIndie commented Jan 12, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Version

c834feb

Minimal reproduce step

Run the test: testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers

What did you expect to see?

The test should pass without logging any exceptions.

What did you see instead?

Although the test passes, it logs unexpected exceptions:

2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] - Dead letter producer exception with topic: persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?]
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846) ~[classes/:?]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?]
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) ~[classes/:?]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192) ~[classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318) ~[classes/:?]
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) ~[classes/:?]
	... 26 more

Anything else?

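The root cause is a regression introduced in #21589. The DLQ producer name conflicts when multiple consumers on the same topic and subscription send messages to the DLQ concurrently: each consumer tries to create a DLQ producer with the same name, and the broker rejects every producer after the first with a NamingException.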
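
The conflict can be illustrated with a small, self-contained model. This is only a sketch and not Pulsar code: `DlqProducerNameDemo`, `connect`, `sharedName`, `perConsumerName`, and `countRejected` are hypothetical names, the "broker" is a plain in-memory set, and the per-consumer naming scheme is just one plausible way to avoid the clash, not necessarily what #21890 does. The shared name shape (`<topic>-<subscription>-DLQ`) is inferred from the error message in the log above.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of a broker that rejects a second producer with an already-connected name. */
class DlqProducerNameDemo {

    // Names of producers currently "connected" to the DLQ topic (stand-in for broker state).
    private final Set<String> connected = ConcurrentHashMap.newKeySet();

    /** Returns true if the name was free, false if it clashes (the NamingException case). */
    boolean connect(String producerName) {
        return connected.add(producerName);
    }

    /** Name shape inferred from the log: "<topic>-<subscription>-DLQ". */
    static String sharedName(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    /** Hypothetical alternative: add a per-consumer component so the names differ. */
    static String perConsumerName(String topic, String subscription, String consumerName) {
        return topic + "-" + subscription + "-" + consumerName + "-DLQ";
    }

    /** Counts how many of the given producer names a fresh toy broker rejects. */
    static int countRejected(List<String> producerNames) {
        DlqProducerNameDemo broker = new DlqProducerNameDemo();
        int rejected = 0;
        for (String name : producerNames) {
            if (!broker.connect(name)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        String topic = "persistent://my-property/my-ns/dead-letter-topic";
        String sub = "my-subscription";

        // Two consumers on the same topic and subscription derive the same producer name.
        int clashes = countRejected(List.of(sharedName(topic, sub), sharedName(topic, sub)));
        System.out.println("shared name, rejected producers: " + clashes);

        // With a per-consumer component, both producers can connect.
        int none = countRejected(List.of(
                perConsumerName(topic, sub, "consumer-1"),
                perConsumerName(topic, sub, "consumer-2")));
        System.out.println("per-consumer name, rejected producers: " + none);
    }
}
```

In this model the second consumer's producer is rejected whenever both consumers derive the name from the topic and subscription alone, which matches the "is already connected to topic" error in the log.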

Are you willing to submit a PR?

  • I'm willing to submit a PR!

RobertIndie added the type/bug label Jan 12, 2024
RobertIndie self-assigned this Jan 12, 2024
codelipenghui added the release/blocker label Jan 12, 2024
codelipenghui added this to the 3.2.0 milestone Jan 12, 2024