Skip to content

Commit 5ef895a

Browse files
authored
[fix][client] Remove producer when close producer command is received (#16028)
1 parent 9f40cc1 commit 5ef895a

File tree

2 files changed

+24
-5
lines changed

2 files changed

+24
-5
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public class ClientCnx extends PulsarHandler {
111111
// LookupRequests that waiting in client side.
112112
private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
113113

114-
private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
114+
@VisibleForTesting
115+
final ConcurrentLongHashMap<ProducerImpl<?>> producers =
115116
ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
116117
.expectedItems(16)
117118
.concurrencyLevel(1)
@@ -721,7 +722,7 @@ protected void handleError(CommandError error) {
721722
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
722723
log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
723724
final long producerId = closeProducer.getProducerId();
724-
ProducerImpl<?> producer = producers.get(producerId);
725+
ProducerImpl<?> producer = producers.remove(producerId);
725726
if (producer != null) {
726727
producer.connectionClosed(this);
727728
} else {

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
3939
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
4040
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
41+
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
4142
import org.apache.pulsar.common.api.proto.CommandError;
4243
import org.apache.pulsar.common.api.proto.ServerError;
4344
import org.apache.pulsar.common.protocol.Commands;
@@ -156,7 +157,7 @@ public void testGetLastMessageIdWithError() throws Exception {
156157

157158
@Test
158159
public void testHandleCloseConsumer() {
159-
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
160+
ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
160161
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
161162
ClientConfigurationData conf = new ClientConfigurationData();
162163
ClientCnx cnx = new ClientCnx(conf, eventLoop);
@@ -165,11 +166,28 @@ public void testHandleCloseConsumer() {
165166
cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
166167
assertEquals(cnx.consumers.size(), 1);
167168

168-
CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
169-
.setConsumerId(1);
169+
CommandCloseConsumer closeConsumer = new CommandCloseConsumer().setConsumerId(consumerId);
170170
cnx.handleCloseConsumer(closeConsumer);
171171
assertEquals(cnx.consumers.size(), 0);
172172

173173
eventLoop.shutdownGracefully();
174174
}
175+
176+
@Test
177+
public void testHandleCloseProducer() {
178+
ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer");
179+
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
180+
ClientConfigurationData conf = new ClientConfigurationData();
181+
ClientCnx cnx = new ClientCnx(conf, eventLoop);
182+
183+
long producerId = 1;
184+
cnx.registerProducer(producerId, mock(ProducerImpl.class));
185+
assertEquals(cnx.producers.size(), 1);
186+
187+
CommandCloseProducer closeProducerCmd = new CommandCloseProducer().setProducerId(producerId);
188+
cnx.handleCloseProducer(closeProducerCmd);
189+
assertEquals(cnx.producers.size(), 0);
190+
191+
eventLoop.shutdownGracefully();
192+
}
175193
}

0 commit comments

Comments
 (0)