Skip to content

Commit

Permalink
[Broker] Fix the backlog issue with --precise-backlog=true (apache#10966
Browse files Browse the repository at this point in the history
)

## Motivation
fix backlog issuse with --precise-backlog=true.
Now when `managedLedger` create a new `ledger` complete. if `markDelete` is the `previousLedger` LAC it will delete the previousLedger from `managedLedger` . when get backlog we will use range.close to get `getNumberOfEntries` -1, if previousLedger not exist will get the wrong number. 

![image](https://user-images.githubusercontent.com/39078850/122502847-fce47800-d029-11eb-81b3-abc9e595d93e.png)

## implement
 use range.openClose() to `getBacklog`.
### Verifying this change
Add the tests for it
  • Loading branch information
congbobo184 authored Jun 21, 2021
1 parent 2f8c175 commit e3a97ee
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -883,13 +883,13 @@ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
return getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}

long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}

return backlog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
Expand Down Expand Up @@ -3526,5 +3528,34 @@ public void testCursorCheckReadPositionChanged() throws Exception {
ledger.close();
}


@Test
public void testCursorGetBacklog() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("get-backlog", managedLedgerConfig);
ManagedCursor managedCursor = ledger.openCursor("test");

Position position = ledger.addEntry("test".getBytes(Encoding));
ledger.addEntry("test".getBytes(Encoding));
Position position1 = ledger.addEntry("test".getBytes(Encoding));
ledger.addEntry("test".getBytes(Encoding));

Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(true), 4);
Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
Field field = ManagedLedgerImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);

((ConcurrentSkipListMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>) field.get(ledger)).remove(position.getLedgerId());
field = ManagedCursorImpl.class.getDeclaredField("markDeletePosition");
field.setAccessible(true);
field.set(managedCursor, PositionImpl.get(position1.getLedgerId(), -1));


Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(true), 2);
Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}

0 comments on commit e3a97ee

Please sign in to comment.