Skip to content

Commit

Permalink
worker: add experimental BroadcastChannel
Browse files Browse the repository at this point in the history
Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: nodejs#36271
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
jasnell authored and cjihrig committed Dec 8, 2020
1 parent 725f2c1 commit cc8147b
Show file tree
Hide file tree
Showing 8 changed files with 689 additions and 72 deletions.
92 changes: 92 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,98 @@ if (isMainThread) {
}
```

## Class: `BroadcastChannel extends EventTarget`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
Instances of `BroadcastChannel` allow asynchronous one-to-many communication
with all other `BroadcastChannel` instances bound to the same channel name.

```js
'use strict';

const {
isMainThread,
BroadcastChannel,
Worker
} = require('worker_threads');

const bc = new BroadcastChannel('hello');

if (isMainThread) {
let c = 0;
bc.onmessage = (event) => {
console.log(event.data);
if (++c === 10) bc.close();
};
for (let n = 0; n < 10; n++)
new Worker(__filename);
} else {
bc.postMessage('hello from every worker');
bc.close();
}
```

### `new BroadcastChannel(name)`
<!-- YAML
added: REPLACEME
-->

* `name` {any} The name of the channel to connect to. Any JavaScript value
that can be converted to a string using ``${name}`` is permitted.

### `broadcastChannel.close()`
<!-- YAML
added: REPLACEME
-->

Closes the `BroadcastChannel` connection.

### `broadcastChannel.onmessage`
<!-- YAML
added: REPLACEME
-->

* Type: {Function} Invoked with a single `MessageEvent` argument
when a message is received.

### `broadcastChannel.onmessageerror`
<!-- YAML
added: REPLACEME
-->

* Type: {Function} Invoked with a received message cannot be
deserialized.

### `broadcastChannel.postMessage(message)`
<!-- YAML
added: REPLACEME
-->

* `message` {any} Any cloneable JavaScript value.

### `broadcastChannel.ref()`
<!-- YAML
added: REPLACEME
-->

Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed
BroadcastChannel will *not* let the program exit if it's the only active handle
left (the default behavior). If the port is `ref()`ed, calling `ref()` again
will have no effect.

### `broadcastChannel.unref()`
<!-- YAML
added: REPLACEME
-->

Calling `unref()` on a BroadcastChannel will allow the thread to exit if this
is the only active handle in the event system. If the BroadcastChannel is
already `unref()`ed calling `unref()` again will have no effect.

## Class: `MessageChannel`
<!-- YAML
added: v10.5.0
Expand Down
87 changes: 83 additions & 4 deletions lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ const {
const {
MessagePort,
MessageChannel,
broadcastChannel,
drainMessagePort,
moveMessagePortToContext,
receiveMessageOnPort: receiveMessageOnPort_,
stopMessagePort,
checkMessagePort
checkMessagePort,
DOMException,
} = internalBinding('messaging');
const {
getEnvMessagePort
Expand All @@ -41,14 +43,20 @@ const {
} = require('internal/event_target');
const { inspect } = require('internal/util/inspect');
const {
ERR_INVALID_ARG_TYPE
} = require('internal/errors').codes;
codes: {
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
}
} = require('internal/errors');

const kData = Symbol('kData');
const kHandle = Symbol('kHandle');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kLastEventId = Symbol('kLastEventId');
const kName = Symbol('kName');
const kOrigin = Symbol('kOrigin');
const kOnMessage = Symbol('kOnMessage');
const kOnMessageError = Symbol('kOnMessageError');
const kPort = Symbol('kPort');
const kPorts = Symbol('kPorts');
const kWaitingStreams = Symbol('kWaitingStreams');
Expand Down Expand Up @@ -324,6 +332,76 @@ function receiveMessageOnPort(port) {
return { message };
}

function onMessageEvent(type, data) {
this.dispatchEvent(new MessageEvent(type, { data }));
}

class BroadcastChannel extends EventTarget {
constructor(name) {
if (arguments.length === 0)
throw new ERR_MISSING_ARGS('name');
super();
this[kName] = `${name}`;
this[kHandle] = broadcastChannel(this[kName]);
this[kOnMessage] = onMessageEvent.bind(this, 'message');
this[kOnMessageError] = onMessageEvent.bind(this, 'messageerror');
this[kHandle].on('message', this[kOnMessage]);
this[kHandle].on('messageerror', this[kOnMessageError]);
}

[inspect.custom](depth, options) {
if (depth < 0)
return 'BroadcastChannel';

const opts = {
...options,
depth: options.depth == null ? null : options.depth - 1
};

return `BroadcastChannel ${inspect({
name: this[kName],
active: this[kHandle] !== undefined,
}, opts)}`;
}

get name() { return this[kName]; }

close() {
if (this[kHandle] === undefined)
return;
this[kHandle].off('message', this[kOnMessage]);
this[kHandle].off('messageerror', this[kOnMessageError]);
this[kOnMessage] = undefined;
this[kOnMessageError] = undefined;
this[kHandle].close();
this[kHandle] = undefined;
}

postMessage(message) {
if (arguments.length === 0)
throw new ERR_MISSING_ARGS('message');
if (this[kHandle] === undefined)
throw new DOMException('BroadcastChannel is closed.');
if (this[kHandle].postMessage(message) === undefined)
throw new DOMException('Message could not be posted.');
}

ref() {
if (this[kHandle])
this[kHandle].ref();
return this;
}

unref() {
if (this[kHandle])
this[kHandle].unref();
return this;
}
}

defineEventHandler(BroadcastChannel.prototype, 'message');
defineEventHandler(BroadcastChannel.prototype, 'messageerror');

module.exports = {
drainMessagePort,
messageTypes,
Expand All @@ -339,5 +417,6 @@ module.exports = {
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio,
createWorkerStdio
createWorkerStdio,
BroadcastChannel,
};
2 changes: 2 additions & 0 deletions lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const {
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort,
BroadcastChannel,
} = require('internal/worker/io');

const {
Expand All @@ -32,4 +33,5 @@ module.exports = {
Worker,
parentPort: null,
workerData: null,
BroadcastChannel,
};
Loading

0 comments on commit cc8147b

Please sign in to comment.