Skip to content

Commit

Permalink
Log error when a queue consumer gets aborted
Browse files Browse the repository at this point in the history
To help debug the problem where queue consumer isolates can time out
unexpectedly.
  • Loading branch information
a-robinson committed Jan 31, 2025
1 parent 2e2a0cd commit acf620e
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -568,10 +568,10 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
auto result = co_await incomingRequest->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;

// Log some debug info if the request timed out.
// Log some debug info if the request timed out or was aborted.
// In particular, detect whether or not the users queue() handler function completed
// and include info about other waitUntil tasks that may have caused the request to timeout.
if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
if (!completed) {
kj::String status;
if (queueEventHolder->event.get() == nullptr) {
status = kj::str("Empty");
Expand All @@ -591,9 +591,24 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
}
}
}
auto scriptId = incomingRequest->getContext().getWorker().getScript().getId();
auto tasks = incomingRequest->getContext().getWaitUntilTasks().trace();
KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks);
auto& ioContext = incomingRequest->getContext();
auto scriptId = ioContext.getWorker().getScript().getId();
auto tasks = ioContext.getWaitUntilTasks().trace();
if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks);
} else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) {
// Attempt to grab the error message to understand the reason for the abort.
// Include a timeout just in case for some unexpected reason the onAbort promise hasn't
// already rejected.
kj::String abortError;
co_await ioContext.onAbort()
.then([] {}, [&abortError](kj::Exception&& e) {
abortError = kj::str(e);
}).exclusiveJoin(ioContext.afterLimitTimeout(1 * kj::MICROSECONDS).then([&abortError]() {
abortError = kj::str("onAbort() promise has unexpectedly not yet been rejected");
}));
KJ_LOG(WARNING, "NOSENTRY queue event aborted", abortError, scriptId, status, tasks);
}
}

co_return WorkerInterface::CustomEvent::Result{
Expand Down

0 comments on commit acf620e

Please sign in to comment.