-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP-129] Introduce intermediate state for ledger deletion #13575
[PIP-129] Introduce intermediate state for ledger deletion #13575
Conversation
4d16622
to
92da60a
Compare
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
c28414a
to
ca0ddc6
Compare
/pulsarbot run-failure-checks |
@codelipenghui @hangc0276 Could you help review this PR? |
ca0ddc6
to
0c1ddf1
Compare
/pulsarbot run-failure-checks |
a737aae
to
24af2c1
Compare
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
|
||
/** | ||
* Mark deletable ledgers for bookkeeper and offload storage. | ||
* | ||
* @param deletableLedgerIds | ||
* @param deletableOffloadedLedgerIds | ||
*/ | ||
void markDeletableLedgers(Collection<Long> deletableLedgerIds, Collection<Long> deletableOffloadedLedgerIds); | ||
|
||
/** | ||
* Get all deletable ledgers. | ||
* | ||
* @return all the deletable ledgers of the managed-ledger | ||
*/ | ||
Set<Long> getAllDeletableLedgers(); | ||
|
||
/** | ||
* Get all deletable offloaded ledgers. | ||
* | ||
* @return all the deletable offloaded ledgers of the managed-ledger | ||
*/ | ||
Set<Long> getAllDeletableOffloadedLedgers(); | ||
|
||
/** | ||
* Check and remove all the deletable ledgers. | ||
*/ | ||
void removeAllDeletableLedgers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to expose these methods to the ManagedLedger interface? Looks they are only used in the internal of the ManagedLedgerImpl.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add tests for this feature to protect the ledger deletion logic. @wuzhanpeng
|
||
/** | ||
* Mark deletable ledgers for bookkeeper and offload storage. | ||
* | ||
* @param deletableLedgerIds | ||
* @param deletableOffloadedLedgerIds | ||
*/ | ||
void markDeletableLedgers(Collection<Long> deletableLedgerIds, Collection<Long> deletableOffloadedLedgerIds); | ||
|
||
/** | ||
* Get all deletable ledgers. | ||
* | ||
* @return all the deletable ledgers of the managed-ledger | ||
*/ | ||
Set<Long> getAllDeletableLedgers(); | ||
|
||
/** | ||
* Get all deletable offloaded ledgers. | ||
* | ||
* @return all the deletable offloaded ledgers of the managed-ledger | ||
*/ | ||
Set<Long> getAllDeletableOffloadedLedgers(); | ||
|
||
/** | ||
* Check and remove all the deletable ledgers. | ||
*/ | ||
void removeAllDeletableLedgers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
||
/** | ||
* Mark deletable ledgers for bookkeeper and offload storage. | ||
* | ||
* @param deletableLedgerIds | ||
* @param deletableOffloadedLedgerIds | ||
*/ | ||
void markDeletableLedgers(Collection<Long> deletableLedgerIds, Collection<Long> deletableOffloadedLedgerIds); | ||
|
||
/** | ||
* Get all deletable ledgers. | ||
* | ||
* @return all the deletable ledgers of the managed-ledger | ||
*/ | ||
Set<Long> getAllDeletableLedgers(); | ||
|
||
/** | ||
* Get all deletable offloaded ledgers. | ||
* | ||
* @return all the deletable offloaded ledgers of the managed-ledger | ||
*/ | ||
Set<Long> getAllDeletableOffloadedLedgers(); | ||
|
||
/** | ||
* Check and remove all the deletable ledgers. | ||
*/ | ||
void removeAllDeletableLedgers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
||
// Mark deletable offloaded ledgers | ||
Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream() | ||
.filter(ls -> ls.getOffloadContext().hasUidMsb()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd better check ls.getOffloadContext().isComplete
instead of ls.getOffloadContext().hasUidMsb()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is mainly implemented according to the original logic. See
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
Lines 2716 to 2729 in c18d645
private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { | |
if (!info.getOffloadContext().getBookkeeperDeleted()) { | |
// only delete if it hasn't been previously deleted for offload | |
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); | |
} | |
if (info.getOffloadContext().hasUidMsb()) { | |
UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); | |
cleanupOffloaded(ledgerId, uuid, | |
OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()), | |
OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), | |
"Trimming"); | |
} | |
} |
|
||
if (allFailedLedgers.isEmpty()) { | ||
log.info("[{}] ledgers: {} and offloaded ledgers: {} are deleted successfully.", | ||
name, deletableLedgers, deletableOffloadedLedgers); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deletableLedgers
and deletableOffloadedLedgers
are Set and not override toString
, So it will print the object address instead of the items.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those collection objects that inherit java.util.AbstractCollection
have the ability to return a string representation of this collection (java.util.AbstractCollection#toString
).
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Successfully delete bookkeeper ledgers: {} and offloaded ledgers: {}. " | ||
+ "Failed to delete bookkeeper ledgers: {} and offloaded ledgers: {}", name, | ||
succeedDeletedLedgers, succeedDeletedOffloadedLedgers, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same above.
@@ -2547,21 +2582,15 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) { | |||
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() { | |||
@Override | |||
public void operationComplete(Void result, Stat stat) { | |||
// perform actual deletion | |||
removeAllDeletableLedgers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will block at most 30s to wait all the ledger delete operation completed. However, this method is called by meta store callback thread, which will block the thread and lead to deadlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason for performming removeAllDeletableLedgers
synchronously here is that two locks of trimmer
and metadata
need to be released after the actual deletion. If the execution is asynchronous, the update process of the property map cannot be protected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should get the metadata lock in the deleting steps. The ledger deletion may cost a lot of time which may block other metadata operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The operationComplete will be called by bookkeeper-ml-scheduler
thread pool, If it executed by synchronously, it will block other operations. We can use another new thread pool the execute it.
For the property map problem, could we use another thread safe data structure the store it?
@@ -146,6 +151,13 @@ | |||
|
|||
protected static final int AsyncOperationTimeoutSeconds = 30; | |||
|
|||
protected static final String DELETABLE_LEDGER_MARKER_KEYWORD = "pulsar.ml.deletable.ledgers"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will append the prefix for each ledger, and store the property map into ZNode. However, if there are thousands of ledgers to delete, the property map will cost too many storage space especially into Znode, which limit by 5MB. The key point is the prefix will duplicate for each ledger. We'd better redesign the data structure of the properties storage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your question. IMO, the problem of over-size znode should theoretically be a discussion about whether ManagedLedgerInfo
should be stored on zk. Or we can think of it this way, the ledger information stored in the property map actually comes from the LedgerInfoList
structure, and this part of the data is also stored in a znode. Therefore, if the number of ledgers is large, the corresponding ml-znode must be huge. To avoid complicating matters, I think we can skip this part in this proposal, and start a new discussion thread on this problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For current implementation, we shouldn't introduce more overload for current managedLedger Znode storage. we'd better reduce the prefix of keyword or redesign the data structure for properties storage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we keep the d_ledgers
is enough? The pulsar.ml
looks useless.
@wuzhanpeng We'd better expose the ledger delete state into metrics. |
24af2c1
to
8366b5b
Compare
According to the review comments, I have made the following modifications:
@codelipenghui @hangc0276 PTAL at your convenience~ |
@@ -2547,21 +2582,15 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) { | |||
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() { | |||
@Override | |||
public void operationComplete(Void result, Stat stat) { | |||
// perform actual deletion | |||
removeAllDeletableLedgers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The operationComplete will be called by bookkeeper-ml-scheduler
thread pool, If it executed by synchronously, it will block other operations. We can use another new thread pool the execute it.
For the property map problem, could we use another thread safe data structure the store it?
@@ -146,6 +151,13 @@ | |||
|
|||
protected static final int AsyncOperationTimeoutSeconds = 30; | |||
|
|||
protected static final String DELETABLE_LEDGER_MARKER_KEYWORD = "pulsar.ml.deletable.ledgers"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For current implementation, we shouldn't introduce more overload for current managedLedger Znode storage. we'd better reduce the prefix of keyword or redesign the data structure for properties storage.
// let current ledger close | ||
ml.rollCurrentLedgerIfFull(); | ||
// let retention expire | ||
Thread.sleep(1500); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoiding to use sleep, it will introduce flaky test.
// Update metadata | ||
// Mark deletable ledgers | ||
Set<Long> deletableLedgers = Stream | ||
.concat( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to concat the ledgersToDelete
and offloadedLedgersToDelete
? Looks like they may duplicate in the deletableLedgers set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There may be ledgers in both ledgersToDelete
and offloadedLedgersToDelete
that need to be deleted. They do have the potential to have duplicate ledgers and that's why toSet
is used.
asyncDeleteLedger(ledgerId, retry, new DeleteLedgerCallback() { | ||
@Override | ||
public void deleteLedgerComplete(Object ctx) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to handle the callback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the default implementation, which is used to be compatible with the original logic.
@@ -2547,21 +2582,15 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) { | |||
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() { | |||
@Override | |||
public void operationComplete(Void result, Stat stat) { | |||
// perform actual deletion | |||
removeAllDeletableLedgers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should get the metadata lock in the deleting steps. The ledger deletion may cost a lot of time which may block other metadata operations.
@@ -146,6 +151,13 @@ | |||
|
|||
protected static final int AsyncOperationTimeoutSeconds = 30; | |||
|
|||
protected static final String DELETABLE_LEDGER_MARKER_KEYWORD = "pulsar.ml.deletable.ledgers"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we keep the d_ledgers
is enough? The pulsar.ml
looks useless.
8366b5b
to
f75891e
Compare
I have made some updates based on the comments, PTAL~ @hangc0276 @zymap |
/pulsarbot run-failure-checks |
The pr had no activity for 30 days, mark with Stale label. |
What is the stats of the change? @wuzhanpeng |
The pr had no activity for 30 days, mark with Stale label. |
Closed as stale and conflict. Please rebase and resubmit the patch if it's still relevant. |
Master Issue: #13526
Motivation
This pull request is the specific implementation plan of PIP-129
Modifications
Most changes are concentrated on
org.apache.bookkeeper.mledger.ManagedLedger
andorg.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
.Verifying this change
This change is already covered by existing tests, such as managed-ledger.
This PR is just a refactoring of the ledger deletion. The original logic semantics has not changed. I think the existing test cases of unit test and integration test are sufficient for test coverage.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
no-need-doc
Only involves the improvement and reconstruction of internal logic.