Skip to content

Commit e3e3f10

Browse files
committed
Add comments explaining the intended flow of QueueCustomEventImpl::run
As requested on #3647. It's possible that this would benefit from being split up into multiple functions, but I don't really want to mess with the actual code any more until I'm back from my time off unless absolutely needed.
1 parent b3e64ed commit e3e3f10

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

src/workerd/api/queue.c++

+21-8
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,10 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
535535
kj::Maybe<kj::StringPtr> entrypointName,
536536
Frankenvalue props,
537537
kj::TaskSet& waitUntilTasks) {
538+
// This method has three main chunks of logic:
539+
// 1. Do all necessary setup work. This starts right below this comment.
540+
// 2. Call into the worker's queue event handler.
541+
// 3. Wait on the necessary portions of the worker's code to complete.
538542
incomingRequest->delivered();
539543
auto& context = incomingRequest->getContext();
540544

@@ -572,9 +576,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
572576
};
573577
auto queueEventHolder = kj::refcounted<QueueEventHolder>();
574578

575-
// It's a little ugly, but the usage of waitUntil (and finishScheduled) down below are here so
576-
// that users can write queue handlers in the old addEventListener("queue", ...) syntax (where we
577-
// can't just wait on their addEventListener handler to resolve because it can't be async).
579+
// 2. This is where we call into the worker's queue event handler
578580
auto runProm = context.run(
579581
[this, entrypointName = entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder),
580582
&metrics = incomingRequest->getMetrics(), outcomeObserver = kj::mv(outcomeObserver),
@@ -591,6 +593,13 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
591593
queueEvent->isServiceWorkerHandler = startResp.isServiceWorkerHandler;
592594
});
593595

596+
// 3. Now that we've (asynchronously) called into the event handler, wait on all necessary async
597+
// work to complete. This logic is split into two completely separate code paths depending on
598+
// whether the queueConsumerNoWaitForWaitUntil compatibility flag is enabled.
599+
// * In the enabled path, the queue event can be considered complete as soon as the event handler
600+
// returns and the promise that it returns (if any) has resolved.
601+
// * In the disabled path, the queue event isn't complete until all waitUntil'ed promises resolve.
602+
// This was how Queues originally worked, but made for a poor user experience.
594603
auto compatFlags = context.getWorker().getIsolate().getApi().getFeatureFlags();
595604
if (compatFlags.getQueueConsumerNoWaitForWaitUntil()) {
596605
// The user has opted in to only waiting on their event handler rather than all waitUntil'd
@@ -602,10 +611,11 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
602611
auto outcome = co_await runProm
603612
.then([queueEvent = kj::addRef(
604613
*queueEventHolder)]() mutable -> kj::Promise<EventOutcome> {
605-
// If it returned a promise, wait on the promise.
614+
// If the queue handler returned a promise, wait on the promise.
606615
KJ_IF_SOME(handlerProm, queueEvent->exportedHandlerProm) {
607616
return handlerProm.then([]() { return EventOutcome::OK; });
608617
}
618+
// If not, we can consider the invocation complete.
609619
return EventOutcome::OK;
610620
})
611621
.catch_([](kj::Exception&& e) {
@@ -633,9 +643,11 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
633643
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
634644
outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU;
635645
} else {
636-
// If we aren't going to wait on the waitUntil tasks via a call to
637-
// incomingRequest->finishScheduled(), we're responsible for calling draing() on the
638-
// incomingRequest to ensure that waitUntil tasks are run in the backgound.
646+
// We're responsible for calling drain() on the incomingRequest to ensure that waitUntil tasks
647+
// can continue to run in the backgound for a while even after we return a result to the
648+
// caller of this event. But this is only needed in this code path because in all other code
649+
// paths we call incomingRequest->finishScheduled(), which already takes care of waiting on
650+
// waitUntil tasks.
639651
waitUntilTasks.add(incomingRequest->drain().attach(
640652
kj::mv(incomingRequest), kj::addRef(*queueEventHolder), kj::addRef(*this)));
641653
}
@@ -654,7 +666,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
654666
auto result = co_await incomingRequest->finishScheduled();
655667
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
656668

657-
// Log some debug info if the request timed out or was aborted.
669+
// Log some debug info if the request timed out or was aborted, to aid in debugging situations
670+
// where consumer workers appear to get stuck and repeatedly take 15 minutes.
658671
// In particular, detect whether or not the users queue() handler function completed
659672
// and include info about other waitUntil tasks that may have caused the request to timeout.
660673
if (!completed) {

0 commit comments

Comments
 (0)