diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 972abcfea983d..ccea125ef83f6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1278,7 +1278,7 @@ protected void internalResetCursor(PositionImpl proposedReadPosition, if (proposedReadPosition.equals(PositionImpl.EARLIEST)) { newReadPosition = ledger.getFirstPosition(); } else if (proposedReadPosition.equals(PositionImpl.LATEST)) { - newReadPosition = ledger.getLastPosition().getNext(); + newReadPosition = ledger.getNextValidPosition(ledger.getLastPosition()); } else { newReadPosition = proposedReadPosition; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index dce4860e3e6f0..32ac345629d32 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1757,10 +1757,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) { maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); - if (!pendingAddEntries.isEmpty()) { - // Need to create a new ledger to write pending entries - createLedgerAfterClosed(); - } + createLedgerAfterClosed(); } @Override @@ -1815,7 +1812,6 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) { } ledgerClosed(lh); - createLedgerAfterClosed(); } }, null); } @@ -2696,7 +2692,16 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { } else { PositionImpl slowestReaderPosition = cursors.getSlowestReaderPosition(); if (slowestReaderPosition != null) { - slowestReaderLedgerId = slowestReaderPosition.getLedgerId(); + // The slowest reader position is the mark delete position. + // If the slowest reader position point the last entry in the ledger x, + // the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted. + LedgerInfo ledgerInfo = ledgers.get(slowestReaderPosition.getLedgerId()); + if (ledgerInfo != null && ledgerInfo.getLedgerId() != currentLedger.getId() + && ledgerInfo.getEntries() == slowestReaderPosition.getEntryId() + 1) { + slowestReaderLedgerId = slowestReaderPosition.getLedgerId() + 1; + } else { + slowestReaderLedgerId = slowestReaderPosition.getLedgerId(); + } } else { promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position")); trimmerMutex.unlock(); @@ -3740,7 +3745,11 @@ public PositionImpl getValidPositionAfterSkippedEntries(final PositionImpl posit PositionImpl skippedPosition = position.getPositionAfterEntries(skippedEntryNum); while (!isValidPosition(skippedPosition)) { Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1); + // This means it has jumped to the last position if (nextLedgerId == null) { + if (currentLedgerEntries == 0) { + return PositionImpl.get(currentLedger.getId(), 0); + } return lastConfirmedEntry.getNext(); } skippedPosition = PositionImpl.get(nextLedgerId, 0); @@ -4532,7 +4541,6 @@ public boolean checkInactiveLedgerAndRollOver() { } ledgerClosed(lh); - createLedgerAfterClosed(); // we do not create ledger here, since topic is inactive for a long time. }, null); return true; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 644f53c3a522d..c9bd64171c15a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -234,15 +234,16 @@ void readTwice() throws Exception { @Test void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception { - final int entryCount = 10; + final int entryCount = 9; final String cursorName = "c1"; final String mlName = "ml_test"; - final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1); + // Avoid creating new empty ledger after the last ledger is full and remove fail future. + final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(2); ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig); ManagedCursor cursor = ml.openCursor("c1"); Position lastEntry = null; - for (int i = 0; i < 10; i++) { + for (int i = 0; i < entryCount; i++) { lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding)); } @@ -809,7 +810,7 @@ void testResetCursor1() throws Exception { assertEquals(firstInNext, cursor.getReadPosition()); moveStatus.set(false); - // reset to a non exist larger ledger should point to the first non-exist entry in the last ledger + // reset to a non exist larger ledger should point to the first non-exist entry in the next ledger PositionImpl latest = new PositionImpl(last.getLedgerId() + 2, 0); try { cursor.resetCursor(latest); @@ -818,11 +819,13 @@ void testResetCursor1() throws Exception { log.warn("error in reset cursor", e.getCause()); } assertTrue(moveStatus.get()); - PositionImpl lastPos = new PositionImpl(last.getLedgerId(), last.getEntryId() + 1); - assertEquals(lastPos, cursor.getReadPosition()); + PositionImpl lastPos = new PositionImpl(last.getLedgerId() + 1, 0); + Awaitility.await().untilAsserted(() -> { + assertEquals(lastPos, cursor.getReadPosition()); + }); moveStatus.set(false); - // reset to latest should point to the first non-exist entry in the last ledger + // reset to latest should point to the first non-exist entry in the next ledger PositionImpl anotherLast = PositionImpl.LATEST; try { cursor.resetCursor(anotherLast); @@ -1701,7 +1704,7 @@ void testMarkDeleteTwice(boolean useOpenRangeSet) throws Exception { @Test(timeOut = 20000, dataProvider = "useOpenRangeSet") void testSkipEntries(boolean useOpenRangeSet) throws Exception { - ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig() + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", new ManagedLedgerConfig() .setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(2)); Position pos; @@ -1715,6 +1718,11 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception { pos = ledger.addEntry("dummy-entry-1".getBytes(Encoding)); pos = ledger.addEntry("dummy-entry-2".getBytes(Encoding)); + // Wait new empty ledger created completely. + Awaitility.await().untilAsserted(() -> { + assertEquals(ledger.ledgers.size(), 2); + }); + // skip entries in same ledger c1.skipEntries(1, IndividualDeletedEntries.Exclude); assertEquals(c1.getNumberOfEntries(), 1); @@ -1722,7 +1730,7 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception { // skip entries until end of ledger c1.skipEntries(1, IndividualDeletedEntries.Exclude); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getReadPosition(), pos.getNext()); + assertEquals(c1.getReadPosition(), new PositionImpl(ledger.currentLedger.getId(), 0)); assertEquals(c1.getMarkDeletedPosition(), pos); // skip entries across ledgers @@ -1737,7 +1745,10 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception { c1.skipEntries(10, IndividualDeletedEntries.Exclude); assertEquals(c1.getNumberOfEntries(), 0); assertFalse(c1.hasMoreEntries()); - assertEquals(c1.getReadPosition(), pos.getNext()); + // We can not check the ledger id because a cursor leger can be created. + Awaitility.await().untilAsserted(() -> { + assertEquals(c1.getReadPosition().getEntryId(), 0); + }); assertEquals(c1.getMarkDeletedPosition(), pos); } @@ -1759,7 +1770,7 @@ void testSkipEntriesWithIndividualDeletedMessages(boolean useOpenRangeSet) throw c1.skipEntries(3, IndividualDeletedEntries.Exclude); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getReadPosition(), pos5.getNext()); + assertEquals(c1.getReadPosition(), new PositionImpl(pos5.getLedgerId() + 1, 0)); assertEquals(c1.getMarkDeletedPosition(), pos5); pos1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding)); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java index 4f2c3e178773e..a953b140aba63 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java @@ -54,7 +54,7 @@ public void testGetManagedLedgerInfoWithClose() throws Exception { ManagedLedgerInfo info = factory.getManagedLedgerInfo("testGetManagedLedgerInfo"); - assertEquals(info.ledgers.size(), 4); + assertEquals(info.ledgers.size(), 5); assertEquals(info.ledgers.get(0).ledgerId, 3); assertEquals(info.ledgers.get(1).ledgerId, 4); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index f918ffdc755d6..c28a32d9ec2e1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; @@ -1120,9 +1121,13 @@ public void testTrimmer() throws Exception { cursor.markDelete(lastPosition); - while (ledger.getNumberOfEntries() != 2) { - Thread.sleep(10); - } + Awaitility.await().untilAsserted(() -> { + // The number of entries in the ledger should not contain the entry in the mark delete position. + // last position is the position of entry-3. + // cursor.markDelete(lastPosition); + // only entry-4 is left in the ledger. + assertEquals(ledger.getNumberOfEntries(), 1); + }); } @Test(timeOut = 20000) @@ -2436,7 +2441,7 @@ public void testRetentionSize() throws Exception { Awaitility.await().untilAsserted(() -> { assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024); - assertEquals(ml.getLedgersInfoAsList().size(), 5); + assertEquals(ml.getLedgersInfoAsList().size(), 6); }); } @@ -2694,9 +2699,17 @@ public void testGetNextValidPosition() throws Exception { assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1); assertEquals(ledger.getNextValidPosition(p1), p2); - assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)); - assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)); - assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)); + Awaitility.await().untilAsserted(() -> { + assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId() + 1, 0)); + }); + Awaitility.await().untilAsserted(() -> { + assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), + PositionImpl.get(p3.getLedgerId() + 1, 0)); + }); + Awaitility.await().untilAsserted(() -> { + assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), + PositionImpl.get(p3.getLedgerId() + 1, 0)); + }); } /** @@ -3035,19 +3048,22 @@ public void testConsumerSubscriptionInitializePosition() throws Exception{ String content = "entry" + i; // 5 bytes ledger.addEntry(content.getBytes()); } + Awaitility.await().untilAsserted(() -> { + assertEquals(ledger.currentLedgerSize, 0); + assertEquals(ledger.ledgers.size(), 1); + }); // Open Cursor also adds cursor into activeCursor-container ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest); ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest); // Since getReadPosition returns the next position, we decrease the entryId by 1 - PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition(); PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition(); Pair latestPositionAndCounter = ledger.getLastPositionAndCounter(); Pair earliestPositionAndCounter = ledger.getFirstPositionAndCounter(); - - assertEquals(latestPositionAndCounter.getLeft().getNext(), p1); - assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2); + // The read position is the valid next position of the last position instead of the next position. + assertEquals(ledger.getNextValidPosition(latestPositionAndCounter.getLeft()), latestCursor.getReadPosition()); + assertEquals(ledger.getNextValidPosition(earliestPositionAndCounter.getLeft()), p2); assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries); assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog(false)); @@ -3471,7 +3487,8 @@ public void testManagedLedgerRollOverIfFull() throws Exception { ledger.addEntry(new byte[1024 * 1024]); } - Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2)); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), + msgNum / 2 + 1)); List entries = cursor.readEntries(msgNum); Assert.assertEquals(msgNum, entries.size()); @@ -3486,6 +3503,9 @@ public void testManagedLedgerRollOverIfFull() throws Exception { stateUpdater.setAccessible(true); stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened); ledger.rollCurrentLedgerIfFull(); + CompletableFuture completableFuture = new CompletableFuture<>(); + ledger.trimConsumedLedgersInBackground(completableFuture); + completableFuture.get(); Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1)); Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0)); } @@ -3651,8 +3671,12 @@ public void testInvalidateReadHandleWhenDeleteLedger() throws Exception { } List entryList = cursor.readEntries(3); assertEquals(entryList.size(), 3); - assertEquals(ledger.ledgers.size(), 3); - assertEquals(ledger.ledgerCache.size(), 2); + Awaitility.await().untilAsserted(() -> { + log.error("ledger.ledgerCache.size() : " + ledger.ledgerCache.size()); + assertEquals(ledger.ledgerCache.size(), 3); + assertEquals(ledger.ledgers.size(), 4); + }); + cursor.clearBacklog(); cursor2.clearBacklog(); ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE); @@ -3681,15 +3705,15 @@ public void testLockReleaseWhenTrimLedger() throws Exception { } List entryList = cursor.readEntries(entries); assertEquals(entryList.size(), entries); - assertEquals(ledger.ledgers.size(), entries); - assertEquals(ledger.ledgerCache.size(), entries - 1); + assertEquals(ledger.ledgers.size() - 1, entries); + assertEquals(ledger.ledgerCache.size() - 1, entries - 1); cursor.clearBacklog(); ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE); ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE); // Cleanup fails because ManagedLedgerNotFoundException is thrown Awaitility.await().untilAsserted(() -> { - assertEquals(ledger.ledgers.size(), entries); - assertEquals(ledger.ledgerCache.size(), entries - 1); + assertEquals(ledger.ledgers.size() - 1, entries); + assertEquals(ledger.ledgerCache.size() - 1, entries - 1); }); // The lock is released even if an ManagedLedgerNotFoundException occurs, so it can be called repeatedly Awaitility.await().untilAsserted(() -> @@ -3715,13 +3739,13 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception { } List entryList = cursor.readEntries(3); assertEquals(entryList.size(), 3); - assertEquals(ledger.ledgers.size(), 3); - assertEquals(ledger.ledgerCache.size(), 2); + assertEquals(ledger.ledgers.size(), 4); + assertEquals(ledger.ledgerCache.size(), 3); cursor.clearBacklog(); cursor2.clearBacklog(); ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE); Awaitility.await().untilAsserted(() -> { - assertEquals(ledger.ledgers.size(), 3); + assertEquals(ledger.ledgers.size(), 4); assertEquals(ledger.ledgerCache.size(), 0); }); @@ -3729,11 +3753,11 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception { ManagedCursor cursor3 = ledger.openCursor("test-cursor3", InitialPosition.Earliest); entryList = cursor3.readEntries(3); assertEquals(entryList.size(), 3); - assertEquals(ledger.ledgerCache.size(), 2); + assertEquals(ledger.ledgerCache.size(), 3); cursor3.clearBacklog(); ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE); Awaitility.await().untilAsserted(() -> { - assertEquals(ledger.ledgers.size(), 3); + assertEquals(ledger.ledgers.size(), 4); assertEquals(ledger.ledgerCache.size(), 0); }); @@ -4255,4 +4279,45 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce verify(ledgerOffloader, times(0)) .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap()); } + + + @DataProvider(name = "closeLedgerByAddEntry") + public Object[][] closeLedgerByAddEntry() { + return new Object[][] {{Boolean.TRUE}, {Boolean.FALSE}}; + } + + @Test(dataProvider = "closeLedgerByAddEntry") + public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry) throws Exception { + // Setup: Open a manageLedger with one initial entry. + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed", + config)); + assertEquals(ml.ledgers.size(), 1); + ml.addEntry(new byte[4]); + // Act: Trigger the rollover of the current ledger. + long currentLedgerID = ml.currentLedger.getId(); + ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS); + Thread.sleep(10); + if (closeLedgerByAddEntry) { + // Detect the current ledger is full before written entry and close the ledger after writing completely. + ml.addEntry(new byte[4]); + } else { + // Detect the current ledger is full by the timed task. (Imitate: the timed task `checkLedgerRollTask` call + // `rollCurrentLedgerIfFull` periodically). + ml.rollCurrentLedgerIfFull(); + // the ledger closing in the `rollCurrentLedgerIfFull` is async, so the wait is needed. + Awaitility.await().untilAsserted(() -> assertEquals(ml.ledgers.size(), 2)); + } + // Act: Trigger trimming to delete the previous current ledger. + ml.internalTrimLedgers(false, Futures.NULL_PROMISE); + // Verify: A new ledger will be opened after the current ledger is closed and the previous current ledger can be + // deleted. + Awaitility.await().untilAsserted(() -> { + assertEquals(ml.state, ManagedLedgerImpl.State.LedgerOpened); + assertEquals(ml.ledgers.size(), 1); + assertNotEquals(currentLedgerID, ml.currentLedger.getId()); + assertEquals(ml.currentLedgerEntries, 0); + }); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 1e1f7df0a46d5..82141bfd0eeeb 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -589,12 +589,12 @@ void subscribeToEarliestPositionWithImmediateDeletion() throws Exception { /* Position p1 = */ ledger.addEntry("entry-1".getBytes()); /* Position p2 = */ ledger.addEntry("entry-2".getBytes()); - Position p3 = ledger.addEntry("entry-3".getBytes()); + /* Position p3 = */ ledger.addEntry("entry-3".getBytes()); Thread.sleep(300); ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST); - assertEquals(c1.getReadPosition(), p3); - assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(5, -1)); + assertEquals(c1.getReadPosition(), new PositionImpl(6, 0)); + assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(6, -1)); } @Test // (timeOut = 20000) @@ -723,9 +723,10 @@ public void testBacklogStatsWhenDroppingData() throws Exception { CompletableFuture promise = new CompletableFuture<>(); ledger.internalTrimConsumedLedgers(promise); promise.join(); - - assertEquals(nonDurableCursor.getNumberOfEntries(), 6); - assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6); + // The mark delete position has moved to position 4:1, and the ledger 4 only has one entry, + // so the ledger 4 can be deleted. nonDurableCursor should has the same backlog with durable cursor. + assertEquals(nonDurableCursor.getNumberOfEntries(), 5); + assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 5); c1.close(); ledger.deleteCursor(c1.getName()); @@ -733,8 +734,8 @@ public void testBacklogStatsWhenDroppingData() throws Exception { ledger.internalTrimConsumedLedgers(promise); promise.join(); - assertEquals(nonDurableCursor.getNumberOfEntries(), 1); - assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1); + assertEquals(nonDurableCursor.getNumberOfEntries(), 0); + assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0); ledger.close(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java index 4482e9944c0ce..cc4b3f2481152 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java @@ -148,7 +148,10 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { newPos = sourceML.addEntry(data); // new ledger rolled. newPos = sourceML.addEntry(data); - Awaitility.await().untilAsserted(() -> assertEquals(shadowML.ledgers.size(), 5)); + Awaitility.await().untilAsserted(() -> { + assertEquals(shadowML.ledgers.size(), 6); + assertEquals(shadowML.currentLedgerEntries, 0); + }); assertEquals(future.get(), fakePos); // LCE should be updated. log.info("3.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 0ac5fdaef1599..3918dcbe86d66 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -228,7 +228,7 @@ public void testBacklogQuotaWithReader() throws Exception { // non-durable mes should still assertEquals(stats.getSubscriptions().size(), 1); long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog(); - assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER, + assertEquals(nonDurableSubscriptionBacklog, 0, "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); MessageIdImpl msgId = null; @@ -254,9 +254,6 @@ public void testBacklogQuotaWithReader() throws Exception { // check there is only one ledger left assertEquals(internalStats.ledgers.size(), 1); - - // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER - assertEquals(internalStats.ledgers.get(0).ledgerId, finalMsgId.getLedgerId()); }); // check reader can still read with out error @@ -303,10 +300,10 @@ public void testTriggerBacklogQuotaSizeWithReader() throws Exception { TopicStats stats = getTopicStats(topic1); // overall backlogSize should be zero because we only have readers assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]"); - // non-durable mes should still assertEquals(stats.getSubscriptions().size(), 1); long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog(); - assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER, + // All the full ledgers should be deleted. + assertEquals(nonDurableSubscriptionBacklog, 0, "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); MessageIdImpl messageId = null; try { @@ -327,8 +324,8 @@ public void testTriggerBacklogQuotaSizeWithReader() throws Exception { // check there is only one ledger left assertEquals(internalStats.ledgers.size(), 1); - // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER - assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId()); + // check if it's the expected ledger id given MAX_ENTRIES_PER_LEDGER + assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId() + 1); }); // check reader can still read with out error diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 40649a4164047..42b9358911a69 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -191,9 +191,9 @@ public void testSkipCorruptDataLedger() throws Exception { .build(); final String ns1 = "prop/usc/crash-broker"; - final int totalMessages = 100; + final int totalMessages = 99; final int totalDataLedgers = 5; - final int entriesPerLedger = totalMessages / totalDataLedgers; + final int entriesPerLedger = 20; try { admin.namespaces().createNamespace(ns1); @@ -273,9 +273,9 @@ public void testSkipCorruptDataLedger() throws Exception { retryStrategically((test) -> config.isAutoSkipNonRecoverableData(), 5, 100); - // (5) consumer will be able to consume 20 messages from last non-deleted ledger + // (5) consumer will be able to consume 19 messages from last non-deleted ledger consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe(); - for (int i = 0; i < entriesPerLedger; i++) { + for (int i = 0; i < entriesPerLedger - 1; i++) { msg = consumer.receive(); System.out.println(i); consumer.acknowledge(msg); @@ -296,9 +296,9 @@ public void testTruncateCorruptDataLedger() throws Exception { .statsInterval(0, TimeUnit.SECONDS) .build(); - final int totalMessages = 100; + final int totalMessages = 99; final int totalDataLedgers = 5; - final int entriesPerLedger = totalMessages / totalDataLedgers; + final int entriesPerLedger = 20; final String tenant = "prop"; try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index 5252407892eea..3b2f3cf215ea3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -231,28 +231,43 @@ public void testBookieIsolation() throws Exception { LedgerManager ledgerManager = getLedgerManager(bookie1); // namespace: ns1 - ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger(); + // totalLedgers = totalPublish / totalEntriesPerLedger. (totalPublish = 100, totalEntriesPerLedger = 20.) + // The last ledger is full, a new empty ledger will be created. + // The ledger is created async, so adding a wait is needed. + Awaitility.await().untilAsserted(() -> { + assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml1.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies); + assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies); // namespace: ns2 - ml = (ManagedLedgerImpl) topic2.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml2.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies); // namespace: ns3 - ml = (ManagedLedgerImpl) topic3.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml3.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies); // namespace: ns4 - ml = (ManagedLedgerImpl) topic4.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml4 = (ManagedLedgerImpl) topic4.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml4.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml4.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(), isolatedBookies); ManagedLedgerClientFactory mlFactory = (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory(); @@ -388,11 +403,14 @@ public void testSetRackInfoAndAffinityGroupDuringProduce() throws Exception { ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger(); // namespace: ns2 - assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers); - + Awaitility.await().untilAsserted(() -> { + assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml2.getCurrentLedgerEntries(), 0); + }); List ledgers = ml2.getLedgersInfoAsList(); // validate ledgers' ensemble with affinity bookies - for (int i=1; i> ledgerMetaFuture = ledgerManager.readLedgerMetadata(ledgerId); @@ -529,28 +547,40 @@ public void testStrictBookieIsolation() throws Exception { LedgerManager ledgerManager = getLedgerManager(bookie1); // namespace: ns1 - ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml1.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies); + assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies); // namespace: ns2 - ml = (ManagedLedgerImpl) topic2.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml2.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies); // namespace: ns3 - ml = (ManagedLedgerImpl) topic3.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml3.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies); // namespace: ns4 - ml = (ManagedLedgerImpl) topic4.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml4 = (ManagedLedgerImpl) topic4.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml4.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml4.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(), isolatedBookies); ManagedLedgerClientFactory mlFactory = (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory(); @@ -687,22 +717,32 @@ public void testBookieIsolationWithSecondaryGroup() throws Exception { LedgerManager ledgerManager = getLedgerManager(bookie1); // namespace: ns1 - ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml1.getCurrentLedgerEntries(), 0); + }); + // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies); + assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies); // namespace: ns2 - ml = (ManagedLedgerImpl) topic2.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml2.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies); // namespace: ns3 - ml = (ManagedLedgerImpl) topic3.getManagedLedger(); - assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1); + assertEquals(ml3.getCurrentLedgerEntries(), 0); + }); // validate ledgers' ensemble with affinity bookies - assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies); ManagedLedgerClientFactory mlFactory = (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java index 80db4c30f454d..30867dd2cb44d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java @@ -97,11 +97,13 @@ public void TestConsumedLedgersTrim() throws Exception { } ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); - Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 1, msgNum / 2); + }); //no traffic, unconsumed ledger will be retained Thread.sleep(1200); - Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2); + Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 1, msgNum / 2); for (int i = 0; i < msgNum; i++) { Message msg = consumer.receive(2, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java index 4ec8107030600..a06085d3d4626 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -29,11 +30,14 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.client.api.BatcherBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandCloseProducer; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -186,4 +190,44 @@ public void testSendTimerCheckForBatchContainer(BatcherBuilder batcherBuilder) t future.thenAccept(msgId -> log.info("msg-1 done: {} (msgId: {})", System.nanoTime(), msgId)); future.get(); } + + + @Test + public void testRetentionPolicyByProducingMessages() throws Exception { + // Setup: configure the entries per ledger and retention polices. + final int maxEntriesPerLedger = 10, messagesCount = 10; + final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + pulsar.getConfiguration().setManagedLedgerMaxEntriesPerLedger(maxEntriesPerLedger); + pulsar.getConfiguration().setManagedLedgerMinLedgerRolloverTimeMinutes(0); + pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(0); + pulsar.getConfiguration().setDefaultRetentionSizeInMB(0); + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topicName) + .sendTimeout(1, TimeUnit.SECONDS) + .enableBatching(false) + .create(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer().topic(topicName) + .subscriptionName("my-sub") + .subscribe(); + // Act: prepare a full ledger data and ack them. + for (int i = 0; i < messagesCount; i++) { + producer.newMessage().sendAsync(); + } + for (int i = 0; i < messagesCount; i++) { + Message message = consumer.receive(); + assertNotNull(message); + consumer.acknowledge(message); + } + // Verify: a new empty ledger will be created after the current ledger is fulled. + // And the previous consumed ledgers will be deleted + Awaitility.await().untilAsserted(() -> { + admin.topics().trimTopic(topicName); + PersistentTopicInternalStats internalStats = admin.topics().getInternalStatsAsync(topicName).get(); + assertEquals(internalStats.currentLedgerEntries, 0); + assertEquals(internalStats.ledgers.size(), 1); + }); + } }