Skip to content

Commit

Permalink
Merge pull request #16 from joshLong145/ref/add-timeouts
Browse files Browse the repository at this point in the history
Ref/add timeouts
  • Loading branch information
joshLong145 authored Mar 21, 2024
2 parents 1951a75 + 2d83b44 commit 7649643
Show file tree
Hide file tree
Showing 10 changed files with 499 additions and 98 deletions.
125 changes: 125 additions & 0 deletions benchmark/wasm_instance_start_bench.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//@ts-nocheck

import { InstanceWrapper, WorkerDefinition } from "../src/mod.ts";
import { existsSync } from "https://deno.land/std/fs/mod.ts";
import * as path from "https://deno.land/std@0.188.0/path/mod.ts";

class TestExample extends WorkerDefinition {
public constructor(modulePath: string) {
Expand Down Expand Up @@ -80,3 +82,126 @@ Deno.bench("Wasm Worker Start Rust Module loading", async (_b) => {
example.terminateWorker();
});
});

Deno.bench("Wasm Worker Start Code Gen Bootstrapping Rust", async (_b) => {
if (!existsSync("./public")) {
Deno.mkdirSync("./public");
}

if (!existsSync("./public/bench")) {
Deno.mkdirSync("./public/bench");
}

const example: WorkerDefinition = new TestExample();

const wrapper: InstanceWrapper<TestExample> = new InstanceWrapper<
Example
>(
example,
{
outputPath: "./public/bench",
namespace: "asd",
addons: [
"./lib/wasm_test.js",
],
modulePath: "./examples/wasm/rust/wasm_test_bg.wasm",
addonLoader: (path: string) => {
return Deno.readTextFileSync(path);
},
moduleLoader: (path: string) => {
const fd = Deno.openSync(path);
const mod = Deno.readAllSync(fd);
fd.close();
return mod;
},
},
);
wrapper.create({
writeFileSync: Deno.writeFileSync,
});
const __dirname = path.dirname(path.fromFileUrl(import.meta.url));
await import(__dirname + "/../public/bench/bridge.js");
self["worker"].terminate();
});

Deno.bench("Wasm Worker Start Code Gen Bootstrapping Tiny Go", async (_b) => {
if (!existsSync("./public")) {
Deno.mkdirSync("./public");
}

if (!existsSync("./public/bench")) {
Deno.mkdirSync("./public/bench");
}

const example: WorkerDefinition = new TestExample();

const wrapper: InstanceWrapper<TestExample> = new InstanceWrapper<
Example
>(
example,
{
outputPath: "./public/bench",
namespace: "asd",
addons: [
"./lib/wasm_exec_tiny.js",
],
modulePath: "./examples/wasm/tiny-go/primes-2.wasm",
addonLoader: (path: string) => {
return Deno.readTextFileSync(path);
},
moduleLoader: (path: string) => {
const fd = Deno.openSync(path);
const mod = Deno.readAllSync(fd);
fd.close();
return mod;
},
},
);
wrapper.create({
writeFileSync: Deno.writeFileSync,
});
const __dirname = path.dirname(path.fromFileUrl(import.meta.url));
await import(__dirname + "/../public/bench/bridge.js");
self["worker"].terminate();
});

Deno.bench("Wasm Worker Start Code Gen Bootstrapping Go", async (_b) => {
if (!existsSync("./public")) {
Deno.mkdirSync("./public");
}

if (!existsSync("./public/bench")) {
Deno.mkdirSync("./public/bench");
}

const example: WorkerDefinition = new TestExample();

const wrapper: InstanceWrapper<TestExample> = new InstanceWrapper<
Example
>(
example,
{
outputPath: "./public/bench",
namespace: "asd",
addons: [
"./lib/wasm_exec.js",
],
modulePath: "./examples/wasm/tiny-go/primes-2.wasm",
addonLoader: (path: string) => {
return Deno.readTextFileSync(path);
},
moduleLoader: (path: string) => {
const fd = Deno.openSync(path);
const mod = Deno.readAllSync(fd);
fd.close();
return mod;
},
},
);
wrapper.create({
writeFileSync: Deno.writeFileSync,
});
const __dirname = path.dirname(path.fromFileUrl(import.meta.url));
await import(__dirname + "/../public/bench/bridge.js");
self["worker"].terminate();
});
40 changes: 33 additions & 7 deletions examples/js/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ class Example extends WorkerDefinition {

return buffer;
};

undefinedExecution = (
buffer: SharedArrayBuffer,
_args: Record<string, any>,
): Promise<SharedArrayBuffer> => {
let id = 0;
return new Promise<SharedArrayBuffer>((res, _rej) => {
id = setTimeout(() => {
res(buffer);
}, 10_000);
}).finally(() => {
console.log("running cleanup", id);
clearTimeout(id);
return buffer;
});
};
}

const example: Example = new Example();
Expand All @@ -81,12 +97,6 @@ await example.execute("addOne", { name: "foo" }).then(
console.log("add one result: ", new Int32Array(buf));
},
);
await example.execute("addOne", { name: "foo" }).then(
(buf: SharedArrayBuffer) => {
console.log("add one result ", new Int32Array(buf)[0]);
},
);

await example.execute("getKeyPair");

await example.execute("fib", { count: 46 }).then(
Expand All @@ -96,6 +106,22 @@ await example.execute("fib", { count: 46 }).then(
},
);
await example.execute("getGpuAdapter");
await example.execute("getGpuAdapter");

const workerPrms = example.execute("undefinedExecution");

// handle a rejection of the promise due to a timeout
workerPrms.catch((e) => {
console.error("We can still handle the error with a catch statement ", e);
});

// timeout the above execution call in 1 second
workerPrms.timeout(1_000);

// you can also use await with a try catch to manage the timeout
try {
await workerPrms;
} catch (e) {
console.error("handling the await with a try catch ", e);
}

example.terminateWorker();
7 changes: 5 additions & 2 deletions examples/wasm/rust/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ class Example extends WorkerDefinition {

test2 = (buffer: SharedArrayBuffer, _args: Record<string, any>) => {
let arr = new Int8Array(buffer);
arr[0] += 1;
//@ts-ignore
self.greet();
let val = self.getValue();
arr[0] = val;

//@ts-ignore
self.create_doc();
return arr.buffer;
};
}
Expand Down
52 changes: 43 additions & 9 deletions src/InstanceWrapper.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { WorkerPromise } from "./types.ts";
import {
DiskIOProvider,
InstanceConfiguration,
Expand Down Expand Up @@ -35,9 +36,15 @@ import { WorkerMethod, WorkerWrapper } from "./WorkerWrapper.ts";
export class WorkerDefinition {
public execMap: Record<
string,
(args: Record<string, any>) => Promise<SharedArrayBuffer>
(args: Record<string, any>) => WorkerPromise
> = {};

/** */
public _executionMap: Record<string, any> = {};

/** */
public bufferMap: Record<string, SharedArrayBuffer> = {};

/**
* worker instance, can be stopped by calling terminateWorker
*/
Expand All @@ -46,6 +53,20 @@ export class WorkerDefinition {

constructor() {}

/**
* @returns
*/
public uuidv4() {
//@ts-ignore
return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(
/[018]/g,
(c: number) =>
(crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(
16,
),
);
}

/**
* Run implemented methods from within the worker instance
*
Expand All @@ -55,12 +76,21 @@ export class WorkerDefinition {
* @returns {SharedArrayBuffer}
*/
public execute(
name: keyof this,
name: Exclude<keyof this, keyof WorkerDefinition>,
args: Record<string, any> = {},
): Promise<SharedArrayBuffer> {
): WorkerPromise {
return this.execMap[name as unknown as string](args);
}

/**
* Return the promise controlling a specific method within the worker
*/
public get(
name: keyof this,
): void | Promise<SharedArrayBuffer> {
return this.execMap[name as unknown as string] as any;
}

/**
* Calls terminate on the worker instace
*/
Expand Down Expand Up @@ -108,7 +138,7 @@ export class InstanceWrapper<T extends WorkerDefinition> {
private _config: InstanceConfiguration;
private _instance: WorkerInstance<T>;
private _wm: WorkerManager | undefined;
private _wb: WorkerBridge<T> | undefined;
private _wb: WorkerBridge | undefined;
private _workerString = "";

constructor(instance: WorkerInstance<T>, config: InstanceConfiguration) {
Expand All @@ -122,10 +152,13 @@ export class InstanceWrapper<T extends WorkerDefinition> {
* Current only supports the `onmessage` handler. but object may be accesed as the `worker` property
*/
public async start(): Promise<void> {
this?._wb?.bufferMap(this._instance);
const ww = this?._wb?.workerWrappers(this._instance) ?? [];
this?._wb?.bufferMap(this._instance as WorkerDefinition);
const ww = this?._wb?.workerWrappers(this._instance as WorkerDefinition);
if (!ww) {
throw new Error("unable to process worker definition");
}
for (const w of ww) {
(this._instance as WorkerDefinition).execMap[(w as any)._name] = w;
(this._instance as WorkerDefinition).execMap[w._name] = w;
}

this._workerString = this._workerString + "\n" +
Expand Down Expand Up @@ -190,9 +223,10 @@ export class InstanceWrapper<T extends WorkerDefinition> {
Object.getPrototypeOf(this._instance),
) as [keyof T];
const baseKeys = Object.keys(this._instance) as [keyof T];
console.log("base keys", baseKeys, "instance prototype keys", protoKeys);
const allKeys = baseKeys.concat(protoKeys).filter((key) => {
return key !== "constructor" && key !== "execMap" && key !== "worker" &&
return key !== "constructor" && key !== "execMap" &&
key !== "bufferMap" &&
key !== "_executionMap" && key !== "worker" &&
key !== "ModulePath" && key !== "workerString";
});
const wrps: WorkerWrapper[] = [];
Expand Down
48 changes: 48 additions & 0 deletions src/PromiseExtension.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { WorkerWrapper } from "./WorkerWrapper.ts";
import { WorkerDefinition } from "./mod.ts";
import { WorkerPromise, WorkerPromiseGeneratorNamed } from "./types.ts";

export function buildPromiseExtension(
id: string,
wrapper: WorkerWrapper,
generator: WorkerPromiseGeneratorNamed,
self: WorkerDefinition,
): WorkerPromise {
let promiseResolve, promiseReject;
//@ts-ignore building object
const prms: WorkerPromise = new Promise<SharedArrayBuffer>(
(resolve, reject) => {
promiseResolve = resolve;
promiseReject = reject;
},
).finally(() => {
for (let i = 0; i < prms.timerIds.length; i++) {
clearTimeout(prms.timerIds[i]);
}
prms.settledCount += 1;
});

prms.resolve = promiseResolve as any;
prms.reject = promiseReject as any;
prms.timerIds = [];
prms.settledCount = 0;
prms.name = name;
prms.wrapper = generator;
prms.timeout = (delay: number) => {
const timerId = setTimeout(() => {
self.worker.postMessage({
name: `${wrapper.WorkerName}`,
id: id,
action: "TERM",
});

prms.reject(
new Error("Timeout has occured, aborting worker execution"),
);
}, delay);

prms.timerIds.push(timerId);
};

return prms;
}
Loading

0 comments on commit 7649643

Please sign in to comment.