|
31 | 31 | import com.google.common.collect.Queues;
|
32 | 32 | import com.google.common.collect.Range;
|
33 | 33 | import com.google.common.collect.Sets;
|
| 34 | +import com.google.common.io.BaseEncoding; |
34 | 35 | import io.netty.buffer.ByteBuf;
|
35 | 36 | import io.netty.buffer.Unpooled;
|
36 | 37 | import io.netty.util.Recycler;
|
@@ -2542,16 +2543,11 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
|
2542 | 2543 | }
|
2543 | 2544 |
|
2544 | 2545 | invalidateReadHandle(ls.getLedgerId());
|
2545 |
| - |
2546 |
| - // Retain the offloaded ledger info until actual delete |
2547 |
| - if (!deletableOffloadedLedgers.contains(ls.getLedgerId())) { |
2548 |
| - ledgers.remove(ls.getLedgerId()); |
2549 |
| - } |
| 2546 | + ledgers.remove(ls.getLedgerId()); |
| 2547 | + entryCache.invalidateAllEntries(ls.getLedgerId()); |
2550 | 2548 |
|
2551 | 2549 | NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
|
2552 | 2550 | TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
|
2553 |
| - |
2554 |
| - entryCache.invalidateAllEntries(ls.getLedgerId()); |
2555 | 2551 | }
|
2556 | 2552 | for (LedgerInfo ls : offloadedLedgersToDelete) {
|
2557 | 2553 | LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
|
@@ -4112,7 +4108,10 @@ public void markDeletableLedgers(Collection<Long> deletableLedgerIds,
|
4112 | 4108 | }
|
4113 | 4109 | for (Long ledgerId : deletableOffloadedLedgerIds) {
|
4114 | 4110 | final String deletableOffloadedLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
|
4115 |
| - propertiesMap.put(deletableOffloadedLedgerMarker, DELETABLE_LEDGER_PLACEHOLDER); |
| 4111 | + // Offload context info is required in ledger cleanup, therefore the serialized info object |
| 4112 | + // is kept in the propertiesMap until the ledger deletion is done |
| 4113 | + final String offloadedLedgerInfo = BaseEncoding.base64().encode(ledgers.get(ledgerId).toByteArray()); |
| 4114 | + propertiesMap.put(deletableOffloadedLedgerMarker, offloadedLedgerInfo); |
4116 | 4115 | }
|
4117 | 4116 | }
|
4118 | 4117 |
|
@@ -4189,26 +4188,38 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
|
4189 | 4188 | }
|
4190 | 4189 |
|
4191 | 4190 | for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
|
4192 |
| - asyncDeleteOffloadedLedger(deletableOffloadedLedger, |
4193 |
| - ledgers.get(deletableOffloadedLedger), DEFAULT_LEDGER_DELETE_RETRIES, |
4194 |
| - new DeleteLedgerCallback() { |
4195 |
| - @Override |
4196 |
| - public void deleteLedgerComplete(Object ctx) { |
4197 |
| - ledgers.remove(deletableOffloadedLedger); |
4198 |
| - counter.countDown(); |
4199 |
| - finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
4200 |
| - succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
4201 |
| - } |
| 4191 | + final String deletableOffloadedLedgerMarker = |
| 4192 | + DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + deletableOffloadedLedger; |
4202 | 4193 |
|
4203 |
| - @Override |
4204 |
| - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { |
4205 |
| - log.warn("[{}] Failed to delete offloaded ledger:{} due to", |
4206 |
| - name, deletableOffloadedLedger, exception); |
4207 |
| - counter.countDown(); |
4208 |
| - finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
4209 |
| - failDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
4210 |
| - } |
4211 |
| - }); |
| 4194 | + try { |
| 4195 | + final LedgerInfo deletableOffloadedLedgerInfo = LedgerInfo.parseFrom( |
| 4196 | + BaseEncoding.base64().decode(propertiesMap.get(deletableOffloadedLedgerMarker))); |
| 4197 | + asyncDeleteOffloadedLedger(deletableOffloadedLedger, deletableOffloadedLedgerInfo, |
| 4198 | + DEFAULT_LEDGER_DELETE_RETRIES, |
| 4199 | + new DeleteLedgerCallback() { |
| 4200 | + @Override |
| 4201 | + public void deleteLedgerComplete(Object ctx) { |
| 4202 | + counter.countDown(); |
| 4203 | + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4204 | + succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4205 | + } |
| 4206 | + |
| 4207 | + @Override |
| 4208 | + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { |
| 4209 | + log.warn("[{}] Failed to delete offloaded ledger:{} due to", |
| 4210 | + name, deletableOffloadedLedger, exception); |
| 4211 | + counter.countDown(); |
| 4212 | + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4213 | + failDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4214 | + } |
| 4215 | + }); |
| 4216 | + } catch (Exception e) { |
| 4217 | + log.warn("[{}] Failed to retrieve offloaded ledger info of {} due to", |
| 4218 | + name, deletableOffloadedLedger, e); |
| 4219 | + counter.countDown(); |
| 4220 | + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4221 | + failDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4222 | + } |
4212 | 4223 | }
|
4213 | 4224 |
|
4214 | 4225 | try {
|
|
0 commit comments