diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 5aec784f60cfb3..19065fdb7d1be5 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -604,11 +604,30 @@ void MessagePort::OnMessage() { HandleScope handle_scope(env()->isolate()); Local context = object(env()->isolate())->CreationContext(); + size_t processing_limit; + { + Mutex::ScopedLock(data_->mutex_); + processing_limit = std::max(data_->incoming_messages_.size(), + static_cast(1000)); + } + // data_ can only ever be modified by the owner thread, so no need to lock. // However, the message port may be transferred while it is processing // messages, so we need to check that this handle still owns its `data_` field // on every iteration. while (data_) { + if (processing_limit-- == 0) { + // Prevent event loop starvation by only processing those messages without + // interruption that were already present when the OnMessage() call was + // first triggered, but at least 1000 messages because otherwise the + // overhead of repeatedly triggering the uv_async_t instance becomes + // noticable, at least on Windows. + // (That might require more investigation by somebody more familiar with + // Windows.) + TriggerAsync(); + return; + } + HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(context); diff --git a/test/parallel/test-worker-message-port-close-while-receiving.js b/test/parallel/test-worker-message-port-close-while-receiving.js new file mode 100644 index 00000000000000..d6f73caff1fb66 --- /dev/null +++ b/test/parallel/test-worker-message-port-close-while-receiving.js @@ -0,0 +1,15 @@ +'use strict'; +const common = require('../common'); + +const { MessageChannel } = require('worker_threads'); + +// Make sure that closing a message port while receiving messages on it does +// not stop messages that are already in the queue from being emitted. + +const { port1, port2 } = new MessageChannel(); + +port1.on('message', common.mustCall(() => { + port1.close(); +}, 2)); +port2.postMessage('foo'); +port2.postMessage('bar'); diff --git a/test/parallel/test-worker-message-port-infinite-message-loop.js b/test/parallel/test-worker-message-port-infinite-message-loop.js new file mode 100644 index 00000000000000..640b3383ca62c3 --- /dev/null +++ b/test/parallel/test-worker-message-port-infinite-message-loop.js @@ -0,0 +1,29 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { MessageChannel } = require('worker_threads'); + +// Make sure that an infinite asynchronous .on('message')/postMessage loop +// does not lead to a stack overflow and does not starve the event loop. +// We schedule timeouts both from before the the .on('message') handler and +// inside of it, which both should run. + +const { port1, port2 } = new MessageChannel(); +let count = 0; +port1.on('message', () => { + if (count === 0) { + setTimeout(common.mustCall(() => { + port1.close(); + }), 0); + } + + port2.postMessage(0); + assert(count++ < 10000, `hit ${count} loop iterations`); +}); + +port2.postMessage(0); + +// This is part of the test -- the event loop should be available and not stall +// out due to the recursive .postMessage() calls. +setTimeout(common.mustCall(), 0);