Skip to content

Commit

Permalink
Only skipping invalid md-position when recovering cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Jan 26, 2024
1 parent 1ae43b7 commit 9f80e7c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,16 @@ private void recoverBatchDeletedIndexes (
private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
Map<String, String> cursorProperties,
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
if (position.getEntryId() == -1L && !ledger.ledgerExists(position.getLedgerId())) {
Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
position);
}
position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position;
}
if (position.compareTo(ledger.getLastPosition()) > 0) {
log.warn("[{}] [{}] Current position {} is ahead of last position {}", ledger.getName(), name, position,
ledger.getLastPosition());
Expand All @@ -690,7 +700,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
markDeletePosition = position;
persistentMarkDeletePosition = position;
inProgressMarkDeletePersistPosition = null;
readPosition = position.getNext();
readPosition = ledger.getNextValidPosition(position);
ledger.onCursorReadPositionUpdated(this, readPosition);
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
// assign cursor-ledger so, it can be deleted when new ledger will be switched
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2978,7 +2978,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter2.await();
}

// @Test(timeOut = 20000)
@Test(timeOut = 20000)
public void testReopenMultipleTimes() throws Exception {
ManagedLedger ledger = factory.open("testReopenMultipleTimes");
ManagedCursor c1 = ledger.openCursor("c1");
Expand Down Expand Up @@ -3377,7 +3377,7 @@ public void testEstimatedUnackedSize() throws Exception {
assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 * entryData.length);
}

// @Test(timeOut = 20000)
@Test(timeOut = 20000)
public void testRecoverCursorAheadOfLastPosition() throws Exception {
final String mlName = "my_test_ledger";
final PositionImpl lastPosition = new PositionImpl(1L, 10L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2836,7 +2836,7 @@ public void testActiveDeactiveCursor() throws Exception {
ledger.close();
}

// @Test
@Test
public void testCursorRecoveryForEmptyLedgers() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testCursorRecoveryForEmptyLedgers");
ManagedCursor c1 = ledger.openCursor("c1");
Expand Down

0 comments on commit 9f80e7c

Please sign in to comment.