diff --git a/cli/tests/unit/dispatch_buffer_test.ts b/cli/tests/unit/dispatch_buffer_test.ts new file mode 100644 index 00000000000000..0e213fe3b66fbf --- /dev/null +++ b/cli/tests/unit/dispatch_buffer_test.ts @@ -0,0 +1,76 @@ +import { + assert, + assertEquals, + assertMatch, + unitTest, + unreachable, +} from "./test_util.ts"; + +const readErrorStackPattern = new RegExp( + `^.* + at handleError \\(.*10_dispatch_buffer\\.js:.*\\) + at bufferOpParseResult \\(.*10_dispatch_buffer\\.js:.*\\) + at Array. \\(.*10_dispatch_buffer\\.js:.*\\).*$`, + "ms", +); + +unitTest(async function sendAsyncStackTrace(): Promise { + const buf = new Uint8Array(10); + const rid = 10; + try { + await Deno.read(rid, buf); + unreachable(); + } catch (error) { + assertMatch(error.stack, readErrorStackPattern); + } +}); + +declare global { + // deno-lint-ignore no-namespace + namespace Deno { + // deno-lint-ignore no-explicit-any + var core: any; // eslint-disable-line no-var + } +} + +unitTest(function bufferOpsHeaderTooShort(): void { + for (const op of ["op_read_sync", "op_read_async"]) { + const readOpId = Deno.core.ops()[op]; + const res = Deno.core.send( + readOpId, + new Uint8Array([ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + ]), + ); + + const headerByteLength = 4 * 4; + assert(res.byteLength > headerByteLength); + const view = new DataView( + res.buffer, + res.byteOffset + res.byteLength - headerByteLength, + headerByteLength, + ); + + const requestId = Number(view.getBigUint64(0, true)); + const status = view.getUint32(8, true); + const result = view.getUint32(12, true); + + assert(requestId === 0); + assert(status !== 0); + assertEquals(new TextDecoder().decode(res.slice(0, result)), "TypeError"); + assertEquals( + new TextDecoder().decode(res.slice(result, -headerByteLength)).trim(), + "Unparsable control buffer", + ); + } +}); diff --git a/cli/tests/unit/dispatch_minimal_test.ts b/cli/tests/unit/dispatch_minimal_test.ts deleted file mode 100644 index 234ba6a1ce3661..00000000000000 --- a/cli/tests/unit/dispatch_minimal_test.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { - assert, - assertEquals, - assertMatch, - unitTest, - unreachable, -} from "./test_util.ts"; - -const readErrorStackPattern = new RegExp( - `^.* - at unwrapResponse \\(.*dispatch_minimal\\.js:.*\\) - at sendAsync \\(.*dispatch_minimal\\.js:.*\\) - at async Object\\.read \\(.*io\\.js:.*\\).*$`, - "ms", -); - -unitTest(async function sendAsyncStackTrace(): Promise { - const buf = new Uint8Array(10); - const rid = 10; - try { - await Deno.read(rid, buf); - unreachable(); - } catch (error) { - assertMatch(error.stack, readErrorStackPattern); - } -}); - -declare global { - // deno-lint-ignore no-namespace - namespace Deno { - // deno-lint-ignore no-explicit-any - var core: any; // eslint-disable-line no-var - } -} - -unitTest(function malformedMinimalControlBuffer(): void { - const readOpId = Deno.core.ops()["op_read"]; - const res = Deno.core.send(readOpId, new Uint8Array([1, 2, 3, 4, 5])); - const header = res.slice(0, 12); - const buf32 = new Int32Array( - header.buffer, - header.byteOffset, - header.byteLength / 4, - ); - const arg = buf32[1]; - const codeAndMessage = new TextDecoder().decode(res.slice(12)).trim(); - assert(arg < 0); - assertEquals(codeAndMessage, "TypeErrorUnparsable control buffer"); -}); diff --git a/cli/tests/unit/metrics_test.ts b/cli/tests/unit/metrics_test.ts index 2f12ac90d190aa..525e5aae654948 100644 --- a/cli/tests/unit/metrics_test.ts +++ b/cli/tests/unit/metrics_test.ts @@ -13,7 +13,7 @@ unitTest(async function metrics(): Promise { assert(m1.bytesSentControl > 0); assert(m1.bytesSentData >= 0); assert(m1.bytesReceived > 0); - const m1OpWrite = m1.ops["op_write"]; + const m1OpWrite = m1.ops["op_write_async"]; assert(m1OpWrite.opsDispatchedAsync > 0); assert(m1OpWrite.opsCompletedAsync > 0); assert(m1OpWrite.bytesSentControl > 0); @@ -28,7 +28,7 @@ unitTest(async function metrics(): Promise { assert(m2.bytesSentControl > m1.bytesSentControl); assert(m2.bytesSentData >= m1.bytesSentData + dataMsg.byteLength); assert(m2.bytesReceived > m1.bytesReceived); - const m2OpWrite = m2.ops["op_write"]; + const m2OpWrite = m2.ops["op_write_async"]; assert(m2OpWrite.opsDispatchedAsync > m1OpWrite.opsDispatchedAsync); assert(m2OpWrite.opsCompletedAsync > m1OpWrite.opsCompletedAsync); assert(m2OpWrite.bytesSentControl > m1OpWrite.bytesSentControl); diff --git a/cli/tests/unit/unit_tests.ts b/cli/tests/unit/unit_tests.ts index 2664a9ab0e3d03..6277abdfe171a9 100644 --- a/cli/tests/unit/unit_tests.ts +++ b/cli/tests/unit/unit_tests.ts @@ -15,7 +15,7 @@ import "./console_test.ts"; import "./copy_file_test.ts"; import "./custom_event_test.ts"; import "./dir_test.ts"; -import "./dispatch_minimal_test.ts"; +import "./dispatch_buffer_test.ts"; import "./dispatch_json_test.ts"; import "./error_stack_test.ts"; import "./event_test.ts"; diff --git a/core/core.js b/core/core.js index fead239075dccd..f44bf253ef282b 100644 --- a/core/core.js +++ b/core/core.js @@ -155,6 +155,10 @@ SharedQueue Binary Layout asyncHandlers[opId] = cb; } + function setAsyncHandlerByName(opName, cb) { + setAsyncHandler(opsCache[opName], cb); + } + function handleAsyncMsgFromRust() { while (true) { const opIdBuf = shift(); @@ -256,6 +260,7 @@ SharedQueue Binary Layout jsonOpAsync, jsonOpSync, setAsyncHandler, + setAsyncHandlerByName, dispatch: send, dispatchByName: dispatch, ops, diff --git a/runtime/js/10_dispatch_buffer.js b/runtime/js/10_dispatch_buffer.js new file mode 100644 index 00000000000000..091fce504a297f --- /dev/null +++ b/runtime/js/10_dispatch_buffer.js @@ -0,0 +1,150 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const core = window.Deno.core; + + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////// General async handling ////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + // General Async response handling + let nextRequestId = 1; + const promiseTable = {}; + + function opAsync(opName, opRequestBuilder, opResultParser) { + // Make sure requests of this type are handled by the asyncHandler + // The asyncHandler's role is to call the "promiseTable[requestId]" function + core.setAsyncHandlerByName(opName, (bufUi8, _) => { + const [requestId, result, error] = opResultParser(bufUi8, true); + if (error !== null) { + promiseTable[requestId][1](error); + } else { + promiseTable[requestId][0](result); + } + delete promiseTable[requestId]; + }); + + const requestId = nextRequestId++; + + // Create and store promise + const promise = new Promise((resolve, reject) => { + promiseTable[requestId] = [resolve, reject]; + }); + + // Synchronously dispatch async request + core.dispatchByName(opName, ...opRequestBuilder(requestId)); + + // Wait for async response + return promise; + } + + function opSync(opName, opRequestBuilder, opResultParser) { + const rawResult = core.dispatchByName(opName, ...opRequestBuilder()); + + const [_, result, error] = opResultParser(rawResult, false); + if (error !== null) throw error; + return result; + } + + //////////////////////////////////////////////////////////////////////////////////////////// + /////////////////////////////////// Error handling ///////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + function handleError(className, message) { + const [ErrorClass, args] = core.getErrorClassAndArgs(className); + if (!ErrorClass) { + return new Error( + `Unregistered error class: "${className}"\n` + + ` ${message}\n` + + ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, + ); + } + return new ErrorClass(message, ...args); + } + + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////// Buffer ops handling ////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const scratchBytes = new ArrayBuffer(3 * 4); + const scratchView = new DataView( + scratchBytes, + scratchBytes.byteOffset, + scratchBytes.byteLength, + ); + + function bufferOpBuildRequest(requestId, argument, zeroCopy) { + scratchView.setBigUint64(0, BigInt(requestId), true); + scratchView.setUint32(8, argument, true); + return [scratchView, ...zeroCopy]; + } + + function bufferOpParseResult(bufUi8, isCopyNeeded) { + // Decode header value from ui8 buffer + const headerByteLength = 4 * 4; + assert(bufUi8.byteLength >= headerByteLength); + assert(bufUi8.byteLength % 4 == 0); + const view = new DataView( + bufUi8.buffer, + bufUi8.byteOffset + bufUi8.byteLength - headerByteLength, + headerByteLength, + ); + + const requestId = Number(view.getBigUint64(0, true)); + const status = view.getUint32(8, true); + const result = view.getUint32(12, true); + + // Error handling + if (status !== 0) { + const className = core.decode(bufUi8.subarray(0, result)); + const message = core.decode(bufUi8.subarray(result, -headerByteLength)) + .trim(); + + return [requestId, null, handleError(className, message)]; + } + + if (bufUi8.byteLength === headerByteLength) { + return [requestId, result, null]; + } + + // Rest of response buffer is passed as reference or as a copy + let respBuffer = null; + if (isCopyNeeded) { + // Copy part of the response array (if sent through shared array buf) + respBuffer = bufUi8.slice(0, result); + } else { + // Create view on existing array (if sent through overflow) + respBuffer = bufUi8.subarray(0, result); + } + + return [requestId, respBuffer, null]; + } + + function bufferOpAsync(opName, argument = 0, ...zeroCopy) { + return opAsync( + opName, + (requestId) => bufferOpBuildRequest(requestId, argument, zeroCopy), + bufferOpParseResult, + ); + } + + function bufferOpSync(opName, argument = 0, ...zeroCopy) { + return opSync( + opName, + () => bufferOpBuildRequest(0, argument, zeroCopy), + bufferOpParseResult, + ); + } + + window.__bootstrap.dispatchBuffer = { + bufferOpSync, + bufferOpAsync, + }; +})(this); diff --git a/runtime/js/10_dispatch_minimal.js b/runtime/js/10_dispatch_minimal.js deleted file mode 100644 index e74f8c3934ff19..00000000000000 --- a/runtime/js/10_dispatch_minimal.js +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -"use strict"; - -((window) => { - const core = window.Deno.core; - const util = window.__bootstrap.util; - - // Using an object without a prototype because `Map` was causing GC problems. - const promiseTableMin = Object.create(null); - - const decoder = new TextDecoder(); - - // Note it's important that promiseId starts at 1 instead of 0, because sync - // messages are indicated with promiseId 0. If we ever add wrap around logic for - // overflows, this should be taken into account. - let _nextPromiseId = 1; - - function nextPromiseId() { - return _nextPromiseId++; - } - - function recordFromBufMinimal(ui8) { - const headerLen = 12; - const header = ui8.subarray(0, headerLen); - const buf32 = new Int32Array( - header.buffer, - header.byteOffset, - header.byteLength / 4, - ); - const promiseId = buf32[0]; - const arg = buf32[1]; - const result = buf32[2]; - let err; - - if (arg < 0) { - err = { - className: decoder.decode(ui8.subarray(headerLen, headerLen + result)), - message: decoder.decode(ui8.subarray(headerLen + result)), - }; - } else if (ui8.length != 12) { - throw new TypeError("Malformed response message"); - } - - return { - promiseId, - arg, - result, - err, - }; - } - - function unwrapResponse(res) { - if (res.err != null) { - const [ErrorClass, args] = core.getErrorClassAndArgs(res.err.className); - if (!ErrorClass) { - throw new Error( - `Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, - ); - } - throw new ErrorClass(res.err.message, ...args); - } - return res.result; - } - - const scratch32 = new Int32Array(3); - const scratchBytes = new Uint8Array( - scratch32.buffer, - scratch32.byteOffset, - scratch32.byteLength, - ); - util.assert(scratchBytes.byteLength === scratch32.length * 4); - - function asyncMsgFromRust(ui8) { - const record = recordFromBufMinimal(ui8); - const { promiseId } = record; - const promise = promiseTableMin[promiseId]; - delete promiseTableMin[promiseId]; - util.assert(promise); - promise.resolve(record); - } - - async function sendAsync(opName, arg, zeroCopy) { - const promiseId = nextPromiseId(); // AKA cmdId - scratch32[0] = promiseId; - scratch32[1] = arg; - scratch32[2] = 0; // result - const promise = util.createResolvable(); - const buf = core.dispatchByName(opName, scratchBytes, zeroCopy); - if (buf != null) { - const record = recordFromBufMinimal(buf); - // Sync result. - promise.resolve(record); - } else { - // Async result. - promiseTableMin[promiseId] = promise; - } - - const res = await promise; - return unwrapResponse(res); - } - - function sendSync(opName, arg, zeroCopy) { - scratch32[0] = 0; // promiseId 0 indicates sync - scratch32[1] = arg; - const res = core.dispatchByName(opName, scratchBytes, zeroCopy); - const resRecord = recordFromBufMinimal(res); - return unwrapResponse(resRecord); - } - - window.__bootstrap.dispatchMinimal = { - asyncMsgFromRust, - sendSync, - sendAsync, - }; -})(this); diff --git a/runtime/js/11_timers.js b/runtime/js/11_timers.js index 4c693aa4a46327..f07622388548b4 100644 --- a/runtime/js/11_timers.js +++ b/runtime/js/11_timers.js @@ -4,7 +4,7 @@ ((window) => { const assert = window.__bootstrap.util.assert; const core = window.Deno.core; - const { sendSync } = window.__bootstrap.dispatchMinimal; + const { bufferOpSync } = window.__bootstrap.dispatchBuffer; function opStopGlobalTimer() { core.jsonOpSync("op_global_timer_stop"); @@ -20,7 +20,7 @@ const nowBytes = new Uint8Array(8); function opNow() { - sendSync("op_now", 0, nowBytes); + bufferOpSync("op_now", 0, nowBytes); return new DataView(nowBytes.buffer).getFloat64(); } diff --git a/runtime/js/12_io.js b/runtime/js/12_io.js index 3818069c111510..09e87f990ffbb8 100644 --- a/runtime/js/12_io.js +++ b/runtime/js/12_io.js @@ -7,7 +7,7 @@ ((window) => { const DEFAULT_BUFFER_SIZE = 32 * 1024; - const { sendSync, sendAsync } = window.__bootstrap.dispatchMinimal; + const { bufferOpSync, bufferOpAsync } = window.__bootstrap.dispatchBuffer; // Seek whence values. // https://golang.org/pkg/io/#pkg-constants const SeekMode = { @@ -81,7 +81,7 @@ return 0; } - const nread = sendSync("op_read", rid, buffer); + const nread = bufferOpSync("op_read_sync", rid, buffer); if (nread < 0) { throw new Error("read error"); } @@ -97,7 +97,7 @@ return 0; } - const nread = await sendAsync("op_read", rid, buffer); + const nread = await bufferOpAsync("op_read_async", rid, buffer); if (nread < 0) { throw new Error("read error"); } @@ -106,7 +106,7 @@ } function writeSync(rid, data) { - const result = sendSync("op_write", rid, data); + const result = bufferOpSync("op_write_sync", rid, data); if (result < 0) { throw new Error("write error"); } @@ -115,7 +115,7 @@ } async function write(rid, data) { - const result = await sendAsync("op_write", rid, data); + const result = await bufferOpAsync("op_write_async", rid, data); if (result < 0) { throw new Error("write error"); } diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index d96aaaaae55945..233c5cd4375220 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -11,7 +11,6 @@ delete Object.prototype.__proto__; const eventTarget = window.__bootstrap.eventTarget; const globalInterfaces = window.__bootstrap.globalInterfaces; const location = window.__bootstrap.location; - const dispatchMinimal = window.__bootstrap.dispatchMinimal; const build = window.__bootstrap.build; const version = window.__bootstrap.version; const errorStack = window.__bootstrap.errorStack; @@ -142,12 +141,7 @@ delete Object.prototype.__proto__; } function runtimeStart(runtimeOptions, source) { - const opsMap = core.ops(); - for (const [name, opId] of Object.entries(opsMap)) { - if (name === "op_write" || name === "op_read") { - core.setAsyncHandler(opId, dispatchMinimal.asyncMsgFromRust); - } - } + core.ops(); core.setMacrotaskCallback(timers.handleTimerMacrotask); version.setVersions( diff --git a/runtime/ops/dispatch_minimal.rs b/runtime/ops/dispatch_minimal.rs deleted file mode 100644 index b35d0def5e6318..00000000000000 --- a/runtime/ops/dispatch_minimal.rs +++ /dev/null @@ -1,210 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use deno_core::error::AnyError; -use deno_core::futures::future::FutureExt; -use deno_core::BufVec; -use deno_core::Op; -use deno_core::OpFn; -use deno_core::OpState; -use std::cell::RefCell; -use std::future::Future; -use std::iter::repeat; -use std::mem::size_of_val; -use std::pin::Pin; -use std::rc::Rc; -use std::slice; - -pub enum MinimalOp { - Sync(Result), - Async(Pin>>>), -} - -#[derive(Copy, Clone, Debug, PartialEq)] -// This corresponds to RecordMinimal on the TS side. -pub struct Record { - pub promise_id: i32, - pub arg: i32, - pub result: i32, -} - -impl Into> for Record { - fn into(self) -> Box<[u8]> { - let vec = vec![self.promise_id, self.arg, self.result]; - let buf32 = vec.into_boxed_slice(); - let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4]; - unsafe { Box::from_raw(ptr) } - } -} - -pub struct ErrorRecord { - pub promise_id: i32, - pub arg: i32, - pub error_len: i32, - pub error_class: &'static [u8], - pub error_message: Vec, -} - -impl Into> for ErrorRecord { - fn into(self) -> Box<[u8]> { - let Self { - promise_id, - arg, - error_len, - error_class, - error_message, - .. - } = self; - let header_i32 = [promise_id, arg, error_len]; - let header_u8 = unsafe { - slice::from_raw_parts( - &header_i32 as *const _ as *const u8, - size_of_val(&header_i32), - ) - }; - let padded_len = - (header_u8.len() + error_class.len() + error_message.len() + 3usize) - & !3usize; - header_u8 - .iter() - .cloned() - .chain(error_class.iter().cloned()) - .chain(error_message.into_iter()) - .chain(repeat(b' ')) - .take(padded_len) - .collect() - } -} - -pub fn parse_min_record(bytes: &[u8]) -> Option { - if bytes.len() % std::mem::size_of::() != 0 { - return None; - } - let p = bytes.as_ptr(); - #[allow(clippy::cast_ptr_alignment)] - let p32 = p as *const i32; - let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) }; - - if s.len() != 3 { - return None; - } - let ptr = s.as_ptr(); - let ints = unsafe { std::slice::from_raw_parts(ptr, 3) }; - Some(Record { - promise_id: ints[0], - arg: ints[1], - result: ints[2], - }) -} - -pub fn minimal_op(op_fn: F) -> Box -where - F: Fn(Rc>, bool, i32, BufVec) -> MinimalOp + 'static, -{ - Box::new(move |state: Rc>, bufs: BufVec| { - let mut bufs_iter = bufs.into_iter(); - let record_buf = bufs_iter.next().expect("Expected record at position 0"); - let zero_copy = bufs_iter.collect::(); - - let mut record = match parse_min_record(&record_buf) { - Some(r) => r, - None => { - let error_class = b"TypeError"; - let error_message = b"Unparsable control buffer"; - let error_record = ErrorRecord { - promise_id: 0, - arg: -1, - error_len: error_class.len() as i32, - error_class, - error_message: error_message[..].to_owned(), - }; - return Op::Sync(error_record.into()); - } - }; - let is_sync = record.promise_id == 0; - let rid = record.arg; - let min_op = op_fn(state.clone(), is_sync, rid, zero_copy); - - match min_op { - MinimalOp::Sync(sync_result) => Op::Sync(match sync_result { - Ok(r) => { - record.result = r; - record.into() - } - Err(err) => { - let error_class = (state.borrow().get_error_class_fn)(&err); - let error_record = ErrorRecord { - promise_id: record.promise_id, - arg: -1, - error_len: error_class.len() as i32, - error_class: error_class.as_bytes(), - error_message: err.to_string().as_bytes().to_owned(), - }; - error_record.into() - } - }), - MinimalOp::Async(min_fut) => { - let fut = async move { - match min_fut.await { - Ok(r) => { - record.result = r; - record.into() - } - Err(err) => { - let error_class = (state.borrow().get_error_class_fn)(&err); - let error_record = ErrorRecord { - promise_id: record.promise_id, - arg: -1, - error_len: error_class.len() as i32, - error_class: error_class.as_bytes(), - error_message: err.to_string().as_bytes().to_owned(), - }; - error_record.into() - } - } - }; - Op::Async(fut.boxed_local()) - } - } - }) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_error_record() { - let expected = vec![ - 1, 0, 0, 0, 255, 255, 255, 255, 11, 0, 0, 0, 66, 97, 100, 82, 101, 115, - 111, 117, 114, 99, 101, 69, 114, 114, 111, 114, - ]; - let err_record = ErrorRecord { - promise_id: 1, - arg: -1, - error_len: 11, - error_class: b"BadResource", - error_message: b"Error".to_vec(), - }; - let buf: Box<[u8]> = err_record.into(); - assert_eq!(buf, expected.into_boxed_slice()); - } - - #[test] - fn test_parse_min_record() { - let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]; - assert_eq!( - parse_min_record(&buf), - Some(Record { - promise_id: 1, - arg: 3, - result: 4 - }) - ); - - let buf = vec![]; - assert_eq!(parse_min_record(&buf), None); - - let buf = vec![5]; - assert_eq!(parse_min_record(&buf), None); - } -} diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index bda8a51cb90883..4073342be852ec 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -1,13 +1,8 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use super::dispatch_minimal::minimal_op; -use super::dispatch_minimal::MinimalOp; -use crate::metrics::metrics_op; use deno_core::error::resource_unavailable; -use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::error::{bad_resource_id, not_supported}; -use deno_core::futures::future::FutureExt; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; @@ -24,7 +19,6 @@ use deno_core::ZeroCopyBuf; use serde::Deserialize; use std::borrow::Cow; use std::cell::RefCell; -use std::convert::TryInto; use std::io::Read; use std::io::Write; use std::rc::Rc; @@ -105,8 +99,12 @@ lazy_static! { } pub fn init(rt: &mut JsRuntime) { - rt.register_op("op_read", metrics_op("op_read", minimal_op(op_read))); - rt.register_op("op_write", metrics_op("op_write", minimal_op(op_write))); + super::reg_buffer_async(rt, "op_read_async", op_read_async); + super::reg_buffer_async(rt, "op_write_async", op_write_async); + + super::reg_buffer_sync(rt, "op_read_sync", op_read_sync); + super::reg_buffer_sync(rt, "op_write_sync", op_write_sync); + super::reg_json_async(rt, "op_shutdown", op_shutdown); } @@ -138,10 +136,6 @@ fn get_stdio_stream( } } -fn no_buffer_specified() -> AnyError { - type_error("no buffer specified") -} - #[cfg(unix)] use nix::sys::termios; @@ -526,36 +520,15 @@ impl Resource for StdFileResource { } } -pub fn op_read( - state: Rc>, - is_sync: bool, - rid: i32, - bufs: BufVec, -) -> MinimalOp { - match bufs.len() { - 0 => return MinimalOp::Sync(Err(no_buffer_specified())), - 1 => {} - _ => panic!("Invalid number of arguments"), - }; - let buf = bufs.into_iter().next().unwrap(); - - if is_sync { - MinimalOp::Sync(op_read_sync(state, rid, buf)) - } else { - MinimalOp::Async(op_read_async(state, rid, buf).boxed_local()) - } -} - fn op_read_sync( - state: Rc>, - rid: i32, - mut buf: ZeroCopyBuf, -) -> Result { - let rid = rid.try_into().map_err(|_| bad_resource_id())?; - StdFileResource::with(&mut state.borrow_mut(), rid, move |r| match r { + state: &mut OpState, + rid: u32, + bufs: &mut [ZeroCopyBuf], +) -> Result { + StdFileResource::with(state, rid, move |r| match r { Ok(std_file) => std_file - .read(&mut buf) - .map(|n: usize| n as i32) + .read(&mut bufs[0]) + .map(|n: usize| n as u32) .map_err(AnyError::from), Err(_) => Err(not_supported()), }) @@ -563,65 +536,44 @@ fn op_read_sync( async fn op_read_async( state: Rc>, - rid: i32, - mut buf: ZeroCopyBuf, -) -> Result { - let rid = rid.try_into().map_err(|_| bad_resource_id())?; + rid: u32, + mut bufs: BufVec, +) -> Result { + let buf = &mut bufs[0]; let resource = state .borrow() .resource_table .get_any(rid) .ok_or_else(bad_resource_id)?; let nread = if let Some(s) = resource.downcast_rc::() { - s.read(&mut buf).await? + s.read(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.read(&mut buf).await? + s.read(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.read(&mut buf).await? + s.read(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.read(&mut buf).await? + s.read(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.read(&mut buf).await? + s.read(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.read(&mut buf).await? + s.read(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.read(&mut buf).await? + s.read(buf).await? } else { return Err(not_supported()); }; - Ok(nread as i32) -} - -pub fn op_write( - state: Rc>, - is_sync: bool, - rid: i32, - bufs: BufVec, -) -> MinimalOp { - match bufs.len() { - 0 => return MinimalOp::Sync(Err(no_buffer_specified())), - 1 => {} - _ => panic!("Invalid number of arguments"), - }; - let buf = bufs.into_iter().next().unwrap(); - - if is_sync { - MinimalOp::Sync(op_write_sync(state, rid, buf)) - } else { - MinimalOp::Async(op_write_async(state, rid, buf).boxed_local()) - } + Ok(nread as u32) } fn op_write_sync( - state: Rc>, - rid: i32, - buf: ZeroCopyBuf, -) -> Result { - let rid = rid.try_into().map_err(|_| bad_resource_id())?; - StdFileResource::with(&mut state.borrow_mut(), rid, move |r| match r { + state: &mut OpState, + rid: u32, + bufs: &mut [ZeroCopyBuf], +) -> Result { + StdFileResource::with(state, rid, move |r| match r { Ok(std_file) => std_file - .write(&buf) - .map(|nwritten: usize| nwritten as i32) + .write(&bufs[0]) + .map(|nwritten: usize| nwritten as u32) .map_err(AnyError::from), Err(_) => Err(not_supported()), }) @@ -629,36 +581,36 @@ fn op_write_sync( async fn op_write_async( state: Rc>, - rid: i32, - buf: ZeroCopyBuf, -) -> Result { - let rid = rid.try_into().map_err(|_| bad_resource_id())?; + rid: u32, + bufs: BufVec, +) -> Result { + let buf = &bufs[0]; let resource = state .borrow() .resource_table .get_any(rid) .ok_or_else(bad_resource_id)?; let nwritten = if let Some(s) = resource.downcast_rc::() { - s.write(&buf).await? + s.write(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.write(&buf).await? + s.write(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.write(&buf).await? + s.write(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.write(&buf).await? + s.write(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.write(&buf).await? + s.write(buf).await? } else if let Some(s) = resource.downcast_rc::() { - s.write(&buf).await? + s.write(buf).await? } else { return Err(not_supported()); }; - Ok(nwritten as i32) + Ok(nwritten as u32) } #[derive(Deserialize)] struct ShutdownArgs { - rid: i32, + rid: u32, } async fn op_shutdown( @@ -666,10 +618,7 @@ async fn op_shutdown( args: Value, _zero_copy: BufVec, ) -> Result { - let rid = serde_json::from_value::(args)? - .rid - .try_into() - .map_err(|_| bad_resource_id())?; + let rid = serde_json::from_value::(args)?.rid; let resource = state .borrow() .resource_table diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index 6b64b8042f8887..e082c5d3a728f9 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -1,8 +1,5 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -mod dispatch_minimal; -pub use dispatch_minimal::MinimalOp; - pub mod crypto; pub mod fetch; pub mod fs; @@ -11,6 +8,7 @@ pub mod io; pub mod net; #[cfg(unix)] mod net_unix; +mod ops_buffer; pub mod os; pub mod permissions; pub mod plugin; @@ -36,6 +34,9 @@ use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::OpState; use deno_core::ZeroCopyBuf; +use ops_buffer::buffer_op_async; +use ops_buffer::buffer_op_sync; +use ops_buffer::ValueOrVector; use std::cell::RefCell; use std::future::Future; use std::rc::Rc; @@ -62,6 +63,26 @@ where rt.register_op(name, metrics_op(name, json_op_sync(op_fn))); } +pub fn reg_buffer_async( + rt: &mut JsRuntime, + name: &'static str, + op_fn: F, +) where + F: Fn(Rc>, u32, BufVec) -> R + 'static, + R: Future> + 'static, + RV: ValueOrVector, +{ + rt.register_op(name, metrics_op(name, buffer_op_async(op_fn))); +} + +pub fn reg_buffer_sync(rt: &mut JsRuntime, name: &'static str, op_fn: F) +where + F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result + 'static, + R: ValueOrVector, +{ + rt.register_op(name, metrics_op(name, buffer_op_sync(op_fn))); +} + /// `UnstableChecker` is a struct so it can be placed inside `GothamState`; /// using type alias for a bool could work, but there's a high chance /// that there might be another type alias pointing to a bool, which diff --git a/runtime/ops/ops_buffer.rs b/runtime/ops/ops_buffer.rs new file mode 100644 index 00000000000000..6998144cf63f90 --- /dev/null +++ b/runtime/ops/ops_buffer.rs @@ -0,0 +1,377 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::AnyError; +use deno_core::futures::future::FutureExt; +use deno_core::BufVec; +use deno_core::Op; +use deno_core::OpFn; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use std::boxed::Box; +use std::cell::RefCell; +use std::convert::TryInto; +use std::future::Future; +use std::rc::Rc; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct RequestHeader { + pub request_id: u64, + pub argument: u32, +} + +impl RequestHeader { + pub fn from_raw(bytes: &[u8]) -> Option { + if bytes.len() < 3 * 4 { + return None; + } + + Some(Self { + request_id: u64::from_le_bytes(bytes[0..8].try_into().unwrap()), + argument: u32::from_le_bytes(bytes[8..12].try_into().unwrap()), + }) + } +} + +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct ResponseHeader { + pub request_id: u64, + pub status: u32, + pub result: u32, +} + +impl Into<[u8; 16]> for ResponseHeader { + fn into(self) -> [u8; 16] { + let mut resp_header = [0u8; 16]; + resp_header[0..8].copy_from_slice(&self.request_id.to_le_bytes()); + resp_header[8..12].copy_from_slice(&self.status.to_le_bytes()); + resp_header[12..16].copy_from_slice(&self.result.to_le_bytes()); + resp_header + } +} + +pub trait ValueOrVector { + fn value(&self) -> u32; + fn vector(self) -> Option>; +} + +impl ValueOrVector for Vec { + fn value(&self) -> u32 { + self.len() as u32 + } + fn vector(self) -> Option> { + Some(self) + } +} + +impl ValueOrVector for u32 { + fn value(&self) -> u32 { + *self + } + fn vector(self) -> Option> { + None + } +} + +fn gen_padding_32bit(len: usize) -> &'static [u8] { + &[b' ', b' ', b' '][0..(4 - (len & 3)) & 3] +} + +/// Creates an op that passes data synchronously using raw ui8 buffer. +/// +/// The provided function `op_fn` has the following parameters: +/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op. +/// * `argument`: the i32 value that is passed to the Rust function. +/// * `&mut [ZeroCopyBuf]`: raw bytes passed along. +/// +/// `op_fn` returns an array buffer value, which is directly returned to JavaScript. +/// +/// When registering an op like this... +/// ```ignore +/// let mut runtime = JsRuntime::new(...); +/// runtime.register_op("hello", deno_core::buffer_op_sync(Self::hello_op)); +/// ``` +/// +/// ...it can be invoked from JS using the provided name, for example: +/// ```js +/// Deno.core.ops(); +/// let result = Deno.core.bufferOpSync("function_name", args); +/// ``` +/// +/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. +/// A more complete example is available in the examples directory. +pub fn buffer_op_sync(op_fn: F) -> Box +where + F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result + 'static, + R: ValueOrVector, +{ + Box::new(move |state: Rc>, bufs: BufVec| -> Op { + let mut bufs_iter = bufs.into_iter(); + let record_buf = bufs_iter.next().expect("Expected record at position 0"); + let mut zero_copy = bufs_iter.collect::(); + + let req_header = match RequestHeader::from_raw(&record_buf) { + Some(r) => r, + None => { + let error_class = b"TypeError"; + let error_message = b"Unparsable control buffer"; + let len = error_class.len() + error_message.len(); + let padding = gen_padding_32bit(len); + let resp_header = ResponseHeader { + request_id: 0, + status: 1, + result: error_class.len() as u32, + }; + return Op::Sync( + error_class + .iter() + .chain(error_message.iter()) + .chain(padding) + .chain(&Into::<[u8; 16]>::into(resp_header)) + .cloned() + .collect(), + ); + } + }; + + match op_fn(&mut state.borrow_mut(), req_header.argument, &mut zero_copy) { + Ok(possibly_vector) => { + let resp_header = ResponseHeader { + request_id: req_header.request_id, + status: 0, + result: possibly_vector.value(), + }; + let resp_encoded_header = Into::<[u8; 16]>::into(resp_header); + + let resp_vector = match possibly_vector.vector() { + Some(mut vector) => { + let padding = gen_padding_32bit(vector.len()); + vector.extend(padding); + vector.extend(&resp_encoded_header); + vector + } + None => resp_encoded_header.to_vec(), + }; + Op::Sync(resp_vector.into_boxed_slice()) + } + Err(error) => { + let error_class = + (state.borrow().get_error_class_fn)(&error).as_bytes(); + let error_message = error.to_string().as_bytes().to_owned(); + let len = error_class.len() + error_message.len(); + let padding = gen_padding_32bit(len); + let resp_header = ResponseHeader { + request_id: req_header.request_id, + status: 1, + result: error_class.len() as u32, + }; + return Op::Sync( + error_class + .iter() + .chain(error_message.iter()) + .chain(padding) + .chain(&Into::<[u8; 16]>::into(resp_header)) + .cloned() + .collect(), + ); + } + } + }) +} + +/// Creates an op that passes data asynchronously using raw ui8 buffer. +/// +/// The provided function `op_fn` has the following parameters: +/// * `Rc>`: the op state, can be used to read/write resources in the runtime from an op. +/// * `argument`: the i32 value that is passed to the Rust function. +/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used. +/// +/// `op_fn` returns a future, whose output is a JSON value. This value will be asynchronously +/// returned to JavaScript. +/// +/// When registering an op like this... +/// ```ignore +/// let mut runtime = JsRuntime::new(...); +/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op)); +/// ``` +/// +/// ...it can be invoked from JS using the provided name, for example: +/// ```js +/// Deno.core.ops(); +/// let future = Deno.core.jsonOpAsync("function_name", args); +/// ``` +/// +/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. +/// A more complete example is available in the examples directory. +pub fn buffer_op_async(op_fn: F) -> Box +where + F: Fn(Rc>, u32, BufVec) -> R + 'static, + R: Future> + 'static, + RV: ValueOrVector, +{ + Box::new(move |state: Rc>, bufs: BufVec| -> Op { + let mut bufs_iter = bufs.into_iter(); + let record_buf = bufs_iter.next().expect("Expected record at position 0"); + let zero_copy = bufs_iter.collect::(); + + let req_header = match RequestHeader::from_raw(&record_buf) { + Some(r) => r, + None => { + let error_class = b"TypeError"; + let error_message = b"Unparsable control buffer"; + let len = error_class.len() + error_message.len(); + let padding = gen_padding_32bit(len); + let resp_header = ResponseHeader { + request_id: 0, + status: 1, + result: error_class.len() as u32, + }; + return Op::Sync( + error_class + .iter() + .chain(error_message.iter()) + .chain(padding) + .chain(&Into::<[u8; 16]>::into(resp_header)) + .cloned() + .collect(), + ); + } + }; + + let fut = + op_fn(state.clone(), req_header.argument, zero_copy).map(move |result| { + match result { + Ok(possibly_vector) => { + let resp_header = ResponseHeader { + request_id: req_header.request_id, + status: 0, + result: possibly_vector.value(), + }; + let resp_encoded_header = Into::<[u8; 16]>::into(resp_header); + + let resp_vector = match possibly_vector.vector() { + Some(mut vector) => { + let padding = gen_padding_32bit(vector.len()); + vector.extend(padding); + vector.extend(&resp_encoded_header); + vector + } + None => resp_encoded_header.to_vec(), + }; + resp_vector.into_boxed_slice() + } + Err(error) => { + let error_class = + (state.borrow().get_error_class_fn)(&error).as_bytes(); + let error_message = error.to_string().as_bytes().to_owned(); + let len = error_class.len() + error_message.len(); + let padding = gen_padding_32bit(len); + let resp_header = ResponseHeader { + request_id: req_header.request_id, + status: 1, + result: error_class.len() as u32, + }; + + error_class + .iter() + .chain(error_message.iter()) + .chain(padding) + .chain(&Into::<[u8; 16]>::into(resp_header)) + .cloned() + .collect() + } + } + }); + let temp = Box::pin(fut); + Op::Async(temp) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn padding() { + assert_eq!(gen_padding_32bit(0), &[] as &[u8]); + assert_eq!(gen_padding_32bit(1), &[b' ', b' ', b' ']); + assert_eq!(gen_padding_32bit(2), &[b' ', b' ']); + assert_eq!(gen_padding_32bit(3), &[b' ']); + assert_eq!(gen_padding_32bit(4), &[] as &[u8]); + assert_eq!(gen_padding_32bit(5), &[b' ', b' ', b' ']); + } + + #[test] + fn response_header_to_bytes() { + // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´ + let resp_header = ResponseHeader { + request_id: 0x0102030405060708u64, + status: 0x090A0B0Cu32, + result: 0x0D0E0F10u32, + }; + + // All numbers are always little-endian encoded, as the js side also wants this to be fixed + assert_eq!( + &Into::<[u8; 16]>::into(resp_header), + &[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 16, 15, 14, 13] + ); + } + + #[test] + fn response_header_to_bytes_max_value() { + // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´ + let resp_header = ResponseHeader { + request_id: (1u64 << 53u64) - 1u64, + status: 0xFFFFFFFFu32, + result: 0xFFFFFFFFu32, + }; + + // All numbers are always little-endian encoded, as the js side also wants this to be fixed + assert_eq!( + &Into::<[u8; 16]>::into(resp_header), + &[ + 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, 255, 255, 255, + 255 + ] + ); + } + + #[test] + fn request_header_from_bytes() { + let req_header = + RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9]) + .unwrap(); + + assert_eq!(req_header.request_id, 0x0102030405060708u64); + assert_eq!(req_header.argument, 0x090A0B0Cu32); + } + + #[test] + fn request_header_from_bytes_max_value() { + let req_header = RequestHeader::from_raw(&[ + 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, + ]) + .unwrap(); + + assert_eq!(req_header.request_id, (1u64 << 53u64) - 1u64); + assert_eq!(req_header.argument, 0xFFFFFFFFu32); + } + + #[test] + fn request_header_from_bytes_too_short() { + let req_header = + RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10]); + + assert_eq!(req_header, None); + } + + #[test] + fn request_header_from_bytes_long() { + let req_header = RequestHeader::from_raw(&[ + 8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 13, 14, 15, 16, 17, 18, 19, 20, 21, + ]) + .unwrap(); + + assert_eq!(req_header.request_id, 0x0102030405060708u64); + assert_eq!(req_header.argument, 0x090A0B0Cu32); + } +} diff --git a/runtime/ops/timers.rs b/runtime/ops/timers.rs index 34a3eea5fe3192..7c1718ce78c8d8 100644 --- a/runtime/ops/timers.rs +++ b/runtime/ops/timers.rs @@ -8,9 +8,6 @@ //! only need to be able to start, cancel and await a single timer (or Delay, as Tokio //! calls it) for an entire Isolate. This is what is implemented here. -use super::dispatch_minimal::minimal_op; -use super::dispatch_minimal::MinimalOp; -use crate::metrics::metrics_op; use crate::permissions::Permissions; use deno_core::error::type_error; use deno_core::error::AnyError; @@ -81,7 +78,7 @@ pub fn init(rt: &mut deno_core::JsRuntime) { super::reg_json_sync(rt, "op_global_timer_stop", op_global_timer_stop); super::reg_json_sync(rt, "op_global_timer_start", op_global_timer_start); super::reg_json_async(rt, "op_global_timer", op_global_timer); - rt.register_op("op_now", metrics_op("op_now", minimal_op(op_now))); + super::reg_buffer_sync(rt, "op_now", op_now); super::reg_json_sync(rt, "op_sleep_sync", op_sleep_sync); } @@ -143,21 +140,16 @@ async fn op_global_timer( // If the High precision flag is not set, the // nanoseconds are rounded on 2ms. fn op_now( - state: Rc>, - // Arguments are discarded - _sync: bool, - _x: i32, - mut zero_copy: BufVec, -) -> MinimalOp { + op_state: &mut OpState, + _argument: u32, + zero_copy: &mut [ZeroCopyBuf], +) -> Result { match zero_copy.len() { - 0 => return MinimalOp::Sync(Err(type_error("no buffer specified"))), + 0 => return Err(type_error("no buffer specified")), 1 => {} - _ => { - return MinimalOp::Sync(Err(type_error("Invalid number of arguments"))) - } + _ => return Err(type_error("Invalid number of arguments")), } - let op_state = state.borrow(); let start_time = op_state.borrow::(); let seconds = start_time.elapsed().as_secs(); let mut subsec_nanos = start_time.elapsed().subsec_nanos() as f64; @@ -174,7 +166,7 @@ fn op_now( (&mut zero_copy[0]).copy_from_slice(&result.to_be_bytes()); - MinimalOp::Sync(Ok(0)) + Ok(0) } #[derive(Deserialize)]