From 75b2db956287d272410a12de5776cb784d322a0c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 May 2024 10:30:00 +0200 Subject: [PATCH] [improve][storage] Periodically rollover Cursor ledgers (#257) --- .../bookkeeper/mledger/ManagedCursor.java | 7 ++++ .../bookkeeper/mledger/ManagedLedger.java | 5 +++ .../mledger/impl/ManagedCursorImpl.java | 36 +++++++++++++++--- .../mledger/impl/ManagedLedgerImpl.java | 9 +++++ .../mledger/impl/ManagedLedgerBkTest.java | 37 +++++++++++++++++++ .../pulsar/broker/service/BrokerService.java | 1 + 6 files changed, 89 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index bc6a1e9a782d6..dc731b8bfe57a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -866,4 +866,11 @@ default void skipNonRecoverableLedger(long ledgerId){} * @return whether this cursor is closed. */ boolean isClosed(); + + /** + * Called by the system to trigger perdiodic rollover in absence of activity. + */ + default boolean periodicRollover() { + return false; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index f91d9ec3f5a02..955a0d7850275 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -631,6 +631,11 @@ void asyncSetProperties(Map properties, AsyncCallbacks.UpdatePro */ void trimConsumedLedgersInBackground(CompletableFuture promise); + /** + * Rollover cursors in background if needed. + */ + default void rolloverCursorsInBackground() {} + /** * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is * used to delete information about this ledger in the ManagedCursor. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index ccea125ef83f6..b726a8fbd3150 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3112,12 +3112,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin lh1.getId()); } - if (shouldCloseLedger(lh1)) { - if (log.isDebugEnabled()) { - log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name); - } - startCreatingNewMetadataLedger(); - } + rolloverLedgerIfNeeded(lh1); mbean.persistToLedger(true); mbean.addWriteCursorLedgerSize(data.length); @@ -3135,6 +3130,35 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin }, null); } + public boolean periodicRollover() { + LedgerHandle lh = cursorLedger; + if (State.Open.equals(STATE_UPDATER.get(this)) + && lh != null && lh.getLength() > 0) { + boolean triggered = rolloverLedgerIfNeeded(lh); + if (triggered) { + log.info("[{}] Periodic rollover triggered for cursor {} (length={} bytes)", + ledger.getName(), name, lh.getLength()); + } else { + log.debug("[{}] Periodic rollover skipped for cursor {} (length={} bytes)", + ledger.getName(), name, lh.getLength()); + + } + return triggered; + } + return false; + } + + boolean rolloverLedgerIfNeeded(LedgerHandle lh1) { + if (shouldCloseLedger(lh1)) { + if (log.isDebugEnabled()) { + log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name); + } + startCreatingNewMetadataLedger(); + return true; + } + return false; + } + void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) { final PositionImpl newPosition = mdEntry.newPosition; STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3c56d6c66a5fd..91edf6ad46648 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2855,6 +2855,15 @@ public void operationFailed(MetaStoreException e) { } } + @Override + public void rolloverCursorsInBackground() { + if (cursors.hasDurableCursors()) { + executor.execute(() -> { + cursors.forEach(ManagedCursor::periodicRollover); + }); + } + } + /** * @param ledgerId the ledger handle which maybe will be released. * @return if the ledger handle was released. diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 0281c8cdd88e3..bb505200ba75e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -53,6 +53,7 @@ import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class ManagedLedgerBkTest extends BookKeeperClusterTestCase { @@ -548,6 +549,42 @@ public void testChangeCrcType() throws Exception { } } + @Test + public void testPeriodicRollover() throws Exception { + ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); + factoryConf.setMaxCacheSize(0); + + int rolloverTimeForCursorInSeconds = 5; + + @Cleanup("shutdown") + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1) + .setMetadataAckQuorumSize(1) + .setLedgerRolloverTimeout(rolloverTimeForCursorInSeconds); + ManagedLedger ledger = factory.open("my-ledger" + testName, config); + ManagedCursor cursor = ledger.openCursor("c1"); + + Position pos = ledger.addEntry("entry-0".getBytes()); + ledger.addEntry("entry-1".getBytes()); + + List entries = cursor.readEntries(2); + assertEquals(2, entries.size()); + entries.forEach(Entry::release); + ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor; + assertEquals(ManagedCursorImpl.State.NoLedger, cursorImpl.state); + + // this creates the ledger + cursor.delete(pos); + + Awaitility.await().until(() -> cursorImpl.state == ManagedCursorImpl.State.Open); + + Thread.sleep(rolloverTimeForCursorInSeconds * 1000 + 1000); + + long currentLedgerId = cursorImpl.getCursorLedger(); + assertTrue(cursor.periodicRollover()); + Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 97742e25af255..050dee9f99004 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2118,6 +2118,7 @@ private void checkConsumedLedgers() { Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent( managedLedger -> { managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE); + managedLedger.rolloverCursorsInBackground(); } ); }