Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workers: experimental BroadcastChannel #36271

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,98 @@ if (isMainThread) {
}
```

## Class: `BroadcastChannel extends EventTarget`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
Instances of `BroadcastChannel` allow asynchronous one-to-many communication
with all other `BroadcastChannel` instances bound to the same channel name.

```js
'use strict';

const {
isMainThread,
BroadcastChannel,
Worker
} = require('worker_threads');

const bc = new BroadcastChannel('hello');

if (isMainThread) {
let c = 0;
bc.onmessage = (event) => {
console.log(event.data);
if (++c === 10) bc.close();
};
for (let n = 0; n < 10; n++)
new Worker(__filename);
} else {
bc.postMessage('hello from every worker');
bc.close();
}
```

### `new BroadcastChannel(name)`
<!-- YAML
added: REPLACEME
-->

* `name` {any} The name of the channel to connect to. Any JavaScript value
that can be converted to a string using ``${name}`` is permitted.

### `broadcastChannel.close()`
<!-- YAML
added: REPLACEME
-->

Closes the `BroadcastChannel` connection.

### `broadcastChannel.onmessage`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason these are documented as event handlers and not event "message" etc?

Copy link
Member Author

@jasnell jasnell Dec 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because from the documentation I've been able to find on web usage, the onmessage pattern tends to be more common. We can tweak the documentation later if necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify because of the "we can tweak the docs later" comment - a LGTM from me with comments always means "This is fine to land but I have these comments/questions we should probably address at some point".

If I think something isn't ready to land I never LGTM, I typically (like in this case) check the code out, play with it, put a debugger (in test/parallel/test-worker-broadcastchannel.js here, kind of annoying in worker_threads because I do it from ndb and not vscode), add some logs and LGTM.

I typically don't leave style nits because I usually really don't care about those.

There are a few (very minor) nits here that I might follow up with a PR about later if someone ever complains about them namely stuff like onmessage not being logged when you util.inspect a BroadcastChannel here but it being logged in Chrome. I honestly think these things aren't worth mentioning in PRs most of the time.

<!-- YAML
added: REPLACEME
-->

* Type: {Function} Invoked with a single `MessageEvent` argument
when a message is received.

### `broadcastChannel.onmessageerror`
<!-- YAML
added: REPLACEME
-->

* Type: {Function} Invoked with a received message cannot be
deserialized.

### `broadcastChannel.postMessage(message)`
<!-- YAML
added: REPLACEME
-->

* `message` {any} Any cloneable JavaScript value.

### `broadcastChannel.ref()`
<!-- YAML
added: REPLACEME
-->

Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed
BroadcastChannel will *not* let the program exit if it's the only active handle
left (the default behavior). If the port is `ref()`ed, calling `ref()` again
will have no effect.

### `broadcastChannel.unref()`
<!-- YAML
added: REPLACEME
-->

Calling `unref()` on a BroadcastChannel will allow the thread to exit if this
is the only active handle in the event system. If the BroadcastChannel is
already `unref()`ed calling `unref()` again will have no effect.

## Class: `MessageChannel`
<!-- YAML
added: v10.5.0
Expand Down
87 changes: 83 additions & 4 deletions lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ const {
const {
MessagePort,
MessageChannel,
broadcastChannel,
drainMessagePort,
moveMessagePortToContext,
receiveMessageOnPort: receiveMessageOnPort_,
stopMessagePort,
checkMessagePort
checkMessagePort,
DOMException,
} = internalBinding('messaging');
const {
getEnvMessagePort
Expand All @@ -41,14 +43,20 @@ const {
} = require('internal/event_target');
const { inspect } = require('internal/util/inspect');
const {
ERR_INVALID_ARG_TYPE
} = require('internal/errors').codes;
codes: {
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
}
} = require('internal/errors');

const kData = Symbol('kData');
const kHandle = Symbol('kHandle');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kLastEventId = Symbol('kLastEventId');
const kName = Symbol('kName');
const kOrigin = Symbol('kOrigin');
const kOnMessage = Symbol('kOnMessage');
const kOnMessageError = Symbol('kOnMessageError');
const kPort = Symbol('kPort');
const kPorts = Symbol('kPorts');
const kWaitingStreams = Symbol('kWaitingStreams');
Expand Down Expand Up @@ -324,6 +332,76 @@ function receiveMessageOnPort(port) {
return { message };
}

function onMessageEvent(type, data) {
this.dispatchEvent(new MessageEvent(type, { data }));
}

class BroadcastChannel extends EventTarget {
constructor(name) {
if (arguments.length === 0)
throw new ERR_MISSING_ARGS('name');
super();
this[kName] = `${name}`;
this[kHandle] = broadcastChannel(this[kName]);
this[kOnMessage] = onMessageEvent.bind(this, 'message');
this[kOnMessageError] = onMessageEvent.bind(this, 'messageerror');
this[kHandle].on('message', this[kOnMessage]);
this[kHandle].on('messageerror', this[kOnMessageError]);
}

[inspect.custom](depth, options) {
if (depth < 0)
return 'BroadcastChannel';

const opts = {
...options,
depth: options.depth == null ? null : options.depth - 1
};

return `BroadcastChannel ${inspect({
name: this[kName],
active: this[kHandle] !== undefined,
}, opts)}`;
}

get name() { return this[kName]; }

close() {
if (this[kHandle] === undefined)
return;
this[kHandle].off('message', this[kOnMessage]);
this[kHandle].off('messageerror', this[kOnMessageError]);
this[kOnMessage] = undefined;
this[kOnMessageError] = undefined;
this[kHandle].close();
this[kHandle] = undefined;
}

postMessage(message) {
if (arguments.length === 0)
throw new ERR_MISSING_ARGS('message');
if (this[kHandle] === undefined)
throw new DOMException('BroadcastChannel is closed.');
if (this[kHandle].postMessage(message) === undefined)
throw new DOMException('Message could not be posted.');
}

ref() {
if (this[kHandle])
this[kHandle].ref();
return this;
}

unref() {
if (this[kHandle])
this[kHandle].unref();
return this;
}
}

defineEventHandler(BroadcastChannel.prototype, 'message');
defineEventHandler(BroadcastChannel.prototype, 'messageerror');

module.exports = {
drainMessagePort,
messageTypes,
Expand All @@ -339,5 +417,6 @@ module.exports = {
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio,
createWorkerStdio
createWorkerStdio,
BroadcastChannel,
};
2 changes: 2 additions & 0 deletions lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const {
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort,
BroadcastChannel,
} = require('internal/worker/io');

const {
Expand All @@ -32,4 +33,5 @@ module.exports = {
Worker,
parentPort: null,
workerData: null,
BroadcastChannel,
};
Loading