|
31 | 31 | import com.google.common.collect.Queues;
|
32 | 32 | import com.google.common.collect.Range;
|
33 | 33 | import com.google.common.collect.Sets;
|
| 34 | +import com.google.common.io.BaseEncoding; |
34 | 35 | import io.netty.buffer.ByteBuf;
|
35 | 36 | import io.netty.buffer.Unpooled;
|
36 | 37 | import io.netty.util.Recycler;
|
@@ -2557,16 +2558,11 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
|
2557 | 2558 | }
|
2558 | 2559 |
|
2559 | 2560 | invalidateReadHandle(ls.getLedgerId());
|
2560 |
| - |
2561 |
| - // Retain the offloaded ledger info until actual delete |
2562 |
| - if (!deletableOffloadedLedgers.contains(ls.getLedgerId())) { |
2563 |
| - ledgers.remove(ls.getLedgerId()); |
2564 |
| - } |
| 2561 | + ledgers.remove(ls.getLedgerId()); |
| 2562 | + entryCache.invalidateAllEntries(ls.getLedgerId()); |
2565 | 2563 |
|
2566 | 2564 | NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
|
2567 | 2565 | TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
|
2568 |
| - |
2569 |
| - entryCache.invalidateAllEntries(ls.getLedgerId()); |
2570 | 2566 | }
|
2571 | 2567 | for (LedgerInfo ls : offloadedLedgersToDelete) {
|
2572 | 2568 | LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
|
@@ -4136,7 +4132,10 @@ public void markDeletableLedgers(Collection<Long> deletableLedgerIds,
|
4136 | 4132 | }
|
4137 | 4133 | for (Long ledgerId : deletableOffloadedLedgerIds) {
|
4138 | 4134 | final String deletableOffloadedLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
|
4139 |
| - propertiesMap.put(deletableOffloadedLedgerMarker, DELETABLE_LEDGER_PLACEHOLDER); |
| 4135 | + // Offload context info is required in ledger cleanup, therefore the serialized info object |
| 4136 | + // is kept in the propertiesMap until the ledger deletion is done |
| 4137 | + final String offloadedLedgerInfo = BaseEncoding.base64().encode(ledgers.get(ledgerId).toByteArray()); |
| 4138 | + propertiesMap.put(deletableOffloadedLedgerMarker, offloadedLedgerInfo); |
4140 | 4139 | }
|
4141 | 4140 | }
|
4142 | 4141 |
|
@@ -4213,26 +4212,38 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
|
4213 | 4212 | }
|
4214 | 4213 |
|
4215 | 4214 | for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
|
4216 |
| - asyncDeleteOffloadedLedger(deletableOffloadedLedger, |
4217 |
| - ledgers.get(deletableOffloadedLedger), DEFAULT_LEDGER_DELETE_RETRIES, |
4218 |
| - new DeleteLedgerCallback() { |
4219 |
| - @Override |
4220 |
| - public void deleteLedgerComplete(Object ctx) { |
4221 |
| - ledgers.remove(deletableOffloadedLedger); |
4222 |
| - counter.countDown(); |
4223 |
| - finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
4224 |
| - succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
4225 |
| - } |
| 4215 | + final String deletableOffloadedLedgerMarker = |
| 4216 | + DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + deletableOffloadedLedger; |
4226 | 4217 |
|
4227 |
| - @Override |
4228 |
| - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { |
4229 |
| - log.warn("[{}] Failed to delete offloaded ledger:{} due to", |
4230 |
| - name, deletableOffloadedLedger, exception); |
4231 |
| - counter.countDown(); |
4232 |
| - finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
4233 |
| - failDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
4234 |
| - } |
4235 |
| - }); |
| 4218 | + try { |
| 4219 | + final LedgerInfo deletableOffloadedLedgerInfo = LedgerInfo.parseFrom( |
| 4220 | + BaseEncoding.base64().decode(propertiesMap.get(deletableOffloadedLedgerMarker))); |
| 4221 | + asyncDeleteOffloadedLedger(deletableOffloadedLedger, deletableOffloadedLedgerInfo, |
| 4222 | + DEFAULT_LEDGER_DELETE_RETRIES, |
| 4223 | + new DeleteLedgerCallback() { |
| 4224 | + @Override |
| 4225 | + public void deleteLedgerComplete(Object ctx) { |
| 4226 | + counter.countDown(); |
| 4227 | + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4228 | + succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4229 | + } |
| 4230 | + |
| 4231 | + @Override |
| 4232 | + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { |
| 4233 | + log.warn("[{}] Failed to delete offloaded ledger:{} due to", |
| 4234 | + name, deletableOffloadedLedger, exception); |
| 4235 | + counter.countDown(); |
| 4236 | + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4237 | + failDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4238 | + } |
| 4239 | + }); |
| 4240 | + } catch (Exception e) { |
| 4241 | + log.warn("[{}] Failed to retrieve offloaded ledger info of {} due to", |
| 4242 | + name, deletableOffloadedLedger, e); |
| 4243 | + counter.countDown(); |
| 4244 | + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4245 | + failDeletedOffloadedLedgers.add(deletableOffloadedLedger); |
| 4246 | + } |
4236 | 4247 | }
|
4237 | 4248 |
|
4238 | 4249 | try {
|
|
0 commit comments