Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Create new ledger after the current ledger is closed #22034

Merged
merged 12 commits into from
Mar 22, 2024

Conversation

liangyepianzhou
Copy link
Contributor

@liangyepianzhou liangyepianzhou commented Feb 6, 2024

Motivation

Fix1: Create new ledger after the current ledger is closed

Background

  1. Managerledger has a configuration maximumRolloverTimeMs to configure the max time of closing a ledger.
  2. The current ledger can be rollover when it reaches maximumRolloverTimeMs.
  3. The current ledger will not change after it is rollovered. The current ledger only be changed when creating a new ledger.
  4. The current ledger can not be deleted, because the current ledger may be written to.
  5. The ClosedLedger state of the manager indicates that the current ledger is rollovered and can not be written to.

In the current logic, if it is found that the current ledger should be closed during the process of writing an entry, then the current ledger will be closed after this entry is written. However, if there is no pending add entry operation, a new ledger will not be created. This will cause the current ledger to be unable to be deleted, and the retention policy cannot be executed as expected by the user. This is the problem we need to solve, and I will further elaborate on this issue below:

We need to open a new ledger after the current ledger when it is rolled over. In fact, Pulsar has a task that periodically checks if the ledger is full, and it will create a new ledger immediately after closing the ledger. Subsequently, the new ledger becomes the current ledger and the previous current ledger can be deleted.

this.checkLedgerRollTask = this.scheduledExecutor.schedule(
this::rollCurrentLedgerIfFull, this.maximumRolloverTimeMs, TimeUnit.MILLISECONDS);
}

public void rollCurrentLedgerIfFull() {
log.info("[{}] Start checking if current ledger is full", name);
if (currentLedgerEntries > 0 && currentLedgerIsFull()
&& STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
@Override
public void closeComplete(int rc, LedgerHandle lh, Object o) {
checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with "
+ "acked ledgerId %s", currentLedger.getId(), lh.getId());
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Successfully closed ledger {}, trigger by rollover full ledger",
name, lh.getId());
}
} else {
log.warn("[{}] Error when closing ledger {}, trigger by rollover full ledger, Status={}",
name, lh.getId(), BKException.getMessage(rc));
}
ledgerClosed(lh);
createLedgerAfterClosed();
}

However, the ledger may also be closed after adding an entry, and at this time, if there is no pending add entry operation, a new ledger will not be created. Therefore, the already closed current ledger cannot be deleted because there is no new ledger to become the current ledger.

if (!pendingAddEntries.isEmpty()) {
// Need to create a new ledger to write pending entries
createLedgerAfterClosed();
}

Fix2: slowestReaderPosition should be the next porsition of markdelete position instead of markdelete position
In the current ledger the slowestReaderPosition will be the slowest markdeletepostion and this will make the last full ledger can not be trimed after all the entries in this ledger is acked.

internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),

protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
ledger.mbean.addMarkDeleteOp();
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx);
// We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available
synchronized (pendingMarkDeleteOps) {
// The state might have changed while we were waiting on the queue mutex
switch (STATE_UPDATER.get(this)) {
case Closed:
callback.markDeleteFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already closed"), ctx);
return;
case NoLedger:
pendingMarkDeleteOps.add(mdEntry);
// We need to create a new ledger to write into.
startCreatingNewMetadataLedger();
break;
// fall through
case SwitchingLedger:
pendingMarkDeleteOps.add(mdEntry);
break;
case Open:
if (PENDING_READ_OPS_UPDATER.get(this) > 0) {
// Wait until no read operation are pending
pendingMarkDeleteOps.add(mdEntry);
} else {
// Execute the mark delete immediately
internalMarkDelete(mdEntry);
}

public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
requireNonNull(cursor);
long stamp = rwLock.writeLock();
try {
Item item = cursors.get(cursor.getName());
if (item == null || item.idx == -1) {
return null;
}
PositionImpl previousSlowestConsumer = heap.get(0).position;
item.position = (PositionImpl) newPosition;
version = DataVersion.getNextVersion(version);
if (heap.size() == 1) {
return Pair.of(previousSlowestConsumer, item.position);
}
// When the cursor moves forward, we need to push it toward the
// bottom of the tree and push it up if a reset was done
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
siftDown(item);
} else {
siftUp(item);
}
PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
} finally {
rwLock.unlockWrite(stamp);
}

Modifications

Fix1: Create new ledger after the current ledger is closed
We have two ways to solve this problem:

  1. Do not skip the current ledger when trimming ledgers.
  2. Create a new ledger after the current ledger is closed and there are no pending add entry operations.

In most cases, users take more care about the write-read latency instead of the cost of storage brought by creating an empty ledger. So I suggest we delete the limit of !pendingAddEntries.isEmpty(), creating a new ledger after closing the current ledger.
Fix2: slowestReaderPosition should be the next porsition of markdelete position instead of markdelete position

Skip the ledger that has been read completed when trimming.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Feb 6, 2024
@eolivelli
Copy link
Contributor

This sounds like a big behaviour change.
What about discussing on the dev@ mailing list ?

@Technoboy-
Copy link
Contributor

This sounds like a big behaviour change. What about discussing on the dev@ mailing list ?

ClosedLedger means Current ledger has been closed , so we can delete it. But I want to enable the test and check if it works

1. transaction test: deleteNamespace after test
2. badVersionErrorDuringTruncateLedger: Avoid the ledger was closed after rollover
3. testBacklogQuotaWithReader
no backlog should be retained
Copy link
Member

@mattisonchao mattisonchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiting for review.

@mattisonchao
Copy link
Member

The current ledger will not change after it is rollovered. The current ledger only be changed when creating a new ledger.

Hi, @liangyepianzhou could you please point out which code? I saw we will create a new ledger here.

@mattisonchao
Copy link
Member

Oh, I saw one place will cause this behaviour

. But it should be okay, the rollover will create a new ledger finally.

@liangyepianzhou
Copy link
Contributor Author

liangyepianzhou commented Feb 7, 2024

Oh, I saw one place will cause this behaviour

. But it should be okay, the rollover will create a new ledger finally.

Oh, I see. This is a bug, not a behavior change. The rollover in the OpAddEntry does not create a new ledger if there are no pending write ops.

if (closeWhenDone) {
log.info("[{}] Closing ledger {} for being full", ml.getName(), ledgerId);
// `data` will be released in `closeComplete`
if (ledger != null) {
ledger.asyncClose(this, ctx);
}

The callback finally reaches here.
if (!pendingAddEntries.isEmpty()) {
// Need to create a new ledger to write pending entries
createLedgerAfterClosed();
}

@lhotari
Copy link
Member

lhotari commented Feb 8, 2024

Is this related to #21893?

@lhotari
Copy link
Member

lhotari commented Feb 8, 2024

@liangyepianzhou It would be good to improve the description of the PR.

We need to delete the current ledger when it is rollovered.

This doesn't make sense.

Do not skip the current ledger when trimming ledgers if the state of the managerLedger is ClosedLedger.

This makes more sense. Is this what is the actual goal? Obviously the trimming could result in deleting the ledger.

@lhotari
Copy link
Member

lhotari commented Feb 9, 2024

Would it make sense to rename "Delete current ledger when it is closed" to "Trim current ledger when it is closed" if that's what is the expectation?

@lhotari
Copy link
Member

lhotari commented Feb 9, 2024

It seems that the problem might be very different. It would be necessary to explain the use case.
The description doesn't explain the use case. If I have understood correctly, the use case is about
configuring retention by time. When retention is configured, it is expected that expired messages get deleted.
That isn't happening in some cases currently and that's the issue that is being addressed.

One rare case where Pulsar currently seems to ignore retention policies is the case when topics aren't actively loaded on any broker in the cluster. A topic gets loaded when a consumer or producer connects to the topic.

@lhotari
Copy link
Member

lhotari commented Feb 9, 2024

There's a scheduled job

protected void startConsumedLedgersMonitor() {
int interval = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
if (interval > 0) {
consumedLedgersMonitor.scheduleAtFixedRate(this::checkConsumedLedgers,
interval, interval, TimeUnit.SECONDS);
}
}
that runs by default every 120 seconds (configured with retentionCheckIntervalInSeconds).
That iterates all active topics and their managed ledgers:
private void checkConsumedLedgers() {
forEachTopic((t) -> {
if (t instanceof PersistentTopic) {
Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
managedLedger -> {
managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
}
);
}
});
}

This works as long as the topic is active (loaded) in a broker in the cluster.

@liangyepianzhou liangyepianzhou changed the title [fix][broker] Delete current ledger when it is closed [fix][broker] Create new ledger after the current ledger is closed Feb 16, 2024
@liangyepianzhou
Copy link
Contributor Author

@lhotari Thanks for your review. It helps a lot. I have updated the description, which has greatly changed from my original idea. In the initial plan, I did not consider the following two points:

  1. Creating a new ledger might be more acceptable to users.
  2. The existing timed task also directly creates a new ledger, so we should maintain consistency in behavior.

I hope the original description did not cause you too much trouble, and I look forward to your reply.

shibd pushed a commit that referenced this pull request Apr 15, 2024
shibd pushed a commit that referenced this pull request Apr 15, 2024
shibd pushed a commit that referenced this pull request Apr 15, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 15, 2024
…ntriesInStorage

- change apache#22034 is missing from branch-3.0

(cherry picked from commit e3531e8)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 16, 2024
mukesh-ctds added a commit to datastax/pulsar that referenced this pull request Apr 16, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 16, 2024
…pache#22034)

(cherry picked from commit d0ca983)
(cherry picked from commit 54042df)
(cherry picked from commit eed3d17)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
…ntriesInStorage

- change apache#22034 is missing from branch-3.0

(cherry picked from commit e3531e8)
mukesh-ctds added a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 18, 2024
…ntriesInStorage

- change apache#22034 is missing from branch-3.0

(cherry picked from commit e3531e8)
mukesh-ctds added a commit to datastax/pulsar that referenced this pull request Apr 18, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 19, 2024
…ntriesInStorage

- change apache#22034 is missing from branch-3.0

(cherry picked from commit e3531e8)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 19, 2024
@lhotari
Copy link
Member

lhotari commented Apr 22, 2024

In branch-3.0 this change is making ClusterMigrationTest fail consistently with NPE.
example: https://github.com/apache/pulsar/actions/runs/8778991470/job/24086503436#step:10:1557

  Error:  org.apache.pulsar.broker.service.ClusterMigrationTest.testClusterMigration[true, Shared](4)  Time elapsed: 87.394 s  <<< FAILURE!
  org.apache.pulsar.client.api.PulsarClientException$BrokerPersistenceException: {"errorMsg":"org.apache.bookkeeper.mledger.ManagedLedgerException$MetaStoreException: java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null","reqId":1758475176444131959, "remote":"localhost/127.0.0.1:43465", "local":"/127.0.0.1:35162"}
  	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1083)
  	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:89)
  	at org.apache.pulsar.broker.service.ClusterMigrationTest.testClusterMigration(ClusterMigrationTest.java:352)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  	at java.base/java.lang.Thread.run(Thread.java:840)

example stacktrace of the NPE

2024-04-22T11:26:00,038 - WARN  - [bookkeeper-ml-scheduler-OrderedScheduler-8-0:ManagedLedgerImpl] - [pulsar/migrationNs/persistent/migrationTopic-bf02348c-42bb-42aa-a5ee-7c5d7f9ddec9] Recovery for cursor s1 failed
org.apache.bookkeeper.mledger.ManagedLedgerException$MetaStoreException: java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:201) ~[bookkeeper-common-4.16.5.jar:4.16.5]
	at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) ~[bookkeeper-common-4.16.5.jar:4.16.5]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.getValidPositionAfterSkippedEntries(ManagedLedgerImpl.java:3705) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.getNextValidPosition(ManagedLedgerImpl.java:3696) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.recoveredCursor(ManagedCursorImpl.java:703) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$2.operationComplete(ManagedCursorImpl.java:513) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$2.operationComplete(ManagedCursorImpl.java:472) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda$asyncGetCursorInfo$13(MetaStoreImpl.java:241) ~[classes/:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:201) ~[bookkeeper-common-4.16.5.jar:4.16.5]
	at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) ~[bookkeeper-common-4.16.5.jar:4.16.5]

@lhotari
Copy link
Member

lhotari commented Apr 22, 2024

When cherry-picking, it's important to also pick #22552

srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 23, 2024
…ntriesInStorage

- change apache#22034 is missing from branch-3.0

(cherry picked from commit e3531e8)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 23, 2024
@lhotari
Copy link
Member

lhotari commented Oct 9, 2024

This PR introduced a flaky test #23164, @liangyepianzhou do you have a chance to fix it? thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants