[fix][storage] refresh the ledgers map when the offload complete failed #17228
Conversation
--- *Motivation* We found an incorrect state when completing an offload fails: the call fails with a connection loss exception, but the ledger info has actually been updated in the meta store. This makes the in-memory data different from the meta store, and the offloader then removes the previous offload information and cleans up the ledgers. We have added a retry when the meta store receives a connection loss exception. This PR tries to keep the in-memory data consistent with the meta store when the exception is thrown even though the data was updated in the meta store.
ping @hangc0276 @codelipenghui. Could you please take a look?
After checking the offloading process: can we just add a check in the cleanupOffloaded() method? If we encounter a ZooKeeper operation timeout, we should only clean up the offloaded data after the metadata has been refreshed.
I see you have added lastOffloadCompleteFailed and refreshedIfOffloadCompleteFailed; if I understand correctly, you want to avoid subsequent write operations based on the unrefreshed metadata. The managed ledger already handles this case by updating the znode with ledgersStat.
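For reference, a minimal sketch of the kind of guard being suggested in cleanupOffloaded(); the method signature and the log message are hypothetical, only the two flags come from this PR:

private void cleanupOffloaded(long ledgerId, UUID uuid, String cleanupReason) {
    // Hypothetical guard: after a failed offload completion, do not delete the
    // offloaded data until the ledgers map has been refreshed from the meta store.
    if (lastOffloadCompleteFailed && !refreshedIfOffloadCompleteFailed) {
        log.warn("[{}] Skipping cleanup of offloaded ledger {} until metadata is refreshed", name, ledgerId);
        return;
    }
    // ... existing cleanup logic would continue here ...
}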
if (injection != null) {
    lastOffloadCompleteFailed = true;
    refreshedIfOffloadCompleteFailed = false;
    injection.throwException(ledgerId);
}
if (exception == null) {
    log.info("[{}] End Offload. ledger={}, uuid={}", name, ledgerId, uuid);
} else {
    lastOffloadCompleteFailed = true;
    refreshedIfOffloadCompleteFailed = false;
Please check the code style.
refreshFuture.whenComplete((unused, throwable) -> {
    if (throwable != null) {
        log.error("Failed to refresh the ledger info list", throwable);
        unlockingPromise.completeExceptionally(throwable);
Should we add a return here?
+1
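A minimal sketch of the early return being suggested; the success branch (completing unlockingPromise) is assumed here for illustration:

refreshFuture.whenComplete((unused, throwable) -> {
    if (throwable != null) {
        log.error("Failed to refresh the ledger info list", throwable);
        unlockingPromise.completeExceptionally(throwable);
        // return so the success path below is not executed after a failure
        return;
    }
    // assumed success path
    unlockingPromise.complete(null);
});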
log.warn("[{}] Failed to complete offload of ledger {}, uuid {}", | ||
name, ledgerId, uuid, exception); | ||
} | ||
}); | ||
} | ||
|
||
private Injection injection; |
Please do not add this mechanism to core classes.
We can use Mockito to interact with the internals and simulate problems.
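For instance, a test could fail the metadata update from a Mockito spy instead of adding an Injection hook; a rough sketch, where LedgerMetadataStore and asyncUpdateLedgerInfo are illustrative stand-ins rather than the real collaborator:

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.util.concurrent.CompletableFuture;

// Fail the metadata update from the test, simulating a connection loss that
// happens even though the server may have applied the write.
LedgerMetadataStore store = spy(realStore);
doAnswer(invocation -> {
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new Exception("simulated connection loss"));
    return failed;
}).when(store).asyncUpdateLedgerInfo(any(), any());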
@@ -220,6 +220,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
    private long lastOffloadLedgerId = 0;
    private long lastOffloadSuccessTimestamp = 0;
    private long lastOffloadFailureTimestamp = 0;
    @Getter
    private boolean lastOffloadCompleteFailed = false;
I don't think we are handling concurrent access to these fields correctly; they should be at least volatile.
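i.e. something along these lines (a sketch based on the fields in the diff above; the second field's initial value is not shown in the diff):

@Getter
private volatile boolean lastOffloadCompleteFailed = false;
private volatile boolean refreshedIfOffloadCompleteFailed;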
When
Yes
@hangc0276 @eolivelli @codelipenghui Thanks for your review! I reconsidered and changed the implementation of the PR. Please take a look again. Thank you.
This is another issue I want to mention; I can send it to the mailing list if you prefer. I took a deep look yesterday. What we want to resolve with this PR is keeping the ledgers map consistent between memory and the ZooKeeper server when offloading fails. I saw that in the Pulsar Metadata handler we retry the operation when ZooKeeper throws a connection loss exception, but the operation may still fail after the retry. For example, we update the ledgers map in memory after successfully updating the LedgerInfo in ZooKeeper. If the ZooKeeper update executes successfully on the server but throws connection loss on the client, we retry on the connection loss exception, and the callback may then receive a BadVersion exception. At that moment the in-memory ledgers list is different from the ZooKeeper server, and that may cause other issues on the broker. I'm not sure if I'm missing something, but it looks like there are many places in our code where we do not consider that situation.
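A rough sketch of the failure mode being described and one possible reaction to it; all names here (updateLedgerInfo, isBadVersion, refreshLedgersInfoFromStore, handleUpdateFailure) are illustrative, not the Pulsar Metadata API:

// The update may have been applied on the server even though the client saw a
// connection loss; the automatic retry then fails with BadVersion while the
// in-memory ledgers map is still stale.
store.updateLedgerInfo(path, info, expectedVersion).whenComplete((stat, ex) -> {
    if (ex == null) {
        ledgersStat = stat;                  // normal path: memory and store agree
    } else if (isBadVersion(ex)) {
        refreshLedgersInfoFromStore();       // re-read before touching the in-memory map
    } else {
        handleUpdateFailure(ex);
    }
});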
overall LGTM
I have left some suggestions
@Override
public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
    ledgersStat = stat;
    synchronized (this) {
What about an explicit synchronized (ManagedLedgerImpl.this)?
}
}
}
metadataMutex.unlock();
This should be in a finally block.
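i.e. roughly:

try {
    synchronized (ManagedLedgerImpl.this) {
        // apply the refreshed ledger info to the in-memory map
    }
} finally {
    // release the mutex even if applying the refreshed info throws
    metadataMutex.unlock();
}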
@eolivelli PTAL. Thank you.
public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
    ledgersStat = stat;
    try {
        synchronized (ManagedLedgerImpl.this) {
I think there could be an issue here: another operation may have updated the ledger list (taking the lock first), and then this code takes the lock, which will mess up the ledger list again.
All updates to the ledgers map should be guarded by the metadata lock, because we need to make sure the ledger stat is the latest version.
And the synchronized block makes sure there is no remove/add operation while a ledger is closing or being created.
I haven't found other places that operate on the map without locks. Do you know of other places that still have concurrency issues?
"And the synchronized block makes sure there is no remove/add operation while a ledger is closing or being created."
Yes, that is what I want to say. If there are other operations that update the ledgers map, it will introduce the problem.
For example:
1. asyncRefreshLedgersInfoOnBadVersion gets the metadataMutex lock
2. we get the returned ManagedLedgerInfo at line 2370
3. another operation changes the ledgers while we are waiting here at line 2373
4. after the ledgers changed, we continue to run lines 2374 - 2377
5. we lose the changes made at step 3

And it also might introduce a deadlock: someone else holds the synchronized lock but is waiting for the metadataMutex lock, while this code is waiting for the synchronized lock and cannot release the metadataMutex lock.
The method synchronized void ledgerClosed(final LedgerHandle lh) updates the ledgers map only under the synchronized lock, with no metadataMutex.
Because when we use the metadataMutex we only try to lock it, I think it won't cause a deadlock. When we hold the metadataMutex, I saw that all other places will retry; they won't update the ledgers or metadata successfully, so they have to wait for the refresh to finish before doing other things.
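A sketch of the lock ordering being described, with illustrative method names (scheduleRetry, readLedgerInfoFromStore, applyRefreshedInfo are assumptions): the refresh path holds the metadataMutex and then enters the monitor, while other metadata writers only tryLock() the mutex and reschedule on failure, so they do not block inside synchronized waiting for it.

void asyncRefreshLedgersInfo() {
    if (!metadataMutex.tryLock()) {
        // other writers behave the same way: retry later instead of blocking
        scheduleRetry(this::asyncRefreshLedgersInfo);
        return;
    }
    readLedgerInfoFromStore().whenComplete((info, ex) -> {
        try {
            if (ex == null) {
                synchronized (ManagedLedgerImpl.this) {
                    applyRefreshedInfo(info);
                }
            }
        } finally {
            metadataMutex.unlock();
        }
    });
}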
@@ -2366,6 +2402,9 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
    unlockingPromise.whenComplete((res, ex) -> {
        offloadMutex.unlock();
        if (ex != null) {
            if (FutureUtil.unwrapCompletionException(ex) instanceof ManagedLedgerException) {
Looks like we don't need the check here? asyncRefreshLedgersInfoOnBadVersion already checks the exception type.
Because the exception may be a CompletionException, we need to make sure it is a ManagedLedgerException.
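i.e. the unwrap-then-check pattern, in isolation (the exact argument passed to the refresh call is assumed here):

// The callback may see the original failure wrapped in a CompletionException,
// so unwrap it before the instanceof check.
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof ManagedLedgerException) {
    asyncRefreshLedgersInfoOnBadVersion((ManagedLedgerException) cause);
}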
@codelipenghui PTAL
ping @eolivelli
ping @eolivelli @codelipenghui
LGTM
@@ -2957,7 +2998,12 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
    promise.whenComplete((result, exception) -> {
        offloadMutex.unlock();
        if (exception != null) {
            callback.offloadFailed(new ManagedLedgerException(exception), ctx);
            Throwable t = FutureUtil.unwrapCompletionException(exception);
Why don't we call asyncRefreshLedgersInfoOnBadVersion here as well, rather than only in offloadPrefix? asyncOffloadPrefix is a public method.
Great catch! We need to do the refresh here
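Something along these lines in asyncOffloadPrefix's completion handler, mirroring what the diff does in maybeOffload (sketch only; the refresh call signature and the success callback shape are assumed):

promise.whenComplete((result, exception) -> {
    offloadMutex.unlock();
    if (exception != null) {
        Throwable t = FutureUtil.unwrapCompletionException(exception);
        if (t instanceof ManagedLedgerException) {
            // asyncOffloadPrefix is public, so refresh here as well instead of
            // relying on offloadPrefix to do it.
            asyncRefreshLedgersInfoOnBadVersion((ManagedLedgerException) t);
        }
        callback.offloadFailed(new ManagedLedgerException(exception), ctx);
    } else {
        callback.offloadComplete(result, ctx);
    }
});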
@zymap
@Jason918 Because we cannot know who caused the bad version. If the data has been written by another broker in some cases, we will also get a bad version. I added some logs in another PR to check it.
I see, we stored
synchronized (ManagedLedgerImpl.this) {
    for (LedgerInfo li : mlInfo.getLedgerInfoList()) {
        long ledgerId = li.getLedgerId();
        ledgers.put(ledgerId, li);
Is it possible that some ledgerIds need to be removed (anything not present in mlInfo.getLedgerInfoList())?
Because we trigger this operation after offloading failed, there shouldn't be any remove operation on the metadata store. If there were a remove, it should have succeeded or failed before the offload, right?
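For completeness, a sketch of how the refresh loop above could also drop entries no longer present in the store, if removals ever needed to be handled (illustrative only; per the reasoning above this should not be necessary for the offload-failure case; uses java.util.Set and java.util.stream.Collectors):

synchronized (ManagedLedgerImpl.this) {
    // ids the metadata store still knows about
    Set<Long> idsInStore = mlInfo.getLedgerInfoList().stream()
            .map(LedgerInfo::getLedgerId)
            .collect(Collectors.toSet());
    // upsert everything the store reports
    for (LedgerInfo li : mlInfo.getLedgerInfoList()) {
        ledgers.put(li.getLedgerId(), li);
    }
    // and drop anything the store no longer lists
    ledgers.keySet().removeIf(id -> !idsInStore.contains(id));
}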
Discussed with @zymap, this should be merged after #17512 has helped confirm the root cause of this issue. Move to FYI @codelipenghui
This patch is related to this problem, but from another perspective.
I am not sure anymore about this patch.
In case of BadVersion we don't know the cause of the inconsistency, and we should run through the whole recovery procedure of the ledger.
See my linked PR.
I will close this PR. Yours should be a better way to handle this case.