Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(deno/core): Move buffer_ops to deno core & merge logic with json_ops #9457

Merged
merged 7 commits into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import {

const readErrorStackPattern = new RegExp(
`^.*
at handleError \\(.*10_dispatch_buffer\\.js:.*\\)
at bufferOpParseResult \\(.*10_dispatch_buffer\\.js:.*\\)
at Array.<anonymous> \\(.*10_dispatch_buffer\\.js:.*\\).*$`,
at handleError \\(.*core\\.js:.*\\)
at binOpParseResult \\(.*core\\.js:.*\\)
at asyncHandle \\(.*core\\.js:.*\\).*$`,
"ms",
);

Expand All @@ -33,7 +33,7 @@ declare global {
}
}

unitTest(function bufferOpsHeaderTooShort(): void {
unitTest(function binOpsHeaderTooShort(): void {
for (const op of ["op_read_sync", "op_read_async"]) {
const readOpId = Deno.core.ops()[op];
const res = Deno.core.send(
Expand Down
4 changes: 2 additions & 2 deletions cli/tests/unit/dispatch_json_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ unitTest(function malformedJsonControlBuffer(): void {
assertMatch(resObj.err.message, /\bexpected value\b/);
});

unitTest(function invalidPromiseId(): void {
unitTest(function invalidRequestId(): void {
const opId = Deno.core.ops()["op_open_async"];
const reqBuf = new Uint8Array([0, 0, 0, 0, 0, 0, 0]);
const resBuf = Deno.core.send(opId, reqBuf);
Expand All @@ -28,5 +28,5 @@ unitTest(function invalidPromiseId(): void {
console.error(resText);
assertStrictEquals(resObj.ok, undefined);
assertStrictEquals(resObj.err.className, "TypeError");
assertMatch(resObj.err.message, /\bpromiseId\b/);
assertMatch(resObj.err.message, /\brequestId\b/);
});
2 changes: 1 addition & 1 deletion cli/tests/unit/unit_tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import "./console_test.ts";
import "./copy_file_test.ts";
import "./custom_event_test.ts";
import "./dir_test.ts";
import "./dispatch_buffer_test.ts";
import "./dispatch_bin_test.ts";
import "./dispatch_json_test.ts";
import "./error_stack_test.ts";
import "./event_test.ts";
Expand Down
266 changes: 197 additions & 69 deletions core/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,22 @@ SharedQueue Binary Layout
const core = window.Deno.core;
const { recv, send } = core;

////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////// Dispatch /////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

const dispatch = send;
const dispatchByName = (opName, control, ...zeroCopy) =>
dispatch(opsCache[opName], control, ...zeroCopy);

////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////// Shared array buffer ///////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

let sharedBytes;
let shared32;

let asyncHandlers;

let opsCache = {};
const errorMap = {};

function init() {
const shared = core.shared;
Expand All @@ -45,6 +54,7 @@ SharedQueue Binary Layout
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);

asyncHandlers = [];
// Callers should not call core.recv, use setAsyncHandler.
recv(handleAsyncMsgFromRust);
Expand Down Expand Up @@ -150,124 +160,240 @@ SharedQueue Binary Layout
return [opId, buf];
}

////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////// Error handling //////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

const errorMap = {};

function registerErrorClass(errorName, className, args) {
if (typeof errorMap[errorName] !== "undefined") {
throw new TypeError(`Error class for "${errorName}" already registered`);
}
errorMap[errorName] = [className, args ?? []];
}

function handleError(className, message) {
if (typeof errorMap[className] === "undefined") {
return new Error(
`Unregistered error class: "${className}"\n` +
` ${message}\n` +
` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
);
}

const [ErrorClass, args] = errorMap[className];
return new ErrorClass(message, ...args);
}

////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////// Async handling //////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

let asyncHandlers = [];

function setAsyncHandler(opId, cb) {
assert(opId != null);
asyncHandlers[opId] = cb;
}

function setAsyncHandlerByName(opName, cb) {
setAsyncHandler(opsCache[opName], cb);
}

function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
if (opIdBuf == null) {
break;
}
assert(asyncHandlers[opIdBuf[0]] != null);
asyncHandlers[opIdBuf[0]](opIdBuf[1]);
asyncHandlers[opIdBuf[0]](opIdBuf[1], true);
}

for (let i = 0; i < arguments.length; i += 2) {
asyncHandlers[arguments[i]](arguments[i + 1]);
asyncHandlers[arguments[i]](arguments[i + 1], false);
}
}

function dispatch(opName, control, ...zeroCopy) {
return send(opsCache[opName], control, ...zeroCopy);
}
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////// General sync & async ops handling ////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

function registerErrorClass(errorName, className, args) {
if (typeof errorMap[errorName] !== "undefined") {
throw new TypeError(`Error class for "${errorName}" already registered`);
let nextRequestId = 1;
const promiseTable = {};

function asyncHandle(u8Array, isCopyNeeded, opResultParser) {
const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded);
if (error !== null) {
promiseTable[requestId][1](error);
} else {
promiseTable[requestId][0](result);
}
errorMap[errorName] = [className, args ?? []];
delete promiseTable[requestId];
}

function getErrorClassAndArgs(errorName) {
return errorMap[errorName] ?? [undefined, []];
function opAsync(opName, opRequestBuilder, opResultParser) {
const opId = opsCache[opName];
// Make sure requests of this type are handled by the asyncHandler
// The asyncHandler's role is to call the "promiseTable[requestId]" function
if (typeof asyncHandlers[opId] === "undefined") {
asyncHandlers[opId] = (buffer, isCopyNeeded) =>
asyncHandle(buffer, isCopyNeeded, opResultParser);
}

const requestId = nextRequestId++;

// Create and store promise
const promise = new Promise((resolve, reject) => {
promiseTable[requestId] = [resolve, reject];
});

// Synchronously dispatch async request
core.dispatch(opId, ...opRequestBuilder(requestId));

// Wait for async response
return promise;
}

// Returns Uint8Array
function encodeJson(args) {
const s = JSON.stringify(args);
return core.encode(s);
function opSync(opName, opRequestBuilder, opResultParser) {
const opId = opsCache[opName];
const u8Array = core.dispatch(opId, ...opRequestBuilder());

const [_, result, error] = opResultParser(u8Array, false);
if (error !== null) throw error;
return result;
}

function decodeJson(ui8) {
const s = core.decode(ui8);
return JSON.parse(s);
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////// Bin ops handling /////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

const binRequestHeaderByteLength = 8 + 4;
const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength);
const scratchView = new DataView(scratchBuffer);

function binOpBuildRequest(requestId, argument, zeroCopy) {
scratchView.setBigUint64(0, BigInt(requestId), true);
scratchView.setUint32(8, argument, true);
return [scratchView, ...zeroCopy];
}

let nextPromiseId = 1;
const promiseTable = {};
function binOpParseResult(u8Array, isCopyNeeded) {
// Decode header value from u8Array
const headerByteLength = 8 + 2 * 4;
assert(u8Array.byteLength >= headerByteLength);
assert(u8Array.byteLength % 4 == 0);
const view = new DataView(
u8Array.buffer,
u8Array.byteOffset + u8Array.byteLength - headerByteLength,
headerByteLength,
);

const requestId = Number(view.getBigUint64(0, true));
const status = view.getUint32(8, true);
const result = view.getUint32(12, true);

// Error handling
if (status !== 0) {
const className = core.decode(u8Array.subarray(0, result));
const message = core.decode(u8Array.subarray(result, -headerByteLength))
.trim();

return [requestId, null, handleError(className, message)];
}

function processResponse(res) {
if ("ok" in res) {
return res.ok;
if (u8Array.byteLength === headerByteLength) {
return [requestId, result, null];
}
const [ErrorClass, args] = getErrorClassAndArgs(res.err.className);
if (!ErrorClass) {
throw new Error(
`Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
);

// Rest of response buffer is passed as reference or as a copy
let respBuffer = null;
if (isCopyNeeded) {
// Copy part of the response array (if sent through shared array buf)
respBuffer = u8Array.slice(0, result);
} else {
// Create view on existing array (if sent through overflow)
respBuffer = u8Array.subarray(0, result);
}
throw new ErrorClass(res.err.message, ...args);

return [requestId, respBuffer, null];
}

async function jsonOpAsync(opName, args = null, ...zeroCopy) {
setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);

const promiseId = nextPromiseId++;
const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args));
new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId));
dispatch(opName, reqBuf, ...zeroCopy);
let resolve, reject;
const promise = new Promise((resolve_, reject_) => {
resolve = resolve_;
reject = reject_;
});
promise.resolve = resolve;
promise.reject = reject;
promiseTable[promiseId] = promise;
return processResponse(await promise);
function binOpAsync(opName, argument = 0, ...zeroCopy) {
return opAsync(
opName,
(requestId) => binOpBuildRequest(requestId, argument, zeroCopy),
binOpParseResult,
);
}

function binOpSync(opName, argument = 0, ...zeroCopy) {
return opSync(
opName,
() => binOpBuildRequest(0, argument, zeroCopy),
binOpParseResult,
);
}

function jsonOpSync(opName, args = null, ...zeroCopy) {
const argsBuf = encodeJson(args);
const res = dispatch(opName, argsBuf, ...zeroCopy);
return processResponse(decodeJson(res));
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////// Json ops handling ////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

const jsonRequestHeaderLength = 8;

function jsonOpBuildRequest(requestId, argument, zeroCopy) {
const u8Array = core.encode(
"\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument),
);
new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true);
return [u8Array, ...zeroCopy];
}

function jsonOpAsyncHandler(buf) {
// Json Op.
const res = decodeJson(buf);
const promise = promiseTable[res.promiseId];
delete promiseTable[res.promiseId];
promise.resolve(res);
function jsonOpParseResult(u8Array, _) {
const data = JSON.parse(core.decode(u8Array));

if ("err" in data) {
return [
data.requestId,
null,
handleError(data.err.className, data.err.message),
];
}

return [data.requestId, data.ok, null];
}

function jsonOpAsync(opName, argument = null, ...zeroCopy) {
return opAsync(
opName,
(requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy),
jsonOpParseResult,
);
}

function jsonOpSync(opName, argument = null, ...zeroCopy) {
return opSync(
opName,
() => [core.encode(JSON.stringify(argument)), ...zeroCopy],
jsonOpParseResult,
);
}

function resources() {
return jsonOpSync("op_resources");
}

function close(rid) {
jsonOpSync("op_close", { rid });
return jsonOpSync("op_close", { rid });
}

Object.assign(window.Deno.core, {
jsonOpAsync,
jsonOpSync,
setAsyncHandler,
setAsyncHandlerByName,
dispatch: send,
dispatchByName: dispatch,
binOpAsync,
binOpSync,
dispatch,
dispatchByName,
ops,
close,
resources,
registerErrorClass,
getErrorClassAndArgs,
sharedQueueInit: init,
// sharedQueue is private but exposed for testing.
sharedQueue: {
Expand All @@ -279,5 +405,7 @@ SharedQueue Binary Layout
reset,
shift,
},
// setAsyncHandler is private but exposed for testing.
setAsyncHandler,
});
})(this);
Loading