@@ -3183,10 +3183,16 @@ private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName
3183
3183
metadataMap .putAll (offloadDriverMetadata );
3184
3184
metadataMap .put ("ManagedLedgerName" , name );
3185
3185
3186
- Retries .run (Backoff .exponentialJittered (TimeUnit .SECONDS .toMillis (1 ), TimeUnit .SECONDS .toHours (1 )).limit (10 ),
3187
- Retries .NonFatalPredicate ,
3188
- () -> config .getLedgerOffloader ().deleteOffloaded (ledgerId , uuid , metadataMap ),
3189
- scheduledExecutor , name ).whenComplete ((ignored , exception ) -> {
3186
+ Retries
3187
+ // The purpose of not specifying the scheduler's key explicitly here is to avoid deadlock.
3188
+ // Otherwise when the caller of this method and the retry task are both running in the same
3189
+ // ordering thread, there will be a risk of race-condition.
3190
+ .run (
3191
+ Backoff .exponentialJittered (TimeUnit .SECONDS .toMillis (1 ), TimeUnit .SECONDS .toHours (1 )).limit (10 ),
3192
+ Retries .NonFatalPredicate ,
3193
+ () -> config .getLedgerOffloader ().deleteOffloaded (ledgerId , uuid , metadataMap ),
3194
+ scheduledExecutor )
3195
+ .whenComplete ((ignored , exception ) -> {
3190
3196
if (exception != null ) {
3191
3197
log .warn ("[{}] Error cleaning up offload for {}, (cleanup reason: {})" ,
3192
3198
name , ledgerId , cleanupReason , exception );
@@ -4134,6 +4140,7 @@ public void removeAllDeletableLedgers() {
4134
4140
final CountDownLatch counter = new CountDownLatch (deletableLedgers .size () + deletableOffloadedLedgers .size ());
4135
4141
4136
4142
Set <Long > finishedDeletedLedgers = ConcurrentHashMap .newKeySet ();
4143
+ Set <Long > finishedDeletedOffloadedLedgers = ConcurrentHashMap .newKeySet ();
4137
4144
Set <Long > timeoutDeletedLedgers = ConcurrentHashMap .newKeySet ();
4138
4145
4139
4146
Set <Long > succeedDeletedLedgers = ConcurrentHashMap .newKeySet ();
@@ -4170,7 +4177,7 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
4170
4177
public void deleteLedgerComplete (Object ctx ) {
4171
4178
ledgers .remove (deletableOffloadedLedger );
4172
4179
counter .countDown ();
4173
- finishedDeletedLedgers .add (deletableOffloadedLedger );
4180
+ finishedDeletedOffloadedLedgers .add (deletableOffloadedLedger );
4174
4181
succeedDeletedOffloadedLedgers .add (deletableOffloadedLedger );
4175
4182
}
4176
4183
@@ -4179,19 +4186,25 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
4179
4186
log .warn ("[{}] Failed to delete offloaded ledger:{} due to" ,
4180
4187
name , deletableOffloadedLedger , exception );
4181
4188
counter .countDown ();
4182
- finishedDeletedLedgers .add (deletableOffloadedLedger );
4189
+ finishedDeletedOffloadedLedgers .add (deletableOffloadedLedger );
4183
4190
failDeletedOffloadedLedgers .add (deletableOffloadedLedger );
4184
4191
}
4185
4192
});
4186
4193
}
4187
4194
4188
4195
try {
4189
4196
if (!counter .await (AsyncOperationTimeoutSeconds , TimeUnit .SECONDS )) {
4190
- for (Long ledgerId : Stream .concat (deletableLedgers .stream (), deletableOffloadedLedgers .stream ())
4191
- .collect (Collectors .toSet ())) {
4192
- if (!finishedDeletedLedgers .contains (ledgerId )) {
4193
- log .warn ("[{}] Failed to delete ledger:{} due to operation timeout" , name , ledgerId );
4194
- timeoutDeletedLedgers .add (ledgerId );
4197
+ for (Long deletableLedger : deletableLedgers ) {
4198
+ if (!finishedDeletedLedgers .contains (deletableLedger )) {
4199
+ log .warn ("[{}] Failed to delete ledger:{} due to operation timeout" , name , deletableLedger );
4200
+ timeoutDeletedLedgers .add (deletableLedger );
4201
+ }
4202
+ }
4203
+ for (Long deletableOffloadedLedger : deletableOffloadedLedgers ) {
4204
+ if (!finishedDeletedOffloadedLedgers .contains (deletableOffloadedLedger )) {
4205
+ log .warn ("[{}] Failed to delete offloaded ledger:{} due to operation timeout" ,
4206
+ name , deletableOffloadedLedger );
4207
+ timeoutDeletedLedgers .add (deletableOffloadedLedger );
4195
4208
}
4196
4209
}
4197
4210
}
0 commit comments