Skip to content

Commit cfd7e60

Browse files
authored
[fix][txn] Catch and log runtime exceptions in async operations (#19258)
1 parent 32cdf38 commit cfd7e60

File tree

3 files changed

+106
-88
lines changed

3 files changed

+106
-88
lines changed

pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java

+22
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CompletableFuture;
2727
import java.util.concurrent.CompletionException;
2828
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.Executor;
2930
import java.util.concurrent.ScheduledExecutorService;
3031
import java.util.concurrent.ScheduledFuture;
3132
import java.util.concurrent.TimeUnit;
@@ -258,4 +259,25 @@ public static CompletionException wrapToCompletionException(Throwable throwable)
258259
return new CompletionException(throwable);
259260
}
260261
}
262+
263+
/**
264+
* Executes an operation using the supplied {@link Executor}
265+
* and notifies failures on the supplied {@link CompletableFuture}.
266+
* The supplied future is only ever completed exceptionally here; on success it is left untouched.
267+
*
268+
* @param runnable the runnable to execute
269+
* @param executor the executor to use for executing the runnable
270+
* @param completableFuture the future to complete exceptionally if running the runnable fails
271+
*/
272+
273+
public static void safeRunAsync(Runnable runnable,
274+
Executor executor,
275+
CompletableFuture completableFuture) {
276+
CompletableFuture
277+
.runAsync(runnable, executor)
278+
.exceptionally((throwable) -> {
279+
completableFuture.completeExceptionally(throwable);
280+
return null;
281+
});
282+
}
261283
}

pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java

+49-50
Original file line number | Diff line number | Diff line change
@@ -115,7 +115,7 @@ public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracke
115115
+ tcID.toString() + " change state to Initializing error when init it"));
116116
} else {
117117
recoverTime.setRecoverStartTime(System.currentTimeMillis());
118-
internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
118+
FutureUtil.safeRunAsync(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
119119
@Override
120120
public void replayComplete() {
121121
recoverTracker.appendOpenTransactionToTimeoutTracker();
@@ -203,7 +203,7 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran
203203
log.error(e.getMessage(), e);
204204
}
205205
}
206-
}));
206+
}), internalPinnedExecutor, completableFuture);
207207
}
208208
return completableFuture;
209209
}
@@ -227,60 +227,59 @@ public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
227227

228228
@Override
229229
public CompletableFuture<TxnID> newTransaction(long timeOut) {
230-
if (this.maxActiveTransactionsPerCoordinator == 0
231-
|| this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) {
232-
CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
233-
internalPinnedExecutor.execute(() -> {
234-
if (!checkIfReady()) {
235-
completableFuture.completeExceptionally(new CoordinatorException
236-
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
237-
return;
238-
}
239-
240-
long mostSigBits = tcID.getId();
241-
long leastSigBits = sequenceIdGenerator.generateSequenceId();
242-
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
243-
long currentTimeMillis = System.currentTimeMillis();
244-
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
245-
.setTxnidMostBits(mostSigBits)
246-
.setTxnidLeastBits(leastSigBits)
247-
.setStartTime(currentTimeMillis)
248-
.setTimeoutMs(timeOut)
249-
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
250-
.setLastModificationTime(currentTimeMillis)
251-
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
252-
transactionLog.append(transactionMetadataEntry)
253-
.whenComplete((position, throwable) -> {
254-
if (throwable != null) {
255-
completableFuture.completeExceptionally(throwable);
256-
} else {
257-
appendLogCount.increment();
258-
TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
259-
List<Position> positions = new ArrayList<>();
260-
positions.add(position);
261-
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
262-
txnMetaMap.put(leastSigBits, pair);
263-
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
264-
createdTransactionCount.increment();
265-
completableFuture.complete(txnID);
266-
}
267-
});
268-
});
269-
return completableFuture;
270-
} else {
230+
if (this.maxActiveTransactionsPerCoordinator != 0
231+
&& this.maxActiveTransactionsPerCoordinator <= txnMetaMap.size()) {
271232
return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op "
272233
+ "reach max active txn! tcId : " + getTransactionCoordinatorID().getId()));
273234
}
235+
CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
236+
FutureUtil.safeRunAsync(() -> {
237+
if (!checkIfReady()) {
238+
completableFuture.completeExceptionally(new CoordinatorException
239+
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
240+
return;
241+
}
242+
243+
long mostSigBits = tcID.getId();
244+
long leastSigBits = sequenceIdGenerator.generateSequenceId();
245+
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
246+
long currentTimeMillis = System.currentTimeMillis();
247+
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
248+
.setTxnidMostBits(mostSigBits)
249+
.setTxnidLeastBits(leastSigBits)
250+
.setStartTime(currentTimeMillis)
251+
.setTimeoutMs(timeOut)
252+
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
253+
.setLastModificationTime(currentTimeMillis)
254+
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
255+
transactionLog.append(transactionMetadataEntry)
256+
.whenComplete((position, throwable) -> {
257+
if (throwable != null) {
258+
completableFuture.completeExceptionally(throwable);
259+
} else {
260+
appendLogCount.increment();
261+
TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
262+
List<Position> positions = new ArrayList<>();
263+
positions.add(position);
264+
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
265+
txnMetaMap.put(leastSigBits, pair);
266+
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
267+
createdTransactionCount.increment();
268+
completableFuture.complete(txnID);
269+
}
270+
});
271+
}, internalPinnedExecutor, completableFuture);
272+
return completableFuture;
274273
}
275274

276275
@Override
277276
public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
278277
CompletableFuture<Void> promise = new CompletableFuture<>();
279-
internalPinnedExecutor.execute(() -> {
278+
FutureUtil.safeRunAsync(() -> {
280279
if (!checkIfReady()) {
281280
promise
282281
.completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID,
283-
State.Ready, getState(), "add produced partition"));
282+
State.Ready, getState(), "add produced partition"));
284283
return;
285284
}
286285
getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
@@ -313,15 +312,15 @@ public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<Strin
313312
promise.completeExceptionally(ex);
314313
return null;
315314
});
316-
});
315+
}, internalPinnedExecutor, promise);
317316
return promise;
318317
}
319318

320319
@Override
321320
public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
322321
List<TransactionSubscription> txnSubscriptions) {
323322
CompletableFuture<Void> promise = new CompletableFuture<>();
324-
internalPinnedExecutor.execute(() -> {
323+
FutureUtil.safeRunAsync(() -> {
325324
if (!checkIfReady()) {
326325
promise.completeExceptionally(new CoordinatorException
327326
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition"));
@@ -357,15 +356,15 @@ public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
357356
promise.completeExceptionally(ex);
358357
return null;
359358
});
360-
});
359+
}, internalPinnedExecutor, promise);
361360
return promise;
362361
}
363362

364363
@Override
365364
public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
366365
TxnStatus expectedStatus, boolean isTimeout) {
367366
CompletableFuture<Void> promise = new CompletableFuture<>();
368-
internalPinnedExecutor.execute(() -> {
367+
FutureUtil.safeRunAsync(() -> {
369368
if (!checkIfReady()) {
370369
promise.completeExceptionally(new CoordinatorException
371370
.TransactionMetadataStoreStateException(tcID,
@@ -426,7 +425,7 @@ public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
426425
promise.completeExceptionally(ex);
427426
return null;
428427
});
429-
});
428+
}, internalPinnedExecutor, promise);
430429
return promise;
431430
}
432431

pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java

+35-38
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
import org.apache.bookkeeper.mledger.ManagedLedgerException;
4040
import org.apache.bookkeeper.mledger.Position;
4141
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
42+
import org.apache.pulsar.common.util.FutureUtil;
4243

4344
/***
4445
* See PIP-160: https://github.com/apache/pulsar/issues/15516.
@@ -214,13 +215,13 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
214215
AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
215216
return;
216217
}
217-
singleThreadExecutorForWrite.execute(() -> {
218-
try {
219-
internalAsyncAddData(data, callback, ctx);
220-
} catch (Exception e){
221-
log.warn("Execute 'internalAsyncAddData' fail", e);
222-
}
223-
});
218+
CompletableFuture
219+
.runAsync(
220+
() -> internalAsyncAddData(data, callback, ctx), singleThreadExecutorForWrite)
221+
.exceptionally(e -> {
222+
log.warn("Execute 'internalAsyncAddData' fail", e);
223+
return null;
224+
});
224225
}
225226

226227
/**
@@ -271,21 +272,21 @@ private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
271272
}
272273

273274
private void trigFlushByTimingTask(){
274-
singleThreadExecutorForWrite.execute(() -> {
275-
try {
276-
if (flushContext.asyncAddArgsList.isEmpty()) {
277-
return;
278-
}
279-
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
280-
System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
281-
doFlush();
282-
} catch (Exception e){
283-
log.error("Trig flush by timing task fail.", e);
284-
} finally {
285-
// Start the next timing task.
286-
nextTimingTrigger();
287-
}
288-
});
275+
CompletableFuture
276+
.runAsync(() -> {
277+
if (flushContext.asyncAddArgsList.isEmpty()) {
278+
return;
279+
}
280+
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
281+
System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
282+
doFlush();
283+
}, singleThreadExecutorForWrite)
284+
.whenComplete((ignore, e) -> {
285+
if (e != null) {
286+
log.warn("Execute 'trigFlushByTimingTask' fail", e);
287+
}
288+
nextTimingTrigger();
289+
});
289290
}
290291

291292
/**
@@ -379,24 +380,20 @@ public CompletableFuture<Void> close() {
379380
}
380381
CompletableFuture closeFuture = new CompletableFuture();
381382
// Cancel pending tasks and release resources.
382-
singleThreadExecutorForWrite.execute(() -> {
383-
try {
384-
// If some requests are flushed, BK will trigger these callbacks, and the remaining requests in should
385-
// fail.
386-
failureCallbackByContextAndRecycle(flushContext,
387-
new ManagedLedgerException.ManagedLedgerFencedException(
383+
FutureUtil.safeRunAsync(() -> {
384+
// If some requests are flushed, BK will trigger these callbacks, and the remaining requests in should
385+
// fail.
386+
failureCallbackByContextAndRecycle(flushContext,
387+
new ManagedLedgerException.ManagedLedgerFencedException(
388388
new Exception("Transaction log buffered write has closed")
389-
));
390-
// Cancel the timing task.
391-
if (!timeout.isCancelled()){
392-
this.timeout.cancel();
393-
}
394-
STATE_UPDATER.set(this, State.CLOSED);
395-
closeFuture.complete(null);
396-
} catch (Exception e){
397-
closeFuture.completeExceptionally(e);
389+
));
390+
// Cancel the timing task.
391+
if (!timeout.isCancelled()) {
392+
this.timeout.cancel();
398393
}
399-
});
394+
STATE_UPDATER.set(this, State.CLOSED);
395+
closeFuture.complete(null);
396+
}, singleThreadExecutorForWrite, closeFuture);
400397
return closeFuture;
401398
}
402399

0 commit comments

Comments
 (0)