[fix][storage] refresh the ledgers map when the offload complete failed #17228

Closed
wants to merge 12 commits into from
@@ -2353,6 +2353,42 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
}
}

// Although connection-loss exceptions are already caught on the meta store, other exceptions could still
// cause a mismatch between the meta store and memory, so refresh the ledger info list when the offload
// fails with a BadVersionException.
private void asyncRefreshLedgersInfoOnBadVersion(ManagedLedgerException exception) {
if (!(exception instanceof BadVersionException)) {
return;
}
if (!metadataMutex.tryLock()) {
scheduledExecutor.schedule(
() -> asyncRefreshLedgersInfoOnBadVersion(exception), 100, TimeUnit.MILLISECONDS);
return;
}
store.getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
@Override
public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
ledgersStat = stat;
try {
synchronized (ManagedLedgerImpl.this) {
Contributor

I think there is an issue here: another operation may take the lock first and update the ledger list, and then this code takes the lock and overwrites the list with the data it fetched earlier, messing it up again.

Member Author

All ledger update operations should be guarded by the metadata lock, because they need to make sure the ledger stat is the latest version.
The synchronized block makes sure no remove/add operation happens while a ledger is closing or being created.

I haven't found any other place that operates on the map without locks. Do you know of other places that still have concurrency issues?

Contributor

> The synchronized block makes sure no remove/add operation happens while a ledger is closing or being created.

Yes, that is what I mean. If other operations update the ledgers, this introduces a problem.

For example:

  1. asyncRefreshLedgersInfoOnBadVersion takes the metadataMutex lock
  2. we get the returned ManagedLedgerInfo at line 2370
  3. another operation changes the ledgers, so we wait here at line 2373
  4. after the ledgers have changed, we continue to run lines 2374 - 2377
  5. we lose the changes from step 3

Could it also introduce a deadlock? Someone holds the synchronized lock while waiting for metadataMutex, and this code waits for the synchronized lock but can't release metadataMutex.
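
The five steps above can be replayed sequentially as a minimal sketch. Everything in it is a hypothetical stand-in rather than ManagedLedgerImpl code: the map holds ledger id to entry count instead of LedgerInfo, and the "concurrent" update is simply run between the fetch and the write-back.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class LostUpdateSketch {
    // Mirrors the refresh loop in the diff: every fetched entry overwrites the in-memory one.
    static void refresh(NavigableMap<Long, Long> ledgers, Map<Long, Long> snapshot) {
        synchronized (ledgers) {
            for (Map.Entry<Long, Long> e : snapshot.entrySet()) {
                ledgers.put(e.getKey(), e.getValue());
            }
        }
    }

    static NavigableMap<Long, Long> run() {
        // Hypothetical in-memory ledger map: ledger id -> entry count.
        NavigableMap<Long, Long> ledgers = new ConcurrentSkipListMap<>();
        ledgers.put(1L, 0L); // ledger 1 is still open, so no entries are recorded yet

        // Steps 1-2: the refresh reads the metadata store before ledger 1 is closed.
        Map<Long, Long> snapshot = new TreeMap<>(ledgers);

        // Step 3: a ledgerClosed-style update lands while holding only the monitor.
        synchronized (ledgers) {
            ledgers.put(1L, 500L);
        }

        // Steps 4-5: the refresh resumes and overwrites the newer value.
        refresh(ledgers, snapshot);
        return ledgers;
    }

    public static void main(String[] args) {
        System.out.println(run().get(1L)); // prints 0: the update from step 3 is lost
    }
}
```

Whether this interleaving can actually occur depends on whether every writer of the map also holds metadataMutex, which is the open question in this thread.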

Contributor

The method synchronized void ledgerClosed(final LedgerHandle lh) updates the ledgers while holding only the synchronized lock, not metadataMutex.

Member Author

Because we only ever try to lock metadataMutex (tryLock), I think it won't cause a deadlock?

While we hold metadataMutex, all the other places I saw will retry. They can't update the ledgers or metadata successfully, so they have to wait for the refresh to finish before doing anything else.
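
The try-lock-and-retry pattern described here can be sketched as follows. This is an illustration under assumptions, not the ManagedLedgerImpl code: a java.util.concurrent.Semaphore with one permit stands in for metadataMutex, and the 10 ms retry delay and the class and method names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class TryLockRetrySketch {
    // Attempts the refresh; if the mutex is busy, reschedules itself instead of blocking.
    static void refresh(Semaphore mutex, ScheduledExecutorService executor,
                        int attempt, CompletableFuture<Integer> done) {
        if (!mutex.tryAcquire()) {
            executor.schedule(() -> refresh(mutex, executor, attempt + 1, done),
                    10, TimeUnit.MILLISECONDS);
            return;
        }
        try {
            done.complete(attempt); // the real code would reload the ledger list here
        } finally {
            mutex.release();
        }
    }

    // Returns the attempt number on which the refresh finally acquired the mutex.
    static int run() {
        Semaphore mutex = new Semaphore(1); // stand-in for metadataMutex
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Integer> done = new CompletableFuture<>();
        try {
            mutex.acquire();                   // another metadata operation holds the mutex
            refresh(mutex, executor, 1, done); // first attempt fails and is rescheduled
            Thread.sleep(50);
            mutex.release();                   // the other operation finishes
            return done.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("refreshed on attempt " + run());
    }
}
```

Because the failed attempt returns immediately, the caller never waits on the mutex while holding another lock. That only rules out a deadlock if every other user of the mutex also retries rather than blocks, which is the assumption stated in this comment.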

for (LedgerInfo li : mlInfo.getLedgerInfoList()) {
long ledgerId = li.getLedgerId();
ledgers.put(ledgerId, li);
Contributor

Is it possible that some ledgerIds need to be removed (anything not present in mlInfo.getLedgerInfoList())?

Member Author

We trigger this operation after the offload has failed, so there shouldn't be any remove operation on the metadata store. If there was a remove, it should have succeeded or failed before the offload, right?
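
For comparison, if stale ids did turn out to be a concern, a refresh that also removes them could look roughly like the sketch below. It is not what the PR does; the map of ledger id to String is a hypothetical stand-in for the ledgers map and LedgerInfo.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class ReconcileSketch {
    // Makes the in-memory map match the fetched list: drop missing ids, then overwrite the rest.
    static void reconcile(NavigableMap<Long, String> ledgers, Map<Long, String> fetched) {
        synchronized (ledgers) {
            ledgers.keySet().retainAll(fetched.keySet()); // remove ids the store no longer lists
            ledgers.putAll(fetched);                      // add or refresh everything it does list
        }
    }

    static String run() {
        NavigableMap<Long, String> ledgers = new TreeMap<>();
        ledgers.put(1L, "a");
        ledgers.put(2L, "b");
        ledgers.put(3L, "c");

        Map<Long, String> fetched = new TreeMap<>();
        fetched.put(2L, "b2");
        fetched.put(3L, "c");
        fetched.put(4L, "d");

        reconcile(ledgers, fetched);
        return ledgers.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {2=b2, 3=c, 4=d}
    }
}
```

A removal step like this would also discard any ledger that exists in memory but has not yet been written to the store, so it would need the same locking guarantees discussed earlier in this review.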

}
}
} finally {
metadataMutex.unlock();
}
}

@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to refresh the list of ledgers after updating failed", name, e);
metadataMutex.unlock();
}
});
}

private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
if (!offloadMutex.tryLock()) {
scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
@@ -2362,6 +2398,10 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
unlockingPromise.whenComplete((res, ex) -> {
offloadMutex.unlock();
if (ex != null) {
Throwable e = FutureUtil.unwrapCompletionException(ex);
if (e instanceof ManagedLedgerException) {
asyncRefreshLedgersInfoOnBadVersion((ManagedLedgerException) e);
}
finalPromise.completeExceptionally(ex);
} else {
finalPromise.complete(res);
@@ -2867,6 +2907,7 @@ public void offloadComplete(Position offloadedTo, Object ctx) {

@Override
public void offloadFailed(ManagedLedgerException e, Object ctx) {
asyncRefreshLedgersInfoOnBadVersion(e);
promise.completeExceptionally(e);
}
}, null);
@@ -2957,7 +2998,12 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
promise.whenComplete((result, exception) -> {
offloadMutex.unlock();
if (exception != null) {
callback.offloadFailed(new ManagedLedgerException(exception), ctx);
Throwable t = FutureUtil.unwrapCompletionException(exception);
Contributor

Why do we call asyncRefreshLedgersInfoOnBadVersion in offloadPrefix but not here?
asyncOffloadPrefix is a public method.

Member Author

Great catch! We need to do the refresh here.
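
One possible shape for that change, as a self-contained sketch: unwrap the CompletionException, trigger the refresh on a bad version, then report the failure. The exception classes and unwrap helper below are simplified stand-ins for ManagedLedgerException, BadVersionException and FutureUtil.unwrapCompletionException, and the event list replaces the real callback and refresh calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class OffloadFailureSketch {
    // Hypothetical stand-ins for the exception types named in the diff.
    static class ManagedLedgerException extends Exception {
        ManagedLedgerException(String msg) { super(msg); }
        ManagedLedgerException(Throwable cause) { super(cause); }
    }

    static class BadVersionException extends ManagedLedgerException {
        BadVersionException(String msg) { super(msg); }
    }

    // Simplified stand-in for FutureUtil.unwrapCompletionException.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> offload = new CompletableFuture<>();

        // A dependent stage sees the failure wrapped in a CompletionException.
        offload.thenApply(pos -> pos).whenComplete((result, exception) -> {
            if (exception == null) {
                events.add("complete");
                return;
            }
            Throwable t = unwrap(exception);
            ManagedLedgerException mle = (t instanceof ManagedLedgerException)
                    ? (ManagedLedgerException) t
                    : new ManagedLedgerException(t);
            if (mle instanceof BadVersionException) {
                events.add("refresh"); // where asyncRefreshLedgersInfoOnBadVersion(mle) would go
            }
            events.add("failed:" + mle.getClass().getSimpleName());
        });

        offload.completeExceptionally(new BadVersionException("stale stat"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [refresh, failed:BadVersionException]
    }
}
```

Passing the unwrapped exception through keeps its concrete type, so the instanceof check in the refresh method can still match.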

if (t instanceof ManagedLedgerException) {
callback.offloadFailed((ManagedLedgerException) t, ctx);
} else {
callback.offloadFailed(new ManagedLedgerException(t), ctx);
}
} else {
callback.offloadComplete(result, ctx);
}