Skip to content

Commit

Permalink
[fix][broker] Fix consumer stops receiving messages when with large b…
Browse files Browse the repository at this point in the history
…acklogs processing (apache#22454)

(cherry picked from commit 40329ee)
  • Loading branch information
Technoboy- authored and srinath-ctds committed Apr 23, 2024
1 parent 8e2bd55 commit 8f3e1fd
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,11 @@ private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Ob
log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
}

if (isClosed()) {
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
return;
}

if (!hasMoreEntries()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,7 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele
+ consumerName), ctx);
return;
} else if (!cursor.isDurable()) {
cursor.setState(ManagedCursorImpl.State.Closed);
cursors.removeCursor(consumerName);
deactivateCursorByName(consumerName);
callback.deleteCursorComplete(ctx);
Expand Down Expand Up @@ -3851,13 +3852,7 @@ public void removeWaitingCursor(ManagedCursor cursor) {
}

public void addWaitingCursor(ManagedCursorImpl cursor) {
if (cursor instanceof NonDurableCursorImpl) {
if (cursor.isActive()) {
this.waitingCursors.add(cursor);
}
} else {
this.waitingCursors.add(cursor);
}
this.waitingCursors.add(cursor);
}

public boolean isCursorActive(ManagedCursor cursor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor

if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();
topic.getManagedLedger().removeWaitingCursor(cursor);

if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the subscription as well
Expand Down Expand Up @@ -335,11 +334,14 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor
if (!isResetCursor) {
try {
topic.getManagedLedger().deleteCursor(cursor.getName());
topic.getManagedLedger().removeWaitingCursor(cursor);
} catch (InterruptedException | ManagedLedgerException e) {
log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
}
}
});
} else {
topic.getManagedLedger().removeWaitingCursor(cursor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -112,6 +113,11 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setManagedLedgerCursorBackloggedThreshold(10);
}

/**
* Test validates that broker cleans up topic which failed to unload while bundle unloading.
*
Expand Down Expand Up @@ -680,7 +686,7 @@ public void testAddWaitingCursorsForNonDurable() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
doAnswer((invocation) -> {
Thread.sleep(10_000);
Thread.sleep(5_000);
invocation.callRealMethod();
return null;
}).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
Expand All @@ -707,4 +713,52 @@ public void testAddWaitingCursorsForNonDurable() throws Exception {
assertEquals(ledger.getWaitingCursorsCount(), 0);
});
}

@Test
public void testAddWaitingCursorsForNonDurable2() throws Exception {
final String ns = "prop/ns-test";
admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/testAddWaitingCursors2";
admin.topics().createNonPartitionedTopic(topicName);
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub-1").subscribe().close();
@Cleanup
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topicName).create();
for (int i = 0; i < 100; i ++) {
producer.sendAsync("test-" + i);
}
@Cleanup
final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("sub-2").subscribe();
int count = 0;
while(true) {
final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
if (msg != null) {
consumer.acknowledge(msg);
count++;
} else {
break;
}
}
Assert.assertEquals(count, 100);
Thread.sleep(3_000);
for (int i = 0; i < 100; i ++) {
producer.sendAsync("test-" + i);
}
while(true) {
final Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg != null) {
consumer.acknowledge(msg);
count++;
} else {
break;
}
}
Assert.assertEquals(count, 200);
}
}

0 comments on commit 8f3e1fd

Please sign in to comment.