From 986f755364aed10f2caa8cd979245a9948e3dba9 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Sat, 19 Jun 2021 15:28:49 +0200 Subject: [PATCH] feat: `MessageChannel` and `MessagePort` This commit introduces support for MessageChannel and MessagePort. MessagePorts can be transfered across a message channel. --- Cargo.lock | 1 + cli/dts/lib.deno.shared_globals.d.ts | 4 - extensions/web/02_event.js | 1 + extensions/web/13_message_port.js | 180 ++++++++++++++++++++++ extensions/web/Cargo.toml | 1 + extensions/web/internal.d.ts | 14 +- extensions/web/lib.deno_web.d.ts | 4 + extensions/web/lib.rs | 22 +++ extensions/web/message_port.rs | 219 +++++++++++++++++++++++++++ runtime/js/99_main.js | 3 + 10 files changed, 442 insertions(+), 7 deletions(-) create mode 100644 extensions/web/13_message_port.js create mode 100644 extensions/web/message_port.rs diff --git a/Cargo.lock b/Cargo.lock index b882fa43d610da..7059a69a9a2c64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -781,6 +781,7 @@ dependencies = [ "encoding_rs", "futures", "serde", + "tokio", "uuid", ] diff --git a/cli/dts/lib.deno.shared_globals.d.ts b/cli/dts/lib.deno.shared_globals.d.ts index d46a43e674db83..48883589ad2dbd 100644 --- a/cli/dts/lib.deno.shared_globals.d.ts +++ b/cli/dts/lib.deno.shared_globals.d.ts @@ -382,10 +382,6 @@ declare class ErrorEvent extends Event { constructor(type: string, eventInitDict?: ErrorEventInit); } -interface PostMessageOptions { - transfer?: any[]; -} - interface AbstractWorkerEventMap { "error": ErrorEvent; } diff --git a/extensions/web/02_event.js b/extensions/web/02_event.js index 8ee6acc61a1c1c..d0cec3273f0b32 100644 --- a/extensions/web/02_event.js +++ b/extensions/web/02_event.js @@ -1129,6 +1129,7 @@ }); this.data = eventInitDict?.data ?? null; + this.ports = eventInitDict?.ports ?? []; this.origin = eventInitDict?.origin ?? ""; this.lastEventId = eventInitDict?.lastEventId ?? ""; } diff --git a/extensions/web/13_message_port.js b/extensions/web/13_message_port.js new file mode 100644 index 00000000000000..29dbdc8f64836d --- /dev/null +++ b/extensions/web/13_message_port.js @@ -0,0 +1,180 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +// @ts-check +/// +/// +/// +/// + +"use strict"; + +((window) => { + const core = window.Deno.core; + const webidl = window.__bootstrap.webidl; + const { setEventTargetData } = window.__bootstrap.eventTarget; + + class MessageChannel { + /** @type {MessagePort} */ + #port1; + /** @type {MessagePort} */ + #port2; + + constructor() { + this[webidl.brand] = webidl.brand; + const [port1Id, port2Id] = opCreateEntangledMessagePort(); + const port1 = createMessagePort(port1Id); + const port2 = createMessagePort(port2Id); + this.#port1 = port1; + this.#port2 = port2; + } + + get port1() { + webidl.assertBranded(this, MessageChannel); + return this.#port1; + } + + get port2() { + webidl.assertBranded(this, MessageChannel); + return this.#port2; + } + + [Symbol.for("Deno.inspect")](inspect) { + return `MessageChannel ${ + inspect({ port1: this.port1, port2: this.port2 }) + }`; + } + + get [Symbol.toStringTag]() { + return "MessageChannel"; + } + } + + const _id = Symbol("id"); + + /** + * @param {number} id + * @returns {MessagePort} + */ + function createMessagePort(id) { + const port = webidl.createBranded(MessagePort); + setEventTargetData(port); + port[_id] = id; + return port; + } + + class MessagePort extends EventTarget { + /** @type {number | null} */ + [_id]; + + constructor() { + super(); + webidl.illegalConstructor(); + } + + /** + * @param {any} message + * @param {object[] | PostMessageOptions} transferOrOptions + */ + postMessage(message, transferOrOptions = {}) { + let transfer = []; + if (Array.isArray(transferOrOptions)) { + transfer = transferOrOptions; + } else if (Array.isArray(transferOrOptions.transfer)) { + transfer = transferOrOptions.transfer; + } + + const data = serializeJsMessageData(message, transfer); + core.opSync("op_message_port_post_message", this[_id], data); + } + + start() { + const self = this; + (async () => { + while (true) { + const data = await core.opAsync( + "op_message_port_recv_message", + this[_id], + ); + if (data === null) break; + const [message, transfer] = deserializeJsMessageData(data); + const event = new MessageEvent("message", { data: message, ports: transfer }); + self.dispatchEvent(event); + } + })(); + } + + close() { + if (this[_id] !== null) { + core.close(this[_id]); + } + } + } + + /** + * @returns {[number, number]} + */ + function opCreateEntangledMessagePort() { + return core.opSync("op_message_port_create_entangled"); + } + + /** + * @param {globalThis.__bootstrap.messagePort.MessageData} messageData + * @returns {[any, object[]]} + */ + function deserializeJsMessageData(messageData) { + /** @type {object[]} */ + const transferables = []; + + for (const transferable of messageData.transferables) { + switch (transferable.kind) { + case "messagePort": + const port = createMessagePort(transferable.data); + transferables.push(port); + break; + default: + throw new TypeError("Unreachable"); + } + } + + const data = core.deserialize(messageData.data); + + return [data, transferables]; + } + + /** + * @param {any} data + * @param {object[]} tranferables + * @returns {globalThis.__bootstrap.messagePort.MessageData} + */ + function serializeJsMessageData(data, tranferables) { + const serializedData = core.serialize(data); + + /** @type {globalThis.__bootstrap.messagePort.Transferable[]} */ + const serializedTransferables = []; + + for (const transferable of tranferables) { + if (transferable instanceof MessagePort) { + webidl.assertBranded(transferable, MessagePort); + const id = transferable[_id]; + if (id === null) { + throw new TypeError("Can not transfer disentangled message port"); + } + serializedTransferables.push({ kind: "messagePort", data: id }); + } else { + throw new TypeError("Value not transferable"); + } + } + + return { + data: serializedData, + transferables: serializedTransferables, + }; + } + + window.__bootstrap.messagePort = { + MessageChannel, + MessagePort, + deserializeJsMessageData, + serializeJsMessageData, + }; +})(globalThis); diff --git a/extensions/web/Cargo.toml b/extensions/web/Cargo.toml index 1d727b232f0958..2348721f353717 100644 --- a/extensions/web/Cargo.toml +++ b/extensions/web/Cargo.toml @@ -18,6 +18,7 @@ base64 = "0.13.0" deno_core = { version = "0.90.0", path = "../../core" } encoding_rs = "0.8.28" serde = "1.0" +tokio = "1.7" uuid = { version = "0.8.2", features = ["v4"] } [dev-dependencies] diff --git a/extensions/web/internal.d.ts b/extensions/web/internal.d.ts index 8ab1010771efc5..06976b28bf4633 100644 --- a/extensions/web/internal.d.ts +++ b/extensions/web/internal.d.ts @@ -4,9 +4,6 @@ /// declare namespace globalThis { - declare var TextEncoder: typeof TextEncoder; - declare var TextDecoder: typeof TextDecoder; - declare namespace __bootstrap { declare var infra: { collectSequenceOfCodepoints( @@ -85,5 +82,16 @@ declare namespace globalThis { ReadableStream: typeof ReadableStream; isReadableStreamDisturbed(stream: ReadableStream): boolean; }; + + declare namespace messagePort { + declare type Transferable = { + kind: "messagePort"; + data: number; + }; + declare interface MessageData { + data: Uint8Array; + transferables: Transferable[]; + } + } } } diff --git a/extensions/web/lib.deno_web.d.ts b/extensions/web/lib.deno_web.d.ts index 888fe9de9441f1..9eb42628a24892 100644 --- a/extensions/web/lib.deno_web.d.ts +++ b/extensions/web/lib.deno_web.d.ts @@ -648,3 +648,7 @@ interface TransformStreamDefaultControllerTransformCallback { controller: TransformStreamDefaultController, ): void | PromiseLike; } + +interface PostMessageOptions { + transfer?: any[]; +} diff --git a/extensions/web/lib.rs b/extensions/web/lib.rs index 9f783662050b7f..d74bb619d87bf6 100644 --- a/extensions/web/lib.rs +++ b/extensions/web/lib.rs @@ -1,11 +1,16 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +mod message_port; + +pub use crate::message_port::JsMessageData; + use deno_core::error::bad_resource_id; use deno_core::error::null_opbuf; use deno_core::error::range_error; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::include_js_files; +use deno_core::op_async; use deno_core::op_sync; use deno_core::url::Url; use deno_core::Extension; @@ -30,6 +35,10 @@ use std::sync::Mutex; use std::usize; use uuid::Uuid; +use crate::message_port::op_message_port_create_entangled; +use crate::message_port::op_message_port_post_message; +use crate::message_port::op_message_port_recv_message; + /// Load and execute the javascript code. pub fn init( blob_url_store: BlobUrlStore, @@ -52,6 +61,7 @@ pub fn init( "10_filereader.js", "11_blob_url.js", "12_location.js", + "13_message_port.js", )) .ops(vec![ ("op_base64_decode", op_sync(op_base64_decode)), @@ -71,6 +81,18 @@ pub fn init( "op_file_revoke_object_url", op_sync(op_file_revoke_object_url), ), + ( + "op_message_port_create_entangled", + op_sync(op_message_port_create_entangled), + ), + ( + "op_message_port_post_message", + op_sync(op_message_port_post_message), + ), + ( + "op_message_port_recv_message", + op_async(op_message_port_recv_message), + ), ]) .state(move |state| { state.put(blob_url_store.clone()); diff --git a/extensions/web/message_port.rs b/extensions/web/message_port.rs new file mode 100644 index 00000000000000..75f817c443148a --- /dev/null +++ b/extensions/web/message_port.rs @@ -0,0 +1,219 @@ +use std::borrow::Cow; +use std::cell::RefCell; +use std::rc::Rc; + +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::ZeroCopyBuf; +use deno_core::{CancelFuture, Resource}; +use deno_core::{CancelHandle, OpState}; +use deno_core::{RcRef, ResourceId}; +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::UnboundedSender; + +enum Transferable { + MessagePort(MessagePort), +} + +type MessagePortMessage = (Vec, Vec); + +pub struct MessagePort { + rx: RefCell>, + tx: UnboundedSender, + cancel: Rc, +} + +impl MessagePort { + pub fn send( + &self, + state: &mut OpState, + data: JsMessageData, + ) -> Result<(), AnyError> { + let transferables = + deserialize_js_transferables(state, data.transferables)?; + if let Err(_) = self.tx.send((data.data.to_vec(), transferables)) { + // Swallow the failed to send error. It means the channel was disentangled, + // but not cleaned up. + } + + Ok(()) + } + + pub async fn recv( + &self, + state: Rc>, + ) -> Result, AnyError> { + let mut rx = self + .rx + .try_borrow_mut() + .map_err(|_| type_error("Port receiver is already borrowed"))?; + if let Ok(Some((data, transferables))) = + rx.recv().or_cancel(&self.cancel).await + { + let js_transferables = + serialize_transferables(&mut state.borrow_mut(), transferables); + return Ok(Some(JsMessageData { + data: ZeroCopyBuf::from(data), + transferables: js_transferables, + })); + } + Ok(None) + } + + pub fn disentangle(&self) { + self.cancel.cancel() + } +} +pub fn create_entangled_message_port() -> (MessagePort, MessagePort) { + let (port1_tx, port2_rx) = unbounded_channel::(); + let (port2_tx, port1_rx) = unbounded_channel::(); + + let cancel = CancelHandle::new_rc(); + + let port1 = MessagePort { + rx: RefCell::new(port1_rx), + tx: port1_tx, + cancel: cancel.clone(), + }; + + let port2 = MessagePort { + rx: RefCell::new(port2_rx), + tx: port2_tx, + cancel, + }; + + (port1, port2) +} + +pub struct MessagePortResource { + port: MessagePort, + cancel: CancelHandle, +} + +impl Resource for MessagePortResource { + fn name(&self) -> Cow { + "messagePort".into() + } + + fn close(self: Rc) { + self.port.disentangle(); + self.cancel.cancel(); + } +} + +pub fn op_message_port_create_entangled( + state: &mut OpState, + _: (), + _: (), +) -> Result<(ResourceId, ResourceId), AnyError> { + let (port1, port2) = create_entangled_message_port(); + + let port1_id = state.resource_table.add(MessagePortResource { + port: port1, + cancel: CancelHandle::new(), + }); + + let port2_id = state.resource_table.add(MessagePortResource { + port: port2, + cancel: CancelHandle::new(), + }); + + Ok((port1_id, port2_id)) +} + +#[derive(Deserialize, Serialize)] +#[serde(tag = "kind", content = "data", rename_all = "camelCase")] +pub enum JsTransferable { + #[serde(rename_all = "camelCase")] + MessagePort(ResourceId), +} + +fn deserialize_js_transferables( + state: &mut OpState, + js_transferables: Vec, +) -> Result, AnyError> { + let mut transferables = Vec::with_capacity(js_transferables.len()); + for js_transferable in js_transferables { + match js_transferable { + JsTransferable::MessagePort(id) => { + let resource = state + .resource_table + .take::(id) + .ok_or_else(|| type_error("Invalid message port transfer"))?; + resource.cancel.cancel(); + let resource = Rc::try_unwrap(resource) + .map_err(|_| type_error("Message port is not ready for transfer"))?; + transferables.push(Transferable::MessagePort(resource.port)); + } + } + } + Ok(transferables) +} + +fn serialize_transferables( + state: &mut OpState, + transferables: Vec, +) -> Vec { + let mut js_transferables = Vec::with_capacity(transferables.len()); + for transferable in transferables { + match transferable { + Transferable::MessagePort(port) => { + let rid = state.resource_table.add(MessagePortResource { + port, + cancel: CancelHandle::new(), + }); + js_transferables.push(JsTransferable::MessagePort(rid)); + } + } + } + js_transferables +} + +#[derive(Deserialize, Serialize)] +pub struct JsMessageData { + data: ZeroCopyBuf, + transferables: Vec, +} + +pub fn op_message_port_post_message( + state: &mut OpState, + rid: ResourceId, + data: JsMessageData, +) -> Result<(), AnyError> { + for js_transferable in &data.transferables { + match js_transferable { + JsTransferable::MessagePort(id) => { + if *id == rid { + return Err(type_error("Can not transfer self message port")); + } + } + } + } + + let resource = state + .resource_table + .get::(rid) + .ok_or_else(bad_resource_id)?; + + resource.port.send(state, data) +} + +pub async fn op_message_port_recv_message( + state: Rc>, + rid: ResourceId, + _: (), +) -> Result, AnyError> { + let resource = { + let state = state.borrow(); + state + .resource_table + .get::(rid) + .ok_or_else(bad_resource_id)? + }; + let cancel = RcRef::map(resource.clone(), |r| &r.cancel); + resource.port.recv(state.clone()).or_cancel(cancel).await? +} diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 633750675c7c22..09a2ebb554a3ee 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -36,6 +36,7 @@ delete Object.prototype.__proto__; const formData = window.__bootstrap.formData; const fetch = window.__bootstrap.fetch; const prompt = window.__bootstrap.prompt; + const messagePort = window.__bootstrap.messagePort; const denoNs = window.__bootstrap.denoNs; const denoNsUnstable = window.__bootstrap.denoNsUnstable; const errors = window.__bootstrap.errors.errors; @@ -299,6 +300,8 @@ delete Object.prototype.__proto__; URLSearchParams: util.nonEnumerable(url.URLSearchParams), WebSocket: util.nonEnumerable(webSocket.WebSocket), BroadcastChannel: util.nonEnumerable(broadcastChannel.BroadcastChannel), + MessageChannel: util.nonEnumerable(messagePort.MessageChannel), + MessagePort: util.nonEnumerable(messagePort.MessagePort), Worker: util.nonEnumerable(worker.Worker), WritableStream: util.nonEnumerable(streams.WritableStream), WritableStreamDefaultWriter: util.nonEnumerable(