Skip to content

Commit

Permalink
[fix] [broker] fix mismatch between dispatcher.consumerList and dispa…
Browse files Browse the repository at this point in the history
…tcher.consumerSet (apache#22283)

(cherry picked from commit a52945b)
(cherry picked from commit bec3be2)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Apr 23, 2024
1 parent 4764b44 commit 29b8359
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1196,10 +1196,20 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
"Consumer is already present on the connection");
} else if (existingConsumerFuture.isCompletedExceptionally()){
log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
remoteAddress, subscriptionName));
consumers.remove(consumerId, existingConsumerFuture);
String.format("A failed consumer with id is already present on the connection."
+ " consumerId: %s, remoteAddress: %s, subscription: %s",
consumerId, remoteAddress, subscriptionName));
/**
* This future may was failed due to the client closed a in-progress subscribing.
* See {@link #handleCloseConsumer(CommandCloseConsumer)}
* Do not remove the failed future at current line, it will be removed after the progress of
* the previous subscribing is done.
* Before the previous subscribing is done, the new subscribe request will always fail.
* This mechanism is in order to prevent more complex logic to handle the race conditions.
*/
commandSender.sendErrorResponse(requestId, error,
"Consumer that failed is already present on the connection");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,15 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
}

if (isConsumersExceededOnSubscription()) {
log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name);
log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit {}",
name, consumer);
return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit"));
}
// This is not an expected scenario, it will never happen in expected. Just print a warn log if the unexpected
// scenario happens. See more detail: https://github.com/apache/pulsar/pull/22283.
if (consumerSet.contains(consumer)) {
log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer);
}

consumerList.add(consumer);
if (consumerList.size() > 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3383,8 +3383,9 @@ public boolean isCompletedExceptionally() {
};
// assert error response
assertTrue(responseAssert.test(responseAssert));
// assert consumer-delete event occur
assertEquals(1L,
// The delete event will only occur after the future is completed.
// assert consumer-delete event will not occur.
assertEquals(0L,
deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
// Server will not close the connection
assertTrue(channel.isOpen());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import com.carrotsearch.hppc.ObjectSet;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-api")
public class SimpleProducerConsumerMLInitializeDelayTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setTopicLoadTimeoutSeconds(60 * 5);
}

@Test(timeOut = 30 * 1000)
public void testConsumerListMatchesConsumerSet() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subName = "sub";
final int clientOperationTimeout = 3;
final int loadMLDelayMillis = clientOperationTimeout * 3 * 1000;
final int clientMaxBackoffSeconds = clientOperationTimeout * 2;
admin.topics().createNonPartitionedTopic(topicName);
// Create a client with a low operation timeout.
PulsarClient client = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.operationTimeout(clientOperationTimeout, TimeUnit.SECONDS)
.maxBackoffInterval(clientMaxBackoffSeconds, TimeUnit.SECONDS)
.build();
Consumer consumer = client.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
// Inject a delay for the initialization of ML, to make the consumer to register twice.
// Consumer register twice: the first will be timeout, and try again.
AtomicInteger delayTimes = new AtomicInteger();
mockZooKeeper.delay(loadMLDelayMillis, (op, s) -> {
if (op.toString().equals("GET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
return delayTimes.incrementAndGet() == 1;
}
return false;
});
admin.topics().unload(topicName);
// Verify: at last, "dispatcher.consumers.size" equals "dispatcher.consumerList.size".
Awaitility.await().atMost(Duration.ofSeconds(loadMLDelayMillis * 3))
.ignoreExceptions().untilAsserted(() -> {
Dispatcher dispatcher = pulsar.getBrokerService()
.getTopic(topicName, false).join().get()
.getSubscription(subName).getDispatcher();
ObjectSet consumerSet = WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
List consumerList = WhiteboxImpl.getInternalState(dispatcher, "consumerList");
log.info("consumerSet_size: {}, consumerList_size: {}", consumerSet.size(), consumerList.size());
Assert.assertEquals(consumerList.size(), 1);
Assert.assertEquals(consumerSet.size(), 1);
});

// Verify: the topic can be deleted.
consumer.close();
admin.topics().delete(topicName);
// cleanup.
client.close();
}
}

0 comments on commit 29b8359

Please sign in to comment.