Skip to content

Commit

Permalink
[fix][broker] Skip reading entries from closed cursor. (apache#22751)
Browse files Browse the repository at this point in the history
(cherry picked from commit aa8226f)
(cherry picked from commit 93e09ae)
  • Loading branch information
dao-jun authored and nikhil-ctds committed Aug 22, 2024
1 parent 29d155a commit 3c8c8ab
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.util.ArrayList;
Expand Down Expand Up @@ -286,6 +287,12 @@ public void readMoreEntriesAsync() {
}

public synchronized void readMoreEntries() {
if (cursor.isClosed()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor is already closed, skipping read more entries.", cursor.getName());
}
return;
}
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
Expand Down Expand Up @@ -853,7 +860,14 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
ReadType readType = (ReadType) ctx;
long waitTimeMillis = readFailureBackoff.next();

if (exception instanceof NoMoreEntriesToReadException) {
// Do not keep reading more entries if the cursor is already closed.
if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor is already closed, skipping read more entries", cursor.getName());
}
// Set the wait time to -1 to avoid rescheduling the read.
waitTimeMillis = -1;
} else if (exception instanceof NoMoreEntriesToReadException) {
if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Topic has been terminated and there are no more entries to read
// Notify the consumer only if all the messages were already acknowledged
Expand Down Expand Up @@ -892,7 +906,14 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
}

readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();
// Skip read if the waitTimeMillis is a nagetive value.
if (waitTimeMillis >= 0) {
scheduleReadEntriesWithDelay(exception, readType, waitTimeMillis);
}
}

@VisibleForTesting
void scheduleReadEntriesWithDelay(Exception e, ReadType readType, long waitTimeMillis) {
topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentDispatcherMultipleConsumers.this) {
// If it's a replay read we need to retry even if there's already
Expand All @@ -902,11 +923,10 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
log.info("[{}] Retrying read operation", name);
readMoreEntries();
} else {
log.info("[{}] Skipping read retry: havePendingRead {}", name, havePendingRead, exception);
log.info("[{}] Skipping read retry: havePendingRead {}", name, havePendingRead, e);
}
}
}, waitTimeMillis, TimeUnit.MILLISECONDS);

}

private boolean needTrimAckedMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Recycler;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -318,6 +319,12 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl

@Override
protected void readMoreEntries(Consumer consumer) {
if (cursor.isClosed()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor is already closed, skipping read more entries", cursor.getName());
}
return;
}
// consumer can be null when all consumers are disconnected from broker.
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
Expand Down Expand Up @@ -489,6 +496,14 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
Consumer c = readEntriesCtx.getConsumer();
readEntriesCtx.recycle();

// Do not keep reading messages from a closed cursor.
if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor was already closed, skipping read more entries", cursor.getName());
}
return;
}

if (exception instanceof ConcurrentWaitCallbackException) {
// At most one pending read request is allowed when there are no more entries, we should not trigger more
// read operations in this case and just wait the existing read operation completes.
Expand Down Expand Up @@ -525,6 +540,11 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
// Reduce read batch size to avoid flooding bookies with retries
readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();

scheduleReadEntriesWithDelay(c, waitTimeMillis);
}

@VisibleForTesting
void scheduleReadEntriesWithDelay(Consumer c, long delay) {
topic.getBrokerService().executor().schedule(() -> {

// Jump again into dispatcher dedicated thread
Expand All @@ -546,8 +566,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
}
}
});
}, waitTimeMillis, TimeUnit.MILLISECONDS);

}, delay, TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@

import com.carrotsearch.hppc.ObjectSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -98,4 +107,66 @@ public void testTopicDeleteIfConsumerSetMismatchConsumerList2() throws Exception
consumer.close();
admin.topics().delete(topicName, false);
}

@Test
public void testSkipReadEntriesFromCloseCursor() throws Exception {
final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/testSkipReadEntriesFromCloseCursor");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topicName);

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
for (int i = 0; i < 10; i++) {
producer.send("message-" + i);
}
producer.close();

// Get the dispatcher of the topic.
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName, false).join().get();

ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
Mockito.doReturn(subscription).when(cursor).getName();
Subscription sub = Mockito.mock(PersistentSubscription.class);
Mockito.doReturn(topic).when(sub).getTopic();
// Mock the dispatcher.
PersistentDispatcherMultipleConsumers dispatcher =
Mockito.spy(new PersistentDispatcherMultipleConsumers(topic, cursor, sub));
// Return 10 permits to make the dispatcher can read more entries.
Mockito.doReturn(10).when(dispatcher).getFirstAvailableConsumerPermits();

// Make the count + 1 when call the scheduleReadEntriesWithDelay(...).
AtomicInteger callScheduleReadEntriesWithDelayCnt = new AtomicInteger(0);
Mockito.doAnswer(inv -> {
callScheduleReadEntriesWithDelayCnt.getAndIncrement();
return inv.callRealMethod();
}).when(dispatcher).scheduleReadEntriesWithDelay(Mockito.any(), Mockito.any(), Mockito.anyLong());

// Make the count + 1 when call the readEntriesFailed(...).
AtomicInteger callReadEntriesFailed = new AtomicInteger(0);
Mockito.doAnswer(inv -> {
callReadEntriesFailed.getAndIncrement();
return inv.callRealMethod();
}).when(dispatcher).readEntriesFailed(Mockito.any(), Mockito.any());

Mockito.doReturn(false).when(cursor).isClosed();

// Mock the readEntriesOrWait(...) to simulate the cursor is closed.
Mockito.doAnswer(inv -> {
PersistentDispatcherMultipleConsumers dispatcher1 = inv.getArgument(2);
dispatcher1.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("cursor closed"),
null);
return null;
}).when(cursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(), Mockito.eq(dispatcher),
Mockito.any(), Mockito.any());

dispatcher.readMoreEntries();

// Verify: the readEntriesFailed should be called once and the scheduleReadEntriesWithDelay should not be called.
Assert.assertTrue(callReadEntriesFailed.get() == 1 && callScheduleReadEntriesWithDelayCnt.get() == 0);

// Verify: the topic can be deleted successfully.
admin.topics().delete(topicName, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-api")
public class PersistentDispatcherSingleActiveConsumerTest extends ProducerConsumerBase {
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSkipReadEntriesFromCloseCursor() throws Exception {
final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/testSkipReadEntriesFromCloseCursor");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topicName);

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
for (int i = 0; i < 10; i++) {
producer.send("message-" + i);
}
producer.close();

// Get the dispatcher of the topic.
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName, false).join().get();

ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
Mockito.doReturn(subscription).when(cursor).getName();
Subscription sub = Mockito.mock(PersistentSubscription.class);
Mockito.doReturn(topic).when(sub).getTopic();
// Mock the dispatcher.
PersistentDispatcherSingleActiveConsumer dispatcher =
Mockito.spy(new PersistentDispatcherSingleActiveConsumer(cursor, CommandSubscribe.SubType.Exclusive,0, topic, sub));

// Mock a consumer
Consumer consumer = Mockito.mock(Consumer.class);
consumer.getAvailablePermits();
Mockito.doReturn(10).when(consumer).getAvailablePermits();
Mockito.doReturn(10).when(consumer).getAvgMessagesPerEntry();
Mockito.doReturn("test").when(consumer).consumerName();
Mockito.doReturn(true).when(consumer).isWritable();
Mockito.doReturn(false).when(consumer).readCompacted();

// Make the consumer as the active consumer.
Mockito.doReturn(consumer).when(dispatcher).getActiveConsumer();

// Make the count + 1 when call the scheduleReadEntriesWithDelay(...).
AtomicInteger callScheduleReadEntriesWithDelayCnt = new AtomicInteger(0);
Mockito.doAnswer(inv -> {
callScheduleReadEntriesWithDelayCnt.getAndIncrement();
return inv.callRealMethod();
}).when(dispatcher).scheduleReadEntriesWithDelay(Mockito.eq(consumer), Mockito.anyLong());

// Make the count + 1 when call the readEntriesFailed(...).
AtomicInteger callReadEntriesFailed = new AtomicInteger(0);
Mockito.doAnswer(inv -> {
callReadEntriesFailed.getAndIncrement();
return inv.callRealMethod();
}).when(dispatcher).readEntriesFailed(Mockito.any(), Mockito.any());

Mockito.doReturn(false).when(cursor).isClosed();

// Mock the readEntriesOrWait(...) to simulate the cursor is closed.
Mockito.doAnswer(inv -> {
PersistentDispatcherSingleActiveConsumer dispatcher1 = inv.getArgument(2);
dispatcher1.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("cursor closed"),
null);
return null;
}).when(cursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(), Mockito.eq(dispatcher),
Mockito.any(), Mockito.any());

dispatcher.readMoreEntries(consumer);

// Verify: the readEntriesFailed should be called once and the scheduleReadEntriesWithDelay should not be called.
Assert.assertTrue(callReadEntriesFailed.get() == 1 && callScheduleReadEntriesWithDelayCnt.get() == 0);

// Verify: the topic can be deleted successfully.
admin.topics().delete(topicName, false);
}
}

0 comments on commit 3c8c8ab

Please sign in to comment.