[fix][client] Orphan producer when concurrently calling producer closing and reconnection #23853

Open
wants to merge 5 commits into
base: master
from


poorbarcode
Contributor

@poorbarcode poorbarcode commented Jan 15, 2025

Motivation

background 1: producer's reconnection[1]

  • set its connection to null
  • check the state is not closing | closed
  • set the state to connecting

background 2: steps of producer closing[2]

  • if its connection is null:
    • fails all pending sends
    • set state to closed
  • if its connection is present:
    • remove producer in broker
    • remove the producer in the connection
    • fails all pending sends
    • set state to closed

Issue 1: resending messages encountered a recycled pending message

| time | thread: reconnection | thread: close producer |
|------|----------------------|------------------------|
| 1 | set its connection to null | |
| 2 | check state is not closing or closed | |
| 3 | | set state to closing |
| 4 | | connection is null now |
| 5 | set state to connecting | |
| 6 | reconnect successfully | |
| 7 | resend pending messages | fails all pending sends |
| 8 | encounters a recycled pending message | |
| 9 | | set state to closed |

Result (time 10): an orphan producer was left in the broker.
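
To make steps 7 and 8 concrete, here is a minimal single-threaded replay. It is only a toy model: it assumes that failing a pending send also recycles the op object, and that the resend pass works from a view of the queue taken before the close path cleared it. `RecycledOpSketch`, `PendingOp`, and `replay` are hypothetical names, not Pulsar classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a pooled "pending send"; every name here is hypothetical. */
class RecycledOpSketch {
    static final class PendingOp {
        String payload; // cleared when the op is recycled

        PendingOp(String payload) {
            this.payload = payload;
        }

        // Assumption: recycling wipes the fields (a real pool would also reuse the instance).
        void recycle() {
            payload = null;
        }
    }

    /** Returns how many ops the resend pass saw after they had been recycled. */
    static int replay() {
        Deque<PendingOp> pending = new ArrayDeque<>();
        pending.add(new PendingOp("m1"));
        pending.add(new PendingOp("m2"));

        // Reconnection thread (time 7): picks up the pending ops to resend.
        PendingOp[] toResend = pending.toArray(new PendingOp[0]);

        // Close thread (time 7): fails all pending sends, recycling and dropping them.
        for (PendingOp op : pending) {
            op.recycle();
        }
        pending.clear();

        // Reconnection thread (time 8): continues with its stale view of the queue.
        int recycledSeen = 0;
        for (PendingOp op : toResend) {
            if (op.payload == null) {
                recycledSeen++;
            }
        }
        return recycledSeen;
    }

    public static void main(String[] args) {
        System.out.println("recycled ops seen by resend: " + replay());
    }
}
```

The replay runs the two "threads" in a fixed order on one thread, so the outcome does not depend on timing.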

Issue 2: closed producers were mistakenly set to connecting. The steps to reproduce the issue are as follows:

| time | thread: reconnection | thread: close producer |
|------|----------------------|------------------------|
| 1 | set its connection to null | |
| 2 | check state is not closing or closed | |
| 3 | | set state to closing |
| 4 | | connection is null now |
| 5 | set state to connecting | |
| 6 | reconnect successfully | |
| 7 | resend pending messages | fails all pending sends |
| 8 | encounters a recycled pending message | |
| 9 | | set state to closed |

Result (time 10): an orphan producer was left in the broker.
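
The interleaving can be replayed deterministically on one thread. The sketch below is a toy model rather than the real `ProducerImpl`: the class, the fields, and the `registeredInBroker` flag are hypothetical, and it assumes the broker drops the producer when the connection goes away and registers it again on a successful reconnect. Steps 7 and 8 (pending messages) are left out.

```java
/** Toy replay of the interleaving; not the real ProducerImpl. All names are hypothetical. */
class OrphanProducerReplay {
    enum State { READY, CONNECTING, CLOSING, CLOSED }

    State state = State.READY;
    Object cnx = new Object();         // stands in for the client connection
    boolean registeredInBroker = true; // assumption: does the broker still hold this producer?

    /** Runs the steps of both threads in the order of the time column. */
    static OrphanProducerReplay replay() {
        OrphanProducerReplay p = new OrphanProducerReplay();

        // time 1 (reconnection): the connection dropped, so set it to null.
        p.cnx = null;
        p.registeredInBroker = false;

        // time 2 (reconnection): check the state is not closing or closed.
        boolean mayReconnect = p.state != State.CLOSING && p.state != State.CLOSED;

        // time 3 (close): set state to closing.
        p.state = State.CLOSING;

        // time 4 (close): connection is null, so the "remove producer in broker" step is skipped.
        boolean closeSawNoConnection = (p.cnx == null);

        // time 5 (reconnection): acts on the stale check from time 2.
        if (mayReconnect) {
            p.state = State.CONNECTING;
        }

        // time 6 (reconnection): reconnect succeeds and the broker holds the producer again.
        p.cnx = new Object();
        p.registeredInBroker = true;

        // time 9 (close): finishes without ever contacting the broker.
        if (closeSawNoConnection) {
            p.state = State.CLOSED;
        }
        return p;
    }

    public static void main(String[] args) {
        OrphanProducerReplay p = replay();
        System.out.println("state=" + p.state + ", registeredInBroker=" + p.registeredInBroker);
    }
}
```

In this model the client ends up closed while the broker-side flag is still set, which is the orphan producer described at time 10.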

You can reproduce the issue with the test testConcurrencyReconnectAndClose.

    // Reconnection path: the state check and the state change are two separate steps.
    public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDelayMs, Optional<URI> hostUrl) {
        if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) { // set its connection to `null`
            if (!isValidStateForReconnection()) { // check the state is not `closing | closed`
                return;
            }
            state.setState(State.Connecting); // set the state to `connecting`
            grabCnx(hostUrl); // do reconnect.
        }
    }

    // Close path (excerpt from ProducerImpl.closeAsync): taken when the connection is null.
        if (cnx == null || currentState != State.Ready) {
            log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName);
            closeAndClearPendingMessages();
            return CompletableFuture.completedFuture(null);
        }
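
One way to close the window between "check the state" and "set the state to connecting" is to fold both into a single compare-and-set. The sketch below illustrates that idea only. `AtomicStateSketch` and `startClosing` are hypothetical, and although the method is named `changeToConnecting` after the call that appears in this PR's diff, its body and its choice of permitted source states are assumptions, not the PR's implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of an atomic "change to connecting" that refuses closing/closed states. */
class AtomicStateSketch {
    enum State { READY, CONNECTING, CLOSING, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.READY);

    /** Returns true only if the state is CONNECTING as a result of this call. */
    boolean changeToConnecting() {
        while (true) {
            State current = state.get();
            if (current == State.CLOSING || current == State.CLOSED) {
                return false; // a close is in progress or finished: do not reconnect
            }
            if (state.compareAndSet(current, State.CONNECTING)) {
                return true;
            }
            // Another thread changed the state in between; re-read and decide again.
        }
    }

    /** Stands in for the first step of the close path. */
    void startClosing() {
        state.set(State.CLOSING);
    }

    State get() {
        return state.get();
    }

    public static void main(String[] args) {
        AtomicStateSketch s = new AtomicStateSketch();
        s.startClosing();
        System.out.println("reconnect allowed after close started: " + s.changeToConnecting());
    }
}
```

With this shape, a close that starts between the read and the write makes the compare-and-set fail, and the retry then observes the closing state and backs off.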

Logs from an occurrence of issue 2:

2025-01-09T11:04:41,172+0000 [pulsar-web-43-5] WARN  org.apache.pulsar.client.impl.ProducerImpl - [persistent://xxx/xxx/xx-partition-0] [pulsar.repl.prod-->prod-repl] Got exception while completing the callback for msg 458:
java.lang.NullPointerException: Cannot read field "replicatorId" because "x0" is null
	at org.apache.pulsar.broker.service.persistent.PersistentReplicator.access$000(PersistentReplicator.java:71) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.service.persistent.PersistentReplicator$ProducerSendCallback.sendComplete(PersistentReplicator.java:356) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1526) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:2077) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
	at java.util.ArrayDeque.forEach(ArrayDeque.java:888) ~[?:?]
	at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1617) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:2067) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.client.impl.ProducerImpl.closeAndClearPendingMessages(ProducerImpl.java:1113) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.client.impl.ProducerImpl.closeAsync(ProducerImpl.java:1080) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.service.AbstractReplicator.doCloseProducerAsync(AbstractReplicator.java:365) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.service.AbstractReplicator.terminate(AbstractReplicator.java:387) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$close$47(PersistentTopic.java:1598) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:563) ~[io.streamnative-pulsar-common-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:277) ~[io.streamnative-pulsar-common-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.close(PersistentTopic.java:1598) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadNonPartitionedTopicAsync$126(PersistentTopicsBase.java:1174) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadNonPartitionedTopicAsync(PersistentTopicsBase.java:1174) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadTopic$110(PersistentTopicsBase.java:962) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadTopic(PersistentTopicsBase.java:956) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.admin.v2.PersistentTopics.unloadTopic(PersistentTopics.java:1120) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:569) ~[?:?]
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:359) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:312) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:66) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.apache.pulsar.broker.web.AuthenticationFilter.doFilter(AuthenticationFilter.java:65) ~[io.streamnative-pulsar-broker-common-3.0.6.8.jar:3.0.6.8]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.apache.pulsar.broker.intercept.BrokerInterceptor.onFilter(BrokerInterceptor.java:224) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.apache.pulsar.broker.web.ProcessHandlerFilter.doFilter(ProcessHandlerFilter.java:46) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.apache.pulsar.broker.web.PreInterceptFilter.doFilter(PreInterceptFilter.java:73) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.apache.pulsar.broker.web.WebService$FilterInitializer$WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter.doFilter(WebService.java:325) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) ~[org.eclipse.jetty-jetty-servlets-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:722) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) ~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) ~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) ~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]

2025-01-09T11:04:41,171+0000 [pulsar-web-43-5] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://xxx/xxx/xxx-partition-0] [pulsar.repl.prod-->prod-repl] Closed Producer (not connected)

Modifications

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode added type/bug The PR fixed a bug or issue reported a bug release/3.0.9 release/3.3.5 release/4.0.3 labels Jan 15, 2025
@poorbarcode poorbarcode added this to the 4.1.0 milestone Jan 15, 2025
@poorbarcode poorbarcode self-assigned this Jan 15, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 15, 2025
@lhotari
Member

lhotari commented Jan 15, 2025

The PR title is currently very confusing when it's "Fix closed producers were reverted mistakenly".
Are you referring to these changes that "reverted" something: https://github.com/apache/pulsar/pull/23761/files , or is it something else?

@poorbarcode poorbarcode changed the title [fix][client]Fix closed producers were reverted mistakenly [fix][client]Orphan producer when concurrency calling producer closing and reconnection Jan 16, 2025
@poorbarcode
Contributor Author

poorbarcode commented Jan 16, 2025

@lhotari

Are you referring to these changes that "reverted" something: https://github.com/apache/pulsar/pull/23761/files , or is it something else?

Sorry, it should be recover, which means the producer's state will be reset to reconnecting even if it has been closed

I corrected the title

@poorbarcode
Contributor Author

/pulsarbot rerun-failure-checks

@lhotari
Member

lhotari commented Jan 16, 2025

@lhotari

Are you referring to these changes that "reverted" something: https://github.com/apache/pulsar/pull/23761/files , or is it something else?

Sorry, it should be recover, which means the producer's state will be reset to reconnecting even if it has been closed

I corrected the title

The title remains hard to understand. I used the technique described here to let an LLM (Claude) suggest a title. The suggestion based on the PR context is "Fix race condition between producer reconnection and closing that causes orphaned producers". Here's the full example of what LLM suggested: https://gist.github.com/lhotari/e63521b8a5694c5a928f740cd5d46331 .
I'm not saying that LLMs should be used to write PR titles and descriptions, but they are very good at fixing grammar and improving clarity. It's a useful tool.

@lhotari
Member

lhotari commented Jan 16, 2025

Sorry, it should be recover, which means the producer's state will be reset to reconnecting even if it has been closed

I corrected the title

@poorbarcode Please also make updates in the description where it mentions the very misleading sentence "closed producers were reverted mistakenly".

@poorbarcode
Contributor Author

poorbarcode commented Jan 16, 2025

@lhotari

@poorbarcode Please also make updates in the description where it mentions the very misleading sentence "closed producers were reverted mistakenly".

Modified

Member

@shibd shibd left a comment

/LGTM

Nice catch!

Comment on lines -78 to -106
    @Test
    public void testClearPendingMessageWhenCloseAsync() {
        PulsarClientImpl client = mock(PulsarClientImpl.class);
        Mockito.doReturn(1L).when(client).newProducerId();
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setStatsIntervalSeconds(-1);
        Mockito.doReturn(clientConf).when(client).getConfiguration();
        Mockito.doReturn(new InstrumentProvider(null)).when(client).instrumentProvider();
        ConnectionPool connectionPool = mock(ConnectionPool.class);
        Mockito.doReturn(1).when(connectionPool).genRandomKeyToSelectCon();
        Mockito.doReturn(connectionPool).when(client).getCnxPool();
        HashedWheelTimer timer = mock(HashedWheelTimer.class);
        Mockito.doReturn(null).when(timer).newTimeout(Mockito.any(), Mockito.anyLong(), Mockito.any());
        Mockito.doReturn(timer).when(client).timer();
        ProducerConfigurationData producerConf = new ProducerConfigurationData();
        producerConf.setSendTimeoutMs(-1);
        ProducerImpl<?> producer = Mockito.spy(new ProducerImpl<>(client, "topicName", producerConf, null, 0, null, null, Optional.empty()));

        // make sure an exception is thrown when sending a request to the broker
        ClientCnx clientCnx = mock(ClientCnx.class);
        CompletableFuture<ProducerResponse> tCompletableFuture = new CompletableFuture<>();
        tCompletableFuture.completeExceptionally(new PulsarClientException("error"));
        when(clientCnx.sendRequestWithId(Mockito.any(), Mockito.anyLong())).thenReturn(tCompletableFuture);
        Mockito.doReturn(clientCnx).when(producer).cnx();

        // run closeAsync and verify
        CompletableFuture<Void> voidCompletableFuture = producer.closeAsync();
        verify(producer).closeAndClearPendingMessages();
    }
Member

Why is the test case completely removed? Would it be possible to ensure with a test that the problem described in PR #23761 is addressed?

@@ -192,13 +193,12 @@ public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDela
         duringConnect.set(false);
         state.client.getCnxPool().releaseConnection(cnx);
         if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
-            if (!isValidStateForReconnection()) {
+            if (!state.changeToConnecting()) {
Member

Previously this would allow the connection to be in Ready state while switching to Connecting. Is it intentional that it's no longer allowed?

@lhotari lhotari changed the title [fix][client]Orphan producer when concurrency calling producer closing and reconnection [fix][client] Orphan producer when concurrently calling producer closing and reconnection Jan 16, 2025
Labels
doc-not-needed Your PR changes do not impact docs ready-to-test release/3.0.9 release/3.3.5 release/4.0.3 type/bug The PR fixed a bug or issue reported a bug

3 participants