Skip to content

Commit 5cc8392

Browse files
committed
fix bug: ignoring flow control message since cursor reset in progress
1 parent 3752a11 commit 5cc8392

File tree

2 files changed

+14
-0
lines changed

2 files changed

+14
-0
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

+8
Original file line numberDiff line numberDiff line change
@@ -3283,4 +3283,12 @@ public void setState(State state) {
32833283
public ManagedLedgerConfig getConfig() {
32843284
return config;
32853285
}
3286+
3287+
/**
3288+
* check cursor reset status
3289+
* @return true if the cursor reset in progress
3290+
*/
3291+
public boolean resetCursorInProgress() {
3292+
return RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE;
3293+
}
32863294
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java

+6
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3535
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
3636
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
37+
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
3738
import org.apache.bookkeeper.mledger.impl.PositionImpl;
3839
import org.apache.bookkeeper.mledger.util.SafeRun;
3940
import org.apache.commons.lang3.tuple.Pair;
@@ -278,6 +279,11 @@ private synchronized void internalConsumerFlow(Consumer consumer) {
278279
log.debug("[{}-{}] Ignoring flow control message since consumer is waiting for cursor to be rewinded",
279280
name, consumer);
280281
}
282+
} else if (((ManagedCursorImpl) cursor).resetCursorInProgress()) {
283+
if (log.isDebugEnabled()) {
284+
log.debug("[{}-{}] Ignoring flow control message since cursor reset in progress - cursor {}",
285+
name, consumer, cursor.getName());
286+
}
281287
} else {
282288
if (log.isDebugEnabled()) {
283289
log.debug("[{}-{}] Trigger new read after receiving flow control message", name, consumer);

0 commit comments

Comments
 (0)