Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime/worker): Structured cloning worker message passing. #9323

Merged
merged 14 commits into from
May 11, 2021
2 changes: 1 addition & 1 deletion cli/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option<i32>)] = &[
&[
"run",
"--allow-read",
"cli/tests/workers_large_message_bench.ts",
"cli/tests/workers/bench_large_message.ts",
],
None,
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
// Copyright 2020 the Deno authors. All rights reserved. MIT license.

// deno-lint-ignore-file

import { deferred } from "../../test_util/std/async/deferred.ts";

function oneWorker(i: any): Promise<void> {
function oneWorker(i: number) {
return new Promise<void>((resolve) => {
let countDown = 10;
const worker = new Worker(
new URL("workers/large_message_worker.js", import.meta.url).href,
new URL("worker_large_message.js", import.meta.url).href,
{ type: "module" },
);
worker.onmessage = (e): void => {
Expand All @@ -23,8 +19,8 @@ function oneWorker(i: any): Promise<void> {
});
}

function bench(): Promise<any> {
let promises = [];
function bench() {
const promises = [];
for (let i = 0; i < 50; i++) {
promises.push(oneWorker(i));
}
Expand Down
34 changes: 19 additions & 15 deletions cli/tests/workers/racy_worker.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
// See issue for details
// https://github.com/denoland/deno/issues/4080
//
// After first call to `postMessage() this worker schedules
// [close(), postMessage()] ops on the same turn of microtask queue
// (because message is rather big).
// Only single `postMessage()` call should make it
// to host, ie. after calling `close()` no more code should be run.
// After first received message, this worker schedules
// [assert(), close(), assert()] ops on the same turn of microtask queue
// All tasks after close should not make it

setTimeout(() => {
close();
}, 50);

while (true) {
await new Promise((done) => {
onmessage = async function () {
let stage = 0;
await new Promise((_) => {
setTimeout(() => {
if (stage !== 0) throw "Unexpected stage";
stage = 1;
}, 50);
setTimeout(() => {
if (stage !== 1) throw "Unexpected stage";
stage = 2;
postMessage("DONE");
close();
}, 50);
setTimeout(() => {
postMessage({ buf: new Array(999999) });
done();
});
throw "This should not be run";
}, 50);
});
}
};
5 changes: 1 addition & 4 deletions cli/tests/workers/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,12 @@ Deno.test({
);

racyWorker.onmessage = (e): void => {
assertEquals(e.data.buf.length, 999999);
racyWorker.onmessage = (_e): void => {
throw new Error("unreachable");
};
setTimeout(() => {
promise.resolve();
}, 100);
};

racyWorker.postMessage("START");
await promise;
},
});
Expand Down
2 changes: 1 addition & 1 deletion core/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ fn deserialize(
match value {
Some(deserialized) => rv.set(deserialized),
None => {
let msg = v8::String::new(scope, "string too long").unwrap();
let msg = v8::String::new(scope, "could not deserialize value").unwrap();
let exception = v8::Exception::range_error(scope, msg);
scope.throw_exception(exception);
}
Expand Down
102 changes: 31 additions & 71 deletions runtime/js/11_workers.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,8 @@
return core.opAsync("op_host_get_message", id);
}

const encoder = new TextEncoder();
const decoder = new TextDecoder();

function encodeMessage(data) {
const dataJson = JSON.stringify(data);
return encoder.encode(dataJson);
}

function decodeMessage(dataIntArray) {
// Temporary solution until structured clone arrives in v8.
// Current clone is made by parsing json to byte array and from byte array back to json.
// In that case "undefined" transforms to empty byte array, but empty byte array does not transform back to undefined.
// Thats why this special is statement is needed.
if (dataIntArray.length == 0) {
return undefined;
}
const dataJson = decoder.decode(dataIntArray);
return JSON.parse(dataJson);
}

bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
/**
* @param {string} permission
* @return {boolean}
Expand Down Expand Up @@ -211,18 +193,7 @@
this.#poll();
}

#handleMessage = (msgData) => {
let data;
try {
data = decodeMessage(new Uint8Array(msgData));
} catch (e) {
const msgErrorEvent = new MessageEvent("messageerror", {
cancelable: false,
data,
});
return;
}

#handleMessage = (data) => {
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
Expand Down Expand Up @@ -253,56 +224,44 @@

#poll = async () => {
while (!this.#terminated) {
const event = await hostGetMessage(this.#id);
const [type, data] = await hostGetMessage(this.#id);

// If terminate was called then we ignore all messages
if (this.#terminated) {
return;
}

const type = event.type;

if (type === "terminalError") {
this.#terminated = true;
if (!this.#handleError(event.error)) {
if (globalThis instanceof Window) {
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_host_unhandled_error",
event.error.message,
);
}
switch (type) {
case 0: { // Message
const msg = core.deserialize(data);
this.#handleMessage(msg);
break;
}
continue;
}

if (type === "msg") {
this.#handleMessage(event.data);
continue;
}

if (type === "error") {
if (!this.#handleError(event.error)) {
if (globalThis instanceof Window) {
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_host_unhandled_error",
event.error.message,
);
case 1: { // TerminalError
this.#terminated = true;
} /* falls through */
case 2: { // Error
if (!this.#handleError(data)) {
if (globalThis instanceof Window) {
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_worker_unhandled_error",
data.message,
);
}
}
break;
}
case 3: { // Close
log(`Host got "close" message from worker: ${this.#name}`);
this.#terminated = true;
return;
}
default: {
throw new Error(`Unknown worker event: "${type}"`);
}
continue;
}

if (type === "close") {
log(`Host got "close" message from worker: ${this.#name}`);
this.#terminated = true;
return;
}

throw new Error(`Unknown worker event: "${type}"`);
}
};

Expand All @@ -317,7 +276,8 @@
return;
}

hostPostMessage(this.#id, encodeMessage(message));
const bufferMsg = core.serialize(message);
hostPostMessage(this.#id, bufferMsg);
}

terminate() {
Expand Down
106 changes: 53 additions & 53 deletions runtime/js/99_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ delete Object.prototype.__proto__;
}

isClosing = true;
opCloseWorker();
core.opSync("op_worker_close");
}

// TODO(bartlomieju): remove these functions
Expand All @@ -75,68 +75,67 @@ delete Object.prototype.__proto__;
const onerror = () => {};

function postMessage(data) {
const dataJson = JSON.stringify(data);
const dataIntArray = encoder.encode(dataJson);
opPostMessage(dataIntArray);
const dataIntArray = core.serialize(data);
core.opSync("op_worker_post_message", null, dataIntArray);
}

let isClosing = false;
async function workerMessageRecvCallback(data) {
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
});

try {
if (globalThis["onmessage"]) {
const result = globalThis.onmessage(msgEvent);
if (result && "then" in result) {
await result;
}
}
globalThis.dispatchEvent(msgEvent);
} catch (e) {
let handled = false;

const errorEvent = new ErrorEvent("error", {
cancelable: true,
message: e.message,
lineno: e.lineNumber ? e.lineNumber + 1 : undefined,
colno: e.columnNumber ? e.columnNumber + 1 : undefined,
filename: e.fileName,
error: null,
async function pollForMessages() {
while (!isClosing) {
const bufferMsg = await core.opAsync("op_worker_get_message");
const data = core.deserialize(bufferMsg);

const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
});

if (globalThis["onerror"]) {
const ret = globalThis.onerror(
e.message,
e.fileName,
e.lineNumber,
e.columnNumber,
e,
);
handled = ret === true;
}
try {
if (globalThis["onmessage"]) {
const result = globalThis.onmessage(msgEvent);
if (result && "then" in result) {
inteon marked this conversation as resolved.
Show resolved Hide resolved
await result;
}
}
globalThis.dispatchEvent(msgEvent);
} catch (e) {
let handled = false;

const errorEvent = new ErrorEvent("error", {
cancelable: true,
message: e.message,
lineno: e.lineNumber ? e.lineNumber + 1 : undefined,
colno: e.columnNumber ? e.columnNumber + 1 : undefined,
filename: e.fileName,
error: null,
});

if (globalThis["onerror"]) {
const ret = globalThis.onerror(
e.message,
e.fileName,
e.lineNumber,
e.columnNumber,
e,
);
handled = ret === true;
}

globalThis.dispatchEvent(errorEvent);
if (errorEvent.defaultPrevented) {
handled = true;
}
globalThis.dispatchEvent(errorEvent);
if (errorEvent.defaultPrevented) {
handled = true;
}

if (!handled) {
throw e;
if (!handled) {
core.opSync(
"op_worker_unhandled_error",
e.message,
);
}
}
}
}

function opPostMessage(data) {
core.opSync("op_worker_post_message", null, data);
}

function opCloseWorker() {
core.opSync("op_worker_close");
}

function opMainModule() {
return core.opSync("op_main_module");
}
Expand Down Expand Up @@ -371,7 +370,6 @@ delete Object.prototype.__proto__;
// TODO(bartlomieju): should be readonly?
close: util.nonEnumerable(workerClose),
postMessage: util.writable(postMessage),
workerMessageRecvCallback: util.nonEnumerable(workerMessageRecvCallback),
};

let hasBootstrapped = false;
Expand Down Expand Up @@ -482,6 +480,8 @@ delete Object.prototype.__proto__;
location.setLocationHref(locationHref);
registerErrors();

pollForMessages();

const internalSymbol = Symbol("Deno.internal");

const finalDenoNs = {
Expand Down
Loading