Skip to content

Commit

Permalink
[Fix][broker] Fix NPE when ledger id not found in OpReadEntry (apac…
Browse files Browse the repository at this point in the history
…he#15837)

(cherry picked from commit 7a3ad61)
  • Loading branch information
mattisonchao authored and JiangHaiting committed Aug 7, 2022
1 parent c397e91 commit 49a6e4b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1969,14 +1969,9 @@ void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
}
}

PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
PositionImpl startReadOperationOnLedger(PositionImpl position) {
Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
if (null == ledgerId) {
opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " +
"least key greater than or equal to the given key, or null if there is no such key"), null);
}

if (ledgerId != position.getLedgerId()) {
if (ledgerId != null && ledgerId != position.getLedgerId()) {
// The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
// to skip on the next available ledger
position = new PositionImpl(ledgerId, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class OpReadEntry implements ReadEntriesCallback {
public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
ReadEntriesCallback callback, Object ctx) {
OpReadEntry op = RECYCLER.get();
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
op.cursor = cursor;
op.count = count;
op.callback = callback;
Expand Down Expand Up @@ -132,7 +132,7 @@ void checkReadCompletion() {
if (entries.size() < count && cursor.hasMoreEntries()) {
// We still have more entries to read from the next ledger, schedule a new async operation
cursor.ledger.getExecutor().execute(safeRun(() -> {
readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
}));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -409,6 +411,31 @@ public void spanningMultipleLedgers() throws Exception {
ledger.close();
}

@Test
public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException {
ManagedLedger ledger = factory.open("my_test_ledger_1");
ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
NavigableMap<Long, LedgerInfo> ledgers = ledgerImpl.getLedgersInfo();
LedgerInfo ledgerInfo = ledgers.firstEntry().getValue();
ledgers.clear();
ManagedCursor c1 = ledger.openCursor("c1");
PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0);
OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20,
new ReadEntriesCallback() {

@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {

}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}
}, null);
Assert.assertEquals(opReadEntry.readPosition, position);
}

@Test(timeOut = 20000)
public void spanningMultipleLedgersWithSize() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);
Expand Down

0 comments on commit 49a6e4b

Please sign in to comment.