Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime/worker): Structured cloning worker message passing. #9323

Merged
merged 14 commits into from
May 11, 2021
2 changes: 1 addition & 1 deletion cli/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option<i32>)] = &[
&[
"run",
"--allow-read",
"cli/tests/workers_large_message_bench.ts",
"cli/tests/workers/bench_large_message.ts",
],
None,
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
// Copyright 2020 the Deno authors. All rights reserved. MIT license.

// deno-lint-ignore-file

import { deferred } from "../../test_util/std/async/deferred.ts";

function oneWorker(i: any): Promise<void> {
function oneWorker(i: number) {
return new Promise<void>((resolve) => {
let countDown = 10;
const worker = new Worker(
new URL("workers/large_message_worker.js", import.meta.url).href,
new URL("worker_large_message.js", import.meta.url).href,
{ type: "module" },
);
worker.onmessage = (e): void => {
Expand All @@ -23,8 +19,8 @@ function oneWorker(i: any): Promise<void> {
});
}

function bench(): Promise<any> {
let promises = [];
function bench() {
const promises = [];
for (let i = 0; i < 50; i++) {
promises.push(oneWorker(i));
}
Expand Down
34 changes: 19 additions & 15 deletions cli/tests/workers/racy_worker.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
// See issue for details
// https://github.com/denoland/deno/issues/4080
//
// After first call to `postMessage() this worker schedules
// [close(), postMessage()] ops on the same turn of microtask queue
// (because message is rather big).
// Only single `postMessage()` call should make it
// to host, ie. after calling `close()` no more code should be run.
// After first received message, this worker schedules
// [assert(), close(), assert()] ops on the same turn of microtask queue
// All tasks after close should not make it

setTimeout(() => {
close();
}, 50);

while (true) {
await new Promise((done) => {
onmessage = async function () {
let stage = 0;
await new Promise((_) => {
setTimeout(() => {
if (stage !== 0) throw "Unexpected stage";
stage = 1;
}, 50);
setTimeout(() => {
if (stage !== 1) throw "Unexpected stage";
stage = 2;
postMessage("DONE");
close();
}, 50);
setTimeout(() => {
postMessage({ buf: new Array(999999) });
done();
});
throw "This should not be run";
}, 50);
});
}
};
5 changes: 1 addition & 4 deletions cli/tests/workers/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,12 @@ Deno.test({
);

racyWorker.onmessage = (e): void => {
assertEquals(e.data.buf.length, 999999);
racyWorker.onmessage = (_e): void => {
throw new Error("unreachable");
};
setTimeout(() => {
promise.resolve();
}, 100);
};

racyWorker.postMessage("START");
await promise;
},
});
Expand Down
2 changes: 1 addition & 1 deletion core/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ fn deserialize(
match value {
Some(deserialized) => rv.set(deserialized),
None => {
let msg = v8::String::new(scope, "string too long").unwrap();
let msg = v8::String::new(scope, "could not deserialize value").unwrap();
let exception = v8::Exception::range_error(scope, msg);
scope.throw_exception(exception);
}
Expand Down
41 changes: 7 additions & 34 deletions runtime/js/11_workers.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,8 @@
return core.opAsync("op_host_get_message", id);
}

const encoder = new TextEncoder();
const decoder = new TextDecoder();

function encodeMessage(data) {
const dataJson = JSON.stringify(data);
return encoder.encode(dataJson);
}

function decodeMessage(dataIntArray) {
// Temporary solution until structured clone arrives in v8.
// Current clone is made by parsing json to byte array and from byte array back to json.
// In that case "undefined" transforms to empty byte array, but empty byte array does not transform back to undefined.
// Thats why this special is statement is needed.
if (dataIntArray.length == 0) {
return undefined;
}
const dataJson = decoder.decode(dataIntArray);
return JSON.parse(dataJson);
}

bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
/**
* @param {string} permission
* @return {boolean}
Expand Down Expand Up @@ -211,18 +193,7 @@
this.#poll();
}

#handleMessage = (msgData) => {
let data;
try {
data = decodeMessage(new Uint8Array(msgData));
} catch (e) {
const msgErrorEvent = new MessageEvent("messageerror", {
cancelable: false,
data,
});
return;
}

#handleMessage = (data) => {
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
Expand Down Expand Up @@ -269,7 +240,7 @@
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_host_unhandled_error",
"op_worker_unhandled_error",
inteon marked this conversation as resolved.
Show resolved Hide resolved
event.error.message,
);
}
Expand All @@ -278,7 +249,8 @@
}

if (type === "msg") {
this.#handleMessage(event.data);
const data = core.deserialize(new Uint8Array(event.data));
this.#handleMessage(data);
continue;
}

Expand All @@ -288,7 +260,7 @@
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_host_unhandled_error",
"op_worker_unhandled_error",
inteon marked this conversation as resolved.
Show resolved Hide resolved
event.error.message,
);
}
Expand Down Expand Up @@ -317,7 +289,8 @@
return;
}

hostPostMessage(this.#id, encodeMessage(message));
const bufferMsg = core.serialize(message);
hostPostMessage(this.#id, bufferMsg);
}

terminate() {
Expand Down
98 changes: 55 additions & 43 deletions runtime/js/99_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,60 +75,71 @@ delete Object.prototype.__proto__;
const onerror = () => {};

function postMessage(data) {
const dataJson = JSON.stringify(data);
const dataIntArray = encoder.encode(dataJson);
const dataIntArray = core.serialize(data);
opPostMessage(dataIntArray);
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
}

let isClosing = false;
async function workerMessageRecvCallback(data) {
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
});

try {
if (globalThis["onmessage"]) {
const result = globalThis.onmessage(msgEvent);
if (result && "then" in result) {
await result;
}
}
globalThis.dispatchEvent(msgEvent);
} catch (e) {
let handled = false;

const errorEvent = new ErrorEvent("error", {
cancelable: true,
message: e.message,
lineno: e.lineNumber ? e.lineNumber + 1 : undefined,
colno: e.columnNumber ? e.columnNumber + 1 : undefined,
filename: e.fileName,
error: null,
async function pollForMessages() {
while (!isClosing) {
const bufferMsg = await opGetMessage();
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
const data = core.deserialize(new Uint8Array(bufferMsg));
inteon marked this conversation as resolved.
Show resolved Hide resolved

const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
});

if (globalThis["onerror"]) {
const ret = globalThis.onerror(
e.message,
e.fileName,
e.lineNumber,
e.columnNumber,
e,
);
handled = ret === true;
}
try {
if (globalThis["onmessage"]) {
const result = globalThis.onmessage(msgEvent);
if (result && "then" in result) {
inteon marked this conversation as resolved.
Show resolved Hide resolved
await result;
}
}
globalThis.dispatchEvent(msgEvent);
} catch (e) {
let handled = false;

const errorEvent = new ErrorEvent("error", {
cancelable: true,
message: e.message,
lineno: e.lineNumber ? e.lineNumber + 1 : undefined,
colno: e.columnNumber ? e.columnNumber + 1 : undefined,
filename: e.fileName,
error: null,
});

if (globalThis["onerror"]) {
const ret = globalThis.onerror(
e.message,
e.fileName,
e.lineNumber,
e.columnNumber,
e,
);
handled = ret === true;
}

globalThis.dispatchEvent(errorEvent);
if (errorEvent.defaultPrevented) {
handled = true;
}
globalThis.dispatchEvent(errorEvent);
if (errorEvent.defaultPrevented) {
handled = true;
}

if (!handled) {
throw e;
if (!handled) {
core.opSync(
"op_worker_unhandled_error",
e.message,
);
}
}
}
}

function opGetMessage() {
return core.opAsync("op_worker_get_message");
}

function opPostMessage(data) {
core.opSync("op_worker_post_message", null, data);
}
Expand Down Expand Up @@ -371,7 +382,6 @@ delete Object.prototype.__proto__;
// TODO(bartlomieju): should be readonly?
close: util.nonEnumerable(workerClose),
postMessage: util.writable(postMessage),
workerMessageRecvCallback: util.nonEnumerable(workerMessageRecvCallback),
};

let hasBootstrapped = false;
Expand Down Expand Up @@ -482,6 +492,8 @@ delete Object.prototype.__proto__;
location.setLocationHref(locationHref);
registerErrors();

pollForMessages();

const internalSymbol = Symbol("Deno.internal");

const finalDenoNs = {
Expand Down
44 changes: 34 additions & 10 deletions runtime/ops/web_worker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.

use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WebWorkerInternalHandle;
use crate::web_worker::WorkerEvent;
use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
use deno_core::futures::channel::mpsc;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::Extension;
use deno_core::ZeroCopyBuf;
Expand All @@ -16,26 +17,49 @@ pub fn init() -> Extension {
op_sync(move |state, _args: (), buf: Option<ZeroCopyBuf>| {
let buf = buf.ok_or_else(null_opbuf)?;
let msg_buf: Box<[u8]> = (*buf).into();
inteon marked this conversation as resolved.
Show resolved Hide resolved
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
sender
.try_send(WorkerEvent::Message(msg_buf))
let handle = state.borrow::<WebWorkerInternalHandle>().clone();
handle
.post_event(WorkerEvent::Message(msg_buf))
.expect("Failed to post message to host");
Ok(())
}),
),
(
"op_worker_get_message",
op_async(move |state, _: (), _: ()| async move {
let temp = {
let a = state.borrow();
a.borrow::<WebWorkerInternalHandle>().clone()
};

let maybe_data = temp.get_message().await;

Ok(maybe_data.unwrap_or_default())
}),
),
// Notify host that guest worker closes.
(
"op_worker_close",
op_sync(move |state, _: (), _: ()| {
op_sync(|state, _: (), _: ()| {
// Notify parent that we're finished
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
sender.close_channel();
// Terminate execution of current worker
let handle = state.borrow::<WebWorkerHandle>();
let mut handle =
state.borrow_mut::<WebWorkerInternalHandle>().clone();

handle.terminate();
Ok(())
}),
),
// Notify host that guest worker has unhandled error.
(
"op_worker_unhandled_error",
op_sync(|state, message: String, _: ()| {
let sender = state.borrow::<WebWorkerInternalHandle>().clone();
sender
.post_event(WorkerEvent::Error(generic_error(message)))
.expect("Failed to propagate error event to parent worker");
Ok(true)
}),
),
])
.build()
}
Loading