Skip to content

Commit

Permalink
[fix][broker] Create new ledger after the current ledger is closed (a…
Browse files Browse the repository at this point in the history
…pache#22034)

(cherry picked from commit d0ca983)
(cherry picked from commit 54042df)
  • Loading branch information
liangyepianzhou authored and mukesh-ctds committed Apr 16, 2024
1 parent c7d1521 commit eed3d17
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ protected void internalResetCursor(PositionImpl proposedReadPosition,
if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
newReadPosition = ledger.getFirstPosition();
} else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
newReadPosition = ledger.getLastPosition().getNext();
newReadPosition = ledger.getNextValidPosition(ledger.getLastPosition());
} else {
newReadPosition = proposedReadPosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1757,10 +1757,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {

maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);

if (!pendingAddEntries.isEmpty()) {
// Need to create a new ledger to write pending entries
createLedgerAfterClosed();
}
createLedgerAfterClosed();
}

@Override
Expand Down Expand Up @@ -1815,7 +1812,6 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) {
}

ledgerClosed(lh);
createLedgerAfterClosed();
}
}, null);
}
Expand Down Expand Up @@ -2696,7 +2692,16 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
} else {
PositionImpl slowestReaderPosition = cursors.getSlowestReaderPosition();
if (slowestReaderPosition != null) {
slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
// The slowest reader position is the mark delete position.
// If the slowest reader position point the last entry in the ledger x,
// the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted.
LedgerInfo ledgerInfo = ledgers.get(slowestReaderPosition.getLedgerId());
if (ledgerInfo != null && ledgerInfo.getLedgerId() != currentLedger.getId()
&& ledgerInfo.getEntries() == slowestReaderPosition.getEntryId() + 1) {
slowestReaderLedgerId = slowestReaderPosition.getLedgerId() + 1;
} else {
slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
}
} else {
promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position"));
trimmerMutex.unlock();
Expand Down Expand Up @@ -3740,7 +3745,11 @@ public PositionImpl getValidPositionAfterSkippedEntries(final PositionImpl posit
PositionImpl skippedPosition = position.getPositionAfterEntries(skippedEntryNum);
while (!isValidPosition(skippedPosition)) {
Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
// This means it has jumped to the last position
if (nextLedgerId == null) {
if (currentLedgerEntries == 0) {
return PositionImpl.get(currentLedger.getId(), 0);
}
return lastConfirmedEntry.getNext();
}
skippedPosition = PositionImpl.get(nextLedgerId, 0);
Expand Down Expand Up @@ -4532,7 +4541,6 @@ public boolean checkInactiveLedgerAndRollOver() {
}

ledgerClosed(lh);
createLedgerAfterClosed();
// we do not create ledger here, since topic is inactive for a long time.
}, null);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,16 @@ void readTwice() throws Exception {

@Test
void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
final int entryCount = 10;
final int entryCount = 9;
final String cursorName = "c1";
final String mlName = "ml_test";
final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
// Avoid creating new empty ledger after the last ledger is full and remove fail future.
final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(2);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);

ManagedCursor cursor = ml.openCursor("c1");
Position lastEntry = null;
for (int i = 0; i < 10; i++) {
for (int i = 0; i < entryCount; i++) {
lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
}

Expand Down Expand Up @@ -809,7 +810,7 @@ void testResetCursor1() throws Exception {
assertEquals(firstInNext, cursor.getReadPosition());
moveStatus.set(false);

// reset to a non exist larger ledger should point to the first non-exist entry in the last ledger
// reset to a non exist larger ledger should point to the first non-exist entry in the next ledger
PositionImpl latest = new PositionImpl(last.getLedgerId() + 2, 0);
try {
cursor.resetCursor(latest);
Expand All @@ -818,11 +819,13 @@ void testResetCursor1() throws Exception {
log.warn("error in reset cursor", e.getCause());
}
assertTrue(moveStatus.get());
PositionImpl lastPos = new PositionImpl(last.getLedgerId(), last.getEntryId() + 1);
assertEquals(lastPos, cursor.getReadPosition());
PositionImpl lastPos = new PositionImpl(last.getLedgerId() + 1, 0);
Awaitility.await().untilAsserted(() -> {
assertEquals(lastPos, cursor.getReadPosition());
});
moveStatus.set(false);

// reset to latest should point to the first non-exist entry in the last ledger
// reset to latest should point to the first non-exist entry in the next ledger
PositionImpl anotherLast = PositionImpl.LATEST;
try {
cursor.resetCursor(anotherLast);
Expand Down Expand Up @@ -1701,7 +1704,7 @@ void testMarkDeleteTwice(boolean useOpenRangeSet) throws Exception {

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testSkipEntries(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(2));
Position pos;

Expand All @@ -1715,14 +1718,19 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception {
pos = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
pos = ledger.addEntry("dummy-entry-2".getBytes(Encoding));

// Wait new empty ledger created completely.
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 2);
});

// skip entries in same ledger
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 1);

// skip entries until end of ledger
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getReadPosition(), pos.getNext());
assertEquals(c1.getReadPosition(), new PositionImpl(ledger.currentLedger.getId(), 0));
assertEquals(c1.getMarkDeletedPosition(), pos);

// skip entries across ledgers
Expand All @@ -1737,7 +1745,10 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception {
c1.skipEntries(10, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
assertFalse(c1.hasMoreEntries());
assertEquals(c1.getReadPosition(), pos.getNext());
// We can not check the ledger id because a cursor leger can be created.
Awaitility.await().untilAsserted(() -> {
assertEquals(c1.getReadPosition().getEntryId(), 0);
});
assertEquals(c1.getMarkDeletedPosition(), pos);
}

Expand All @@ -1759,7 +1770,7 @@ void testSkipEntriesWithIndividualDeletedMessages(boolean useOpenRangeSet) throw

c1.skipEntries(3, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getReadPosition(), pos5.getNext());
assertEquals(c1.getReadPosition(), new PositionImpl(pos5.getLedgerId() + 1, 0));
assertEquals(c1.getMarkDeletedPosition(), pos5);

pos1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testGetManagedLedgerInfoWithClose() throws Exception {

ManagedLedgerInfo info = factory.getManagedLedgerInfo("testGetManagedLedgerInfo");

assertEquals(info.ledgers.size(), 4);
assertEquals(info.ledgers.size(), 5);

assertEquals(info.ledgers.get(0).ledgerId, 3);
assertEquals(info.ledgers.get(1).ledgerId, 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
Expand Down Expand Up @@ -1120,9 +1121,13 @@ public void testTrimmer() throws Exception {

cursor.markDelete(lastPosition);

while (ledger.getNumberOfEntries() != 2) {
Thread.sleep(10);
}
Awaitility.await().untilAsserted(() -> {
// The number of entries in the ledger should not contain the entry in the mark delete position.
// last position is the position of entry-3.
// cursor.markDelete(lastPosition);
// only entry-4 is left in the ledger.
assertEquals(ledger.getNumberOfEntries(), 1);
});
}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -2436,7 +2441,7 @@ public void testRetentionSize() throws Exception {

Awaitility.await().untilAsserted(() -> {
assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024);
assertEquals(ml.getLedgersInfoAsList().size(), 5);
assertEquals(ml.getLedgersInfoAsList().size(), 6);
});
}

Expand Down Expand Up @@ -2694,9 +2699,17 @@ public void testGetNextValidPosition() throws Exception {

assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1);
assertEquals(ledger.getNextValidPosition(p1), p2);
assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId() + 1, 0));
});
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)),
PositionImpl.get(p3.getLedgerId() + 1, 0));
});
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)),
PositionImpl.get(p3.getLedgerId() + 1, 0));
});
}

/**
Expand Down Expand Up @@ -3035,19 +3048,22 @@ public void testConsumerSubscriptionInitializePosition() throws Exception{
String content = "entry" + i; // 5 bytes
ledger.addEntry(content.getBytes());
}
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.currentLedgerSize, 0);
assertEquals(ledger.ledgers.size(), 1);
});
// Open Cursor also adds cursor into activeCursor-container
ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest);
ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest);

// Since getReadPosition returns the next position, we decrease the entryId by 1
PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition();
PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition();

Pair<PositionImpl, Long> latestPositionAndCounter = ledger.getLastPositionAndCounter();
Pair<PositionImpl, Long> earliestPositionAndCounter = ledger.getFirstPositionAndCounter();

assertEquals(latestPositionAndCounter.getLeft().getNext(), p1);
assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2);
// The read position is the valid next position of the last position instead of the next position.
assertEquals(ledger.getNextValidPosition(latestPositionAndCounter.getLeft()), latestCursor.getReadPosition());
assertEquals(ledger.getNextValidPosition(earliestPositionAndCounter.getLeft()), p2);

assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries);
assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog(false));
Expand Down Expand Up @@ -3471,7 +3487,8 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
ledger.addEntry(new byte[1024 * 1024]);
}

Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(),
msgNum / 2 + 1));
List<Entry> entries = cursor.readEntries(msgNum);
Assert.assertEquals(msgNum, entries.size());

Expand All @@ -3486,6 +3503,9 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
stateUpdater.setAccessible(true);
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
ledger.trimConsumedLedgersInBackground(completableFuture);
completableFuture.get();
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
}
Expand Down Expand Up @@ -3651,8 +3671,12 @@ public void testInvalidateReadHandleWhenDeleteLedger() throws Exception {
}
List<Entry> entryList = cursor.readEntries(3);
assertEquals(entryList.size(), 3);
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgerCache.size(), 2);
Awaitility.await().untilAsserted(() -> {
log.error("ledger.ledgerCache.size() : " + ledger.ledgerCache.size());
assertEquals(ledger.ledgerCache.size(), 3);
assertEquals(ledger.ledgers.size(), 4);
});

cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Expand Down Expand Up @@ -3681,15 +3705,15 @@ public void testLockReleaseWhenTrimLedger() throws Exception {
}
List<Entry> entryList = cursor.readEntries(entries);
assertEquals(entryList.size(), entries);
assertEquals(ledger.ledgers.size(), entries);
assertEquals(ledger.ledgerCache.size(), entries - 1);
assertEquals(ledger.ledgers.size() - 1, entries);
assertEquals(ledger.ledgerCache.size() - 1, entries - 1);
cursor.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
// Cleanup fails because ManagedLedgerNotFoundException is thrown
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), entries);
assertEquals(ledger.ledgerCache.size(), entries - 1);
assertEquals(ledger.ledgers.size() - 1, entries);
assertEquals(ledger.ledgerCache.size() - 1, entries - 1);
});
// The lock is released even if an ManagedLedgerNotFoundException occurs, so it can be called repeatedly
Awaitility.await().untilAsserted(() ->
Expand All @@ -3715,25 +3739,25 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception {
}
List<Entry> entryList = cursor.readEntries(3);
assertEquals(entryList.size(), 3);
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgerCache.size(), 2);
assertEquals(ledger.ledgers.size(), 4);
assertEquals(ledger.ledgerCache.size(), 3);
cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgers.size(), 4);
assertEquals(ledger.ledgerCache.size(), 0);
});

// Verify the ReadHandle can be reopened.
ManagedCursor cursor3 = ledger.openCursor("test-cursor3", InitialPosition.Earliest);
entryList = cursor3.readEntries(3);
assertEquals(entryList.size(), 3);
assertEquals(ledger.ledgerCache.size(), 2);
assertEquals(ledger.ledgerCache.size(), 3);
cursor3.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgers.size(), 4);
assertEquals(ledger.ledgerCache.size(), 0);
});

Expand Down Expand Up @@ -4255,4 +4279,45 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce
verify(ledgerOffloader, times(0))
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
}


@DataProvider(name = "closeLedgerByAddEntry")
public Object[][] closeLedgerByAddEntry() {
return new Object[][] {{Boolean.TRUE}, {Boolean.FALSE}};
}

@Test(dataProvider = "closeLedgerByAddEntry")
public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry) throws Exception {
// Setup: Open a manageLedger with one initial entry.
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed",
config));
assertEquals(ml.ledgers.size(), 1);
ml.addEntry(new byte[4]);
// Act: Trigger the rollover of the current ledger.
long currentLedgerID = ml.currentLedger.getId();
ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS);
Thread.sleep(10);
if (closeLedgerByAddEntry) {
// Detect the current ledger is full before written entry and close the ledger after writing completely.
ml.addEntry(new byte[4]);
} else {
// Detect the current ledger is full by the timed task. (Imitate: the timed task `checkLedgerRollTask` call
// `rollCurrentLedgerIfFull` periodically).
ml.rollCurrentLedgerIfFull();
// the ledger closing in the `rollCurrentLedgerIfFull` is async, so the wait is needed.
Awaitility.await().untilAsserted(() -> assertEquals(ml.ledgers.size(), 2));
}
// Act: Trigger trimming to delete the previous current ledger.
ml.internalTrimLedgers(false, Futures.NULL_PROMISE);
// Verify: A new ledger will be opened after the current ledger is closed and the previous current ledger can be
// deleted.
Awaitility.await().untilAsserted(() -> {
assertEquals(ml.state, ManagedLedgerImpl.State.LedgerOpened);
assertEquals(ml.ledgers.size(), 1);
assertNotEquals(currentLedgerID, ml.currentLedger.getId());
assertEquals(ml.currentLedgerEntries, 0);
});
}
}
Loading

0 comments on commit eed3d17

Please sign in to comment.