Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime/worker): Structured cloning worker message passing. #9323

Merged
merged 14 commits into from
May 11, 2021
2 changes: 1 addition & 1 deletion cli/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option<i32>)] = &[
&[
"run",
"--allow-read",
"cli/tests/workers_large_message_bench.ts",
"cli/tests/workers/bench_large_message.ts",
],
None,
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
// Copyright 2020 the Deno authors. All rights reserved. MIT license.

// deno-lint-ignore-file

import { deferred } from "../../test_util/std/async/deferred.ts";

function oneWorker(i: any): Promise<void> {
function oneWorker(i: number) {
return new Promise<void>((resolve) => {
let countDown = 10;
const worker = new Worker(
new URL("workers/large_message_worker.js", import.meta.url).href,
new URL("worker_large_message.js", import.meta.url).href,
{ type: "module" },
);
worker.onmessage = (e): void => {
Expand All @@ -23,8 +19,8 @@ function oneWorker(i: any): Promise<void> {
});
}

function bench(): Promise<any> {
let promises = [];
function bench() {
const promises = [];
for (let i = 0; i < 50; i++) {
promises.push(oneWorker(i));
}
Expand Down
34 changes: 19 additions & 15 deletions cli/tests/workers/racy_worker.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
// See issue for details
// https://github.com/denoland/deno/issues/4080
//
// After first call to `postMessage() this worker schedules
// [close(), postMessage()] ops on the same turn of microtask queue
// (because message is rather big).
// Only single `postMessage()` call should make it
// to host, ie. after calling `close()` no more code should be run.
// After first received message, this worker schedules
// [assert(), close(), assert()] ops on the same turn of microtask queue
// All tasks after close should not make it

setTimeout(() => {
close();
}, 50);

while (true) {
await new Promise((done) => {
onmessage = async function () {
let stage = 0;
await new Promise((_) => {
setTimeout(() => {
if (stage !== 0) throw "Unexpected stage";
stage = 1;
}, 50);
setTimeout(() => {
if (stage !== 1) throw "Unexpected stage";
stage = 2;
postMessage("DONE");
close();
}, 50);
setTimeout(() => {
postMessage({ buf: new Array(999999) });
done();
});
throw "This should not be run";
}, 50);
});
}
};
40 changes: 36 additions & 4 deletions cli/tests/workers/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,12 @@ Deno.test({
);

racyWorker.onmessage = (e): void => {
assertEquals(e.data.buf.length, 999999);
racyWorker.onmessage = (_e): void => {
throw new Error("unreachable");
};
setTimeout(() => {
promise.resolve();
}, 100);
};

racyWorker.postMessage("START");
await promise;
},
});
Expand Down Expand Up @@ -726,3 +723,38 @@ Deno.test({
worker.terminate();
},
});

Deno.test({
name: "structured cloning postMessage",
fn: async function (): Promise<void> {
const result = deferred();
const worker = new Worker(
new URL("worker_structured_cloning.ts", import.meta.url).href,
{ type: "module" },
);

worker.onmessage = (e): void => {
// self field should reference itself (circular ref)
const value = e.data.self.self.self;

// fields a and b refer to the same array
assertEquals(value.a, ["a", true, 432]);
assertEquals(value.a, ["a", true, 432]);
value.b[0] = "b";
value.a[2] += 5;
assertEquals(value.a, ["b", true, 437]);
assertEquals(value.b, ["b", true, 437]);

const len = value.c.size;
value.c.add(1); // This value is already in the set.
value.c.add(2);
assertEquals(len + 1, value.c.size);

result.resolve();
};

worker.postMessage("START");
await result;
worker.terminate();
},
});
15 changes: 15 additions & 0 deletions cli/tests/workers/worker_structured_cloning.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// More info on structured cloning can be found here:
// https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm

self.onmessage = () => {
const arr = ["a", true, 432];
const set = new Set([1, 3, 5, 7, 9]);
const selfReference = {
a: arr,
b: arr,
c: set,
};
// deno-lint-ignore no-explicit-any
(selfReference as any).self = selfReference;
self.postMessage(selfReference);
};
2 changes: 1 addition & 1 deletion core/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ fn deserialize(
match value {
Some(deserialized) => rv.set(deserialized),
None => {
let msg = v8::String::new(scope, "string too long").unwrap();
let msg = v8::String::new(scope, "could not deserialize value").unwrap();
let exception = v8::Exception::range_error(scope, msg);
scope.throw_exception(exception);
}
Expand Down
102 changes: 31 additions & 71 deletions runtime/js/11_workers.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,8 @@
return core.opAsync("op_host_get_message", id);
}

const encoder = new TextEncoder();
const decoder = new TextDecoder();

function encodeMessage(data) {
const dataJson = JSON.stringify(data);
return encoder.encode(dataJson);
}

function decodeMessage(dataIntArray) {
// Temporary solution until structured clone arrives in v8.
// Current clone is made by parsing json to byte array and from byte array back to json.
// In that case "undefined" transforms to empty byte array, but empty byte array does not transform back to undefined.
// Thats why this special is statement is needed.
if (dataIntArray.length == 0) {
return undefined;
}
const dataJson = decoder.decode(dataIntArray);
return JSON.parse(dataJson);
}

bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
/**
* @param {string} permission
* @return {boolean}
Expand Down Expand Up @@ -211,18 +193,7 @@
this.#poll();
}

#handleMessage = (msgData) => {
let data;
try {
data = decodeMessage(new Uint8Array(msgData));
} catch (e) {
const msgErrorEvent = new MessageEvent("messageerror", {
cancelable: false,
data,
});
return;
}

#handleMessage = (data) => {
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
Expand Down Expand Up @@ -253,56 +224,44 @@

#poll = async () => {
while (!this.#terminated) {
const event = await hostGetMessage(this.#id);
const [type, data] = await hostGetMessage(this.#id);

// If terminate was called then we ignore all messages
if (this.#terminated) {
return;
}

const type = event.type;

if (type === "terminalError") {
this.#terminated = true;
if (!this.#handleError(event.error)) {
if (globalThis instanceof Window) {
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_host_unhandled_error",
event.error.message,
);
}
switch (type) {
case 0: { // Message
const msg = core.deserialize(data);
this.#handleMessage(msg);
break;
}
continue;
}

if (type === "msg") {
this.#handleMessage(event.data);
continue;
}

if (type === "error") {
if (!this.#handleError(event.error)) {
if (globalThis instanceof Window) {
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_host_unhandled_error",
event.error.message,
);
case 1: { // TerminalError
this.#terminated = true;
} /* falls through */
case 2: { // Error
if (!this.#handleError(data)) {
if (globalThis instanceof Window) {
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_worker_unhandled_error",
data.message,
);
}
}
break;
}
case 3: { // Close
log(`Host got "close" message from worker: ${this.#name}`);
this.#terminated = true;
return;
}
default: {
throw new Error(`Unknown worker event: "${type}"`);
}
continue;
}

if (type === "close") {
log(`Host got "close" message from worker: ${this.#name}`);
this.#terminated = true;
return;
}

throw new Error(`Unknown worker event: "${type}"`);
}
};

Expand All @@ -317,7 +276,8 @@
return;
}

hostPostMessage(this.#id, encodeMessage(message));
const bufferMsg = core.serialize(message);
hostPostMessage(this.#id, bufferMsg);
}

terminate() {
Expand Down
Loading