From f5a11a48606a805a6025db2feb8213c767bcb02e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Fri, 20 Sep 2019 23:45:28 +0200 Subject: [PATCH 01/34] minimal op registration in isolate --- core/examples/http_bench.js | 3 +- core/examples/http_bench.rs | 147 +++++++++++++++++++----------------- core/isolate.rs | 26 ++++++- core/lib.rs | 1 + core/ops.rs | 35 +++++++++ 5 files changed, 140 insertions(+), 72 deletions(-) create mode 100644 core/ops.rs diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 4c68f2be64e7bb..f502f4f6ca86e8 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,11 +1,12 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. +// TODO: sync these ops via `Deno.core.ops`; const OP_LISTEN = 1; const OP_ACCEPT = 2; const OP_READ = 3; const OP_WRITE = 4; -const OP_CLOSE = 5; +const OP_CLOSE = 0; const requestBuf = new Uint8Array(64 * 1024); const responseBuf = new Uint8Array( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 3c077562d68c40..1cb18d58d96e6c 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -36,12 +36,6 @@ impl log::Log for Logger { fn flush(&self) {} } -const OP_LISTEN: OpId = 1; -const OP_ACCEPT: OpId = 2; -const OP_READ: OpId = 3; -const OP_WRITE: OpId = 4; -const OP_CLOSE: OpId = 5; - #[derive(Clone, Debug, PartialEq)] pub struct Record { pub promise_id: i32, @@ -106,65 +100,46 @@ fn test_record_from() { pub type HttpBenchOp = dyn Future + Send; -fn dispatch( - op_id: OpId, - control: &[u8], +pub type HttpBenchOpHandler = fn( + is_sync: bool, + record: Record, zero_copy_buf: Option, -) -> CoreOp { - let record = Record::from(control); - let is_sync = record.promise_id == 0; - let http_bench_op = match op_id { - OP_LISTEN => { - assert!(is_sync); - op_listen() - } - OP_CLOSE => { - assert!(is_sync); - let rid = record.arg; - op_close(rid) - } - OP_ACCEPT => { - assert!(!is_sync); - let listener_rid = record.arg; - op_accept(listener_rid) - } - OP_READ => { - assert!(!is_sync); - let rid = record.arg; - op_read(rid, zero_copy_buf) - } - OP_WRITE => { - assert!(!is_sync); - let rid = record.arg; - op_write(rid, zero_copy_buf) - } - _ => panic!("bad op {}", op_id), - }; - let mut record_a = record.clone(); - let mut record_b = record.clone(); - - let fut = Box::new( - http_bench_op - .and_then(move |result| { - record_a.result = result; - Ok(record_a) - }) - .or_else(|err| -> Result { - eprintln!("unexpected err {}", err); - record_b.result = -1; - Ok(record_b) - }) - .then(|result| -> Result { - let record = result.unwrap(); - Ok(record.into()) - }), - ); +) -> Box; + +fn serialize_http_bench_op(handler: HttpBenchOpHandler) -> Box { + let serialized_op = + move |control: &[u8], zero_copy_buf: Option| -> CoreOp { + let record = Record::from(control); + let is_sync = record.promise_id == 0; + let op = handler(is_sync, record.clone(), zero_copy_buf); + + let mut record_a = record.clone(); + let mut record_b = record.clone(); + + let fut = Box::new( + op.and_then(move |result| { + record_a.result = result; + Ok(record_a) + }) + .or_else(|err| -> Result { + eprintln!("unexpected err {}", err); + record_b.result = -1; + Ok(record_b) + }) + .then(|result| -> Result { + let record = result.unwrap(); + Ok(record.into()) + }), + ); + + if is_sync { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } + }; - if is_sync { - Op::Sync(fut.wait().unwrap()) - } else { - Op::Async(fut) - } + Box::new(serialized_op) } fn main() { @@ -181,7 +156,11 @@ fn main() { }); let mut isolate = deno::Isolate::new(startup_data, false); - isolate.set_dispatch(dispatch); + isolate.register_op("close", serialize_http_bench_op(op_close)); + isolate.register_op("listen", serialize_http_bench_op(op_listen)); + isolate.register_op("accept", serialize_http_bench_op(op_accept)); + isolate.register_op("read", serialize_http_bench_op(op_read)); + isolate.register_op("write", serialize_http_bench_op(op_write)); isolate.then(|r| { js_check(r); @@ -225,7 +204,13 @@ fn new_rid() -> i32 { rid as i32 } -fn op_accept(listener_rid: i32) -> Box { +fn op_accept( + is_sync: bool, + record: Record, + _zero_copy_buf: Option, +) -> Box { + assert!(!is_sync); + let listener_rid = record.arg; debug!("accept {}", listener_rid); Box::new( futures::future::poll_fn(move || { @@ -248,9 +233,13 @@ fn op_accept(listener_rid: i32) -> Box { ) } -fn op_listen() -> Box { +fn op_listen( + is_sync: bool, + _record: Record, + _zero_copy_buf: Option, +) -> Box { + assert!(is_sync); debug!("listen"); - Box::new(lazy(move || { let addr = "127.0.0.1:4544".parse::().unwrap(); let listener = tokio::net::TcpListener::bind(&addr).unwrap(); @@ -262,8 +251,14 @@ fn op_listen() -> Box { })) } -fn op_close(rid: i32) -> Box { +fn op_close( + is_sync: bool, + record: Record, + _zero_copy_buf: Option, +) -> Box { + assert!(is_sync); debug!("close"); + let rid = record.arg; Box::new(lazy(move || { let mut table = RESOURCE_TABLE.lock().unwrap(); let r = table.remove(&rid); @@ -272,7 +267,13 @@ fn op_close(rid: i32) -> Box { })) } -fn op_read(rid: i32, zero_copy_buf: Option) -> Box { +fn op_read( + is_sync: bool, + record: Record, + zero_copy_buf: Option, +) -> Box { + assert!(!is_sync); + let rid = record.arg; debug!("read rid={}", rid); let mut zero_copy_buf = zero_copy_buf.unwrap(); Box::new( @@ -293,7 +294,13 @@ fn op_read(rid: i32, zero_copy_buf: Option) -> Box { ) } -fn op_write(rid: i32, zero_copy_buf: Option) -> Box { +fn op_write( + is_sync: bool, + record: Record, + zero_copy_buf: Option, +) -> Box { + assert!(!is_sync); + let rid = record.arg; debug!("write rid={}", rid); let zero_copy_buf = zero_copy_buf.unwrap(); Box::new( diff --git a/core/isolate.rs b/core/isolate.rs index bad79b5793c1c0..86a6b4f06143c8 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -17,6 +17,7 @@ use crate::libdeno::OpId; use crate::libdeno::PinnedBuf; use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; +use crate::ops::OpRegistry; use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use futures::stream::FuturesUnordered; @@ -54,6 +55,8 @@ pub type OpResult = Result, E>; /// Args: op_id, control_buf, zero_copy_buf type CoreDispatchFn = dyn Fn(OpId, &[u8], Option) -> CoreOp; +/// Main type describing op +pub type CoreOpHandler = dyn Fn(&[u8], Option) -> CoreOp; /// Stores a script used to initalize a Isolate pub struct Script<'a> { @@ -179,6 +182,7 @@ pub struct Isolate { pending_dyn_imports: FuturesUnordered>, have_unpolled_ops: bool, startup_script: Option, + op_registry: OpRegistry, } unsafe impl Send for Isolate {} @@ -244,6 +248,7 @@ impl Isolate { have_unpolled_ops: false, pending_dyn_imports: FuturesUnordered::new(), startup_script, + op_registry: OpRegistry::default(), } } @@ -257,6 +262,21 @@ impl Isolate { self.dispatch = Some(Arc::new(f)); } + pub fn register_op(&mut self, name: &str, op: Box) -> OpId { + self.op_registry.register_op(name, op) + } + + pub fn call_op( + &self, + op_id: OpId, + control: &[u8], + zero_copy_buf: Option, + ) -> CoreOp { + let ops = &self.op_registry.ops; + let op_handler = &*ops.get(op_id as usize).expect("Op not found!"); + op_handler(control, zero_copy_buf) + } + pub fn set_dyn_import(&mut self, f: F) where F: Fn(deno_dyn_import_id, &str, &str) -> DynImportStream @@ -331,7 +351,11 @@ impl Isolate { let op = if let Some(ref f) = isolate.dispatch { f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { - panic!("isolate.dispatch not set") + isolate.call_op( + op_id, + control_buf.as_ref(), + PinnedBuf::new(zero_copy_buf), + ) }; debug_assert_eq!(isolate.shared.size(), 0); diff --git a/core/lib.rs b/core/lib.rs index 9be1c3891789c5..56d38497a874d2 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -11,6 +11,7 @@ mod js_errors; mod libdeno; mod module_specifier; mod modules; +mod ops; mod shared_queue; pub use crate::any_error::*; diff --git a/core/ops.rs b/core/ops.rs new file mode 100644 index 00000000000000..12aed2b10a26a5 --- /dev/null +++ b/core/ops.rs @@ -0,0 +1,35 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::CoreOpHandler; +#[allow(dead_code)] +use crate::OpId; +use std::collections::HashMap; + +#[derive(Default)] +pub struct OpRegistry { + pub ops: Vec>, + pub phone_book: HashMap, +} + +impl OpRegistry { + #[allow(dead_code)] + pub fn get_op_map(&self) -> HashMap { + self.phone_book.clone() + } + + pub fn register_op( + &mut self, + name: &str, + serialized_op: Box, + ) -> OpId { + let op_id = self.ops.len() as u32; + + self + .phone_book + .entry(name.to_string()) + .and_modify(|_| panic!("Op already registered {}", op_id)) + .or_insert(op_id); + + self.ops.push(serialized_op); + op_id + } +} From 6375869507ecc1864c3f649a4caba3a5c0e70e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 00:32:36 +0200 Subject: [PATCH 02/34] mock getOpMap --- core/examples/http_bench.js | 62 ++++++++++++++++++++++++++++++++++--- core/ops.rs | 1 - 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index f502f4f6ca86e8..764432371ad4ce 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -16,6 +16,43 @@ const responseBuf = new Uint8Array( const promiseMap = new Map(); let nextPromiseId = 1; +const opRegistry = []; + +class Op { + constructor(name, handler) { + this.name = name; + this.handler = handler; + this.opId = 0; + opRegistry.push(this); + } + + setOpId(opId) { + this.opId = opId; + } + + /** Returns i32 number */ + sendSync(arg, zeroCopy = null) { + const buf = send(0, this.opId, arg, zeroCopy); + const record = recordFromBuf(buf); + return record.result; + } + + /** Returns Promise */ + sendAsync(arg, zeroCopy = null) { + const promiseId = nextPromiseId++; + const p = createResolvable(); + promiseMap.set(promiseId, p); + send(promiseId, this.opId, arg, zeroCopy); + return p; + } +} + +const opListen = new Op("listen"); +const opAccept = new Op("accept"); +const opClose = new Op("close"); +const opRead = new Op("read"); +const opWrite = new Op("write"); + function assert(cond) { if (!cond) { throw Error("assert"); @@ -81,12 +118,12 @@ function handleAsyncMsgFromRust(opId, buf) { /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(OP_LISTEN, -1); + return sendSync(opListen.opId, -1); } /** Accepts a connection, returns rid. */ async function accept(rid) { - return await sendAsync(OP_ACCEPT, rid); + return await sendAsync(opAccept.opId, rid); } /** @@ -94,16 +131,16 @@ async function accept(rid) { * Returns bytes read. */ async function read(rid, data) { - return await sendAsync(OP_READ, rid, data); + return await sendAsync(opRead.opId, rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ async function write(rid, data) { - return await sendAsync(OP_WRITE, rid, data); + return await sendAsync(opWrite.opId, rid, data); } function close(rid) { - return sendSync(OP_CLOSE, rid); + return sendSync(opClose.opId, rid); } async function serve(rid) { @@ -121,9 +158,24 @@ async function serve(rid) { close(rid); } +// TODO: this should be acquired from Rust via `Deno.core.getOpMap()` +const opMap = { + listen: 1, + accept: 2, + read: 3, + write: 4, + close: 0 +}; + async function main() { Deno.core.setAsyncHandler(handleAsyncMsgFromRust); + // TODO: poor man's Deno.core.getOpMap() + for (const [key, opId] of Object.entries(opMap)) { + const op = opRegistry.find(el => el.name === key); + op.setOpId(opId); + } + Deno.core.print("http_bench.js start\n"); const listenerRid = listen(); diff --git a/core/ops.rs b/core/ops.rs index 12aed2b10a26a5..76bc4ebff008cb 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -1,6 +1,5 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use crate::CoreOpHandler; -#[allow(dead_code)] use crate::OpId; use std::collections::HashMap; From 425e83d3f68a11b5543c5d45601716fa2fd16433 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 01:06:54 +0200 Subject: [PATCH 03/34] minimal op map fetching --- core/examples/http_bench.js | 75 +++++------------------------- core/examples/http_bench.rs | 2 +- core/isolate.rs | 8 +++- core/ops.rs | 15 +++++- core/shared_queue.js | 7 ++- deno_typescript/lib.deno_core.d.ts | 2 + 6 files changed, 42 insertions(+), 67 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 764432371ad4ce..d1d3c94dbbfdf4 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,12 +1,11 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. -// TODO: sync these ops via `Deno.core.ops`; -const OP_LISTEN = 1; -const OP_ACCEPT = 2; -const OP_READ = 3; -const OP_WRITE = 4; -const OP_CLOSE = 0; +const OP_LISTEN = Deno.core.opsMap["listen"]; +const OP_ACCEPT = Deno.core.opsMap["accept"]; +const OP_READ = Deno.core.opsMap["read"]; +const OP_WRITE = Deno.core.opsMap["write"]; +const OP_CLOSE = Deno.core.opsMap["close"]; const requestBuf = new Uint8Array(64 * 1024); const responseBuf = new Uint8Array( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" @@ -16,43 +15,6 @@ const responseBuf = new Uint8Array( const promiseMap = new Map(); let nextPromiseId = 1; -const opRegistry = []; - -class Op { - constructor(name, handler) { - this.name = name; - this.handler = handler; - this.opId = 0; - opRegistry.push(this); - } - - setOpId(opId) { - this.opId = opId; - } - - /** Returns i32 number */ - sendSync(arg, zeroCopy = null) { - const buf = send(0, this.opId, arg, zeroCopy); - const record = recordFromBuf(buf); - return record.result; - } - - /** Returns Promise */ - sendAsync(arg, zeroCopy = null) { - const promiseId = nextPromiseId++; - const p = createResolvable(); - promiseMap.set(promiseId, p); - send(promiseId, this.opId, arg, zeroCopy); - return p; - } -} - -const opListen = new Op("listen"); -const opAccept = new Op("accept"); -const opClose = new Op("close"); -const opRead = new Op("read"); -const opWrite = new Op("write"); - function assert(cond) { if (!cond) { throw Error("assert"); @@ -118,12 +80,12 @@ function handleAsyncMsgFromRust(opId, buf) { /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(opListen.opId, -1); + return sendSync(OP_LISTEN, -1); } /** Accepts a connection, returns rid. */ async function accept(rid) { - return await sendAsync(opAccept.opId, rid); + return await sendAsync(OP_ACCEPT, rid); } /** @@ -131,16 +93,16 @@ async function accept(rid) { * Returns bytes read. */ async function read(rid, data) { - return await sendAsync(opRead.opId, rid, data); + return await sendAsync(OP_READ, rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ async function write(rid, data) { - return await sendAsync(opWrite.opId, rid, data); + return await sendAsync(OP_WRITE, rid, data); } function close(rid) { - return sendSync(opClose.opId, rid); + return sendSync(OP_CLOSE, rid); } async function serve(rid) { @@ -158,26 +120,13 @@ async function serve(rid) { close(rid); } -// TODO: this should be acquired from Rust via `Deno.core.getOpMap()` -const opMap = { - listen: 1, - accept: 2, - read: 3, - write: 4, - close: 0 -}; - async function main() { Deno.core.setAsyncHandler(handleAsyncMsgFromRust); - // TODO: poor man's Deno.core.getOpMap() - for (const [key, opId] of Object.entries(opMap)) { - const op = opRegistry.find(el => el.name === key); - op.setOpId(opId); - } - Deno.core.print("http_bench.js start\n"); + Deno.core.print("ops map " + JSON.stringify(Deno.core.opsMap) + "\n"); + const listenerRid = listen(); Deno.core.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`); while (true) { diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 1cb18d58d96e6c..bf32ad8b0612c3 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -156,11 +156,11 @@ fn main() { }); let mut isolate = deno::Isolate::new(startup_data, false); - isolate.register_op("close", serialize_http_bench_op(op_close)); isolate.register_op("listen", serialize_http_bench_op(op_listen)); isolate.register_op("accept", serialize_http_bench_op(op_accept)); isolate.register_op("read", serialize_http_bench_op(op_read)); isolate.register_op("write", serialize_http_bench_op(op_write)); + isolate.register_op("close", serialize_http_bench_op(op_close)); isolate.then(|r| { js_check(r); diff --git a/core/isolate.rs b/core/isolate.rs index 86a6b4f06143c8..8c7404c00c3259 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -248,7 +248,7 @@ impl Isolate { have_unpolled_ops: false, pending_dyn_imports: FuturesUnordered::new(), startup_script, - op_registry: OpRegistry::default(), + op_registry: OpRegistry::new(), } } @@ -272,6 +272,12 @@ impl Isolate { control: &[u8], zero_copy_buf: Option, ) -> CoreOp { + if op_id == 0 { + let op_map = self.op_registry.get_op_map(); + let op_map_json = serde_json::to_string(&op_map).unwrap(); + let buf = op_map_json.as_bytes().to_owned().into_boxed_slice(); + return Op::Sync(buf); + } let ops = &self.op_registry.ops; let op_handler = &*ops.get(op_id as usize).expect("Op not found!"); op_handler(control, zero_copy_buf) diff --git a/core/ops.rs b/core/ops.rs index 76bc4ebff008cb..c4b851060978e6 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -1,6 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::CoreOp; use crate::CoreOpHandler; +use crate::Op; use crate::OpId; +use crate::PinnedBuf; use std::collections::HashMap; #[derive(Default)] @@ -9,8 +12,18 @@ pub struct OpRegistry { pub phone_book: HashMap, } +fn get_op_map(_control: &[u8], _zero_copy_buf: Option) -> CoreOp { + Op::Sync(Box::new([])) +} + impl OpRegistry { - #[allow(dead_code)] + pub fn new() -> Self { + // TODO: this is make shift fix for get op map + let mut registry = Self::default(); + registry.register_op("get_op_map", Box::new(get_op_map)); + registry + } + pub fn get_op_map(&self) -> HashMap { self.phone_book.clone() } diff --git a/core/shared_queue.js b/core/shared_queue.js index 22a64a312bd4c7..d75fc8a5c84485 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -178,6 +178,10 @@ SharedQueue Binary Layout return Deno.core.send(opId, control, zeroCopy); } + const opsMapBytes = dispatch(0, []); + const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); + const opsMap = JSON.parse(opsMapJson); + const denoCore = { setAsyncHandler, dispatch, @@ -189,7 +193,8 @@ SharedQueue Binary Layout push, reset, shift - } + }, + opsMap }; assert(window[GLOBAL_NAMESPACE] != null); diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index 0bd3b6415bd7bc..c884e9101e4887 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -37,6 +37,8 @@ declare interface DenoCore { shift(): Uint8Array | null; }; + opsMap: Record; + recv(cb: MessageCallback): void; send( From ac1df446fa439908b074e99370267f861cec141b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 01:26:41 +0200 Subject: [PATCH 04/34] add Deno.core.refreshOpsMap --- core/examples/http_bench.js | 1 + core/shared_queue.js | 11 +++++++---- deno_typescript/lib.deno_core.d.ts | 1 + 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index d1d3c94dbbfdf4..8cac1b904e2a92 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,6 +1,7 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. +Deno.core.refreshOpsMap(); const OP_LISTEN = Deno.core.opsMap["listen"]; const OP_ACCEPT = Deno.core.opsMap["accept"]; const OP_READ = Deno.core.opsMap["read"]; diff --git a/core/shared_queue.js b/core/shared_queue.js index d75fc8a5c84485..a6f27a222dfa8c 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -178,9 +178,11 @@ SharedQueue Binary Layout return Deno.core.send(opId, control, zeroCopy); } - const opsMapBytes = dispatch(0, []); - const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); - const opsMap = JSON.parse(opsMapJson); + function refreshOpsMap() { + const opsMapBytes = dispatch(0, []); + const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); + Deno.core.opsMap = JSON.parse(opsMapJson); + } const denoCore = { setAsyncHandler, @@ -194,7 +196,8 @@ SharedQueue Binary Layout reset, shift }, - opsMap + opsMap: {}, + refreshOpsMap }; assert(window[GLOBAL_NAMESPACE] != null); diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index c884e9101e4887..b4e0d0c7e5bc4c 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -37,6 +37,7 @@ declare interface DenoCore { shift(): Uint8Array | null; }; + refreshOpsMap(): void; opsMap: Record; recv(cb: MessageCallback): void; From 9cc3b31ea6873651300eba2cd94c55bf255f5f75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 11:46:42 +0200 Subject: [PATCH 05/34] phone_book -> op_map --- core/ops.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/ops.rs b/core/ops.rs index c4b851060978e6..5d1826b6b5712b 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; #[derive(Default)] pub struct OpRegistry { pub ops: Vec>, - pub phone_book: HashMap, + pub op_map: HashMap, } fn get_op_map(_control: &[u8], _zero_copy_buf: Option) -> CoreOp { @@ -25,7 +25,7 @@ impl OpRegistry { } pub fn get_op_map(&self) -> HashMap { - self.phone_book.clone() + self.op_map.clone() } pub fn register_op( @@ -36,7 +36,7 @@ impl OpRegistry { let op_id = self.ops.len() as u32; self - .phone_book + .op_map .entry(name.to_string()) .and_modify(|_| panic!("Op already registered {}", op_id)) .or_insert(op_id); From ef6116139d0ce8ca7d9babd055a589bbb3a3a7b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 18:10:05 +0200 Subject: [PATCH 06/34] minimal js ops --- core/examples/http_bench.js | 97 ++++++++++++++++++------------ core/shared_queue.js | 44 +++++++++++--- deno_typescript/lib.deno_core.d.ts | 5 +- 3 files changed, 95 insertions(+), 51 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 8cac1b904e2a92..117be3393756c5 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,12 +1,58 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. -Deno.core.refreshOpsMap(); -const OP_LISTEN = Deno.core.opsMap["listen"]; -const OP_ACCEPT = Deno.core.opsMap["accept"]; -const OP_READ = Deno.core.opsMap["read"]; -const OP_WRITE = Deno.core.opsMap["write"]; -const OP_CLOSE = Deno.core.opsMap["close"]; +class Op { + constructor(name) { + this.name = name; + this.opId = 0; + Deno.core.registerOp(this); + } + + setOpId(opId) { + this.opId = opId; + } +} + +class HttpOp extends Op { + static handleAsyncMsgFromRust(opId, buf) { + const record = recordFromBuf(buf); + const { promiseId, result } = record; + const p = promiseMap.get(promiseId); + promiseMap.delete(promiseId); + p.resolve(result); + } + + /** Returns i32 number */ + static sendSync(opId, arg, zeroCopy) { + const buf = send(0, opId, arg, zeroCopy); + const record = recordFromBuf(buf); + return record.result; + } + + /** Returns Promise */ + static sendAsync(opId, arg, zeroCopy = null) { + const promiseId = nextPromiseId++; + const p = createResolvable(); + promiseMap.set(promiseId, p); + send(promiseId, opId, arg, zeroCopy); + return p; + } + + sendSync(arg, zeroCopy = null) { + return HttpOp.sendSync(this.opId, arg, zeroCopy); + } + + sendAsync(arg, zeroCopy = null) { + return HttpOp.sendAsync(this.opId, arg, zeroCopy); + } +} + +const OP_LISTEN = new HttpOp("listen"); +const OP_ACCEPT = new HttpOp("accept"); +const OP_READ = new HttpOp("read"); +const OP_WRITE = new HttpOp("write"); +const OP_CLOSE = new HttpOp("close"); + const requestBuf = new Uint8Array(64 * 1024); const responseBuf = new Uint8Array( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" @@ -45,15 +91,6 @@ function send(promiseId, opId, arg, zeroCopy = null) { return Deno.core.dispatch(opId, scratchBytes, zeroCopy); } -/** Returns Promise */ -function sendAsync(opId, arg, zeroCopy = null) { - const promiseId = nextPromiseId++; - const p = createResolvable(); - promiseMap.set(promiseId, p); - send(promiseId, opId, arg, zeroCopy); - return p; -} - function recordFromBuf(buf) { assert(buf.byteLength === 3 * 4); const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); @@ -64,29 +101,14 @@ function recordFromBuf(buf) { }; } -/** Returns i32 number */ -function sendSync(opId, arg) { - const buf = send(0, opId, arg); - const record = recordFromBuf(buf); - return record.result; -} - -function handleAsyncMsgFromRust(opId, buf) { - const record = recordFromBuf(buf); - const { promiseId, result } = record; - const p = promiseMap.get(promiseId); - promiseMap.delete(promiseId); - p.resolve(result); -} - /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(OP_LISTEN, -1); + return OP_LISTEN.sendSync(-1); } /** Accepts a connection, returns rid. */ async function accept(rid) { - return await sendAsync(OP_ACCEPT, rid); + return await OP_ACCEPT.sendAsync(rid); } /** @@ -94,16 +116,16 @@ async function accept(rid) { * Returns bytes read. */ async function read(rid, data) { - return await sendAsync(OP_READ, rid, data); + return await OP_READ.sendAsync(rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ async function write(rid, data) { - return await sendAsync(OP_WRITE, rid, data); + return await OP_WRITE.sendAsync(rid, data); } function close(rid) { - return sendSync(OP_CLOSE, rid); + return OP_CLOSE.sendSync(rid); } async function serve(rid) { @@ -122,12 +144,9 @@ async function serve(rid) { } async function main() { - Deno.core.setAsyncHandler(handleAsyncMsgFromRust); - + Deno.core.initOps(); Deno.core.print("http_bench.js start\n"); - Deno.core.print("ops map " + JSON.stringify(Deno.core.opsMap) + "\n"); - const listenerRid = listen(); Deno.core.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`); while (true) { diff --git a/core/shared_queue.js b/core/shared_queue.js index a6f27a222dfa8c..a9b07576a10115 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -38,6 +38,9 @@ SharedQueue Binary Layout let sharedBytes; let shared32; + let opsMap; + const ops = []; + let opsCb; let initialized = false; function maybeInit() { @@ -58,6 +61,33 @@ SharedQueue Binary Layout Deno.core.recv(handleAsyncMsgFromRust); } + function registerOp(op) { + ops.push(op); + } + + function initOps() { + const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null); + // core.print("op map bytes" + opsMapBytes + "\n"); + const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); + // core.print("op map" + opsMapJson + "\n"); + opsMap = JSON.parse(opsMapJson); + // core.print("ops map parsed" + opsMap + "\n"); + const opVector = []; + + for (const [name, opId] of Object.entries(opsMap)) { + const op = ops.find(op => op.name === name); + // core.print("op, name: " + name + " op: " + op + " opId: " + opId + "\n"); + if (!op) { + continue; + } + + op.setOpId(opId); + opVector[opId] = op.constructor.handleAsyncMsgFromRust; + } + + opsCb = opVector; + } + function assert(cond) { if (!cond) { throw Error("assert"); @@ -161,14 +191,14 @@ SharedQueue Binary Layout function handleAsyncMsgFromRust(opId, buf) { if (buf) { // This is the overflow_response case of deno::Isolate::poll(). - asyncHandler(opId, buf); + opsCb[opId](opId, buf); } else { while (true) { const opIdBuf = shift(); if (opIdBuf == null) { break; } - asyncHandler(...opIdBuf); + opsCb[opIdBuf[0]](...opIdBuf); } } } @@ -178,12 +208,6 @@ SharedQueue Binary Layout return Deno.core.send(opId, control, zeroCopy); } - function refreshOpsMap() { - const opsMapBytes = dispatch(0, []); - const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); - Deno.core.opsMap = JSON.parse(opsMapJson); - } - const denoCore = { setAsyncHandler, dispatch, @@ -196,8 +220,8 @@ SharedQueue Binary Layout reset, shift }, - opsMap: {}, - refreshOpsMap + initOps, + registerOp }; assert(window[GLOBAL_NAMESPACE] != null); diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index b4e0d0c7e5bc4c..9e51953a4569b6 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -37,8 +37,9 @@ declare interface DenoCore { shift(): Uint8Array | null; }; - refreshOpsMap(): void; - opsMap: Record; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + registerOp(op: any): void; + initOps(): void; recv(cb: MessageCallback): void; From c64cf04396af371ba07edda526de482f055bf473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 18:23:08 +0200 Subject: [PATCH 07/34] dual dispatch --- core/shared_queue.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/shared_queue.js b/core/shared_queue.js index a9b07576a10115..7c5f69c4ead722 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -191,14 +191,16 @@ SharedQueue Binary Layout function handleAsyncMsgFromRust(opId, buf) { if (buf) { // This is the overflow_response case of deno::Isolate::poll(). - opsCb[opId](opId, buf); + const cb = asyncHandler ? asyncHandler : opsCb[opId]; + cb(opId, buf); } else { while (true) { const opIdBuf = shift(); if (opIdBuf == null) { break; } - opsCb[opIdBuf[0]](...opIdBuf); + const cb = asyncHandler ? asyncHandler : opsCb[opIdBuf[0]]; + cb(...opIdBuf); } } } From 6480d8a32d86d0315b56774ef46feb528f130b1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 21:40:42 +0200 Subject: [PATCH 08/34] move Op to Deno.core --- core/examples/http_bench.js | 14 +------ core/shared_queue.js | 61 ++++++++++++++++++++---------- deno_typescript/lib.deno_core.d.ts | 24 +++++++++++- 3 files changed, 64 insertions(+), 35 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 117be3393756c5..06f74f241d3c9f 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,19 +1,7 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. -class Op { - constructor(name) { - this.name = name; - this.opId = 0; - Deno.core.registerOp(this); - } - - setOpId(opId) { - this.opId = opId; - } -} - -class HttpOp extends Op { +class HttpOp extends Deno.core.Op { static handleAsyncMsgFromRust(opId, buf) { const record = recordFromBuf(buf); const { promiseId, result } = record; diff --git a/core/shared_queue.js b/core/shared_queue.js index 7c5f69c4ead722..e3c7f39995ddff 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -38,9 +38,9 @@ SharedQueue Binary Layout let sharedBytes; let shared32; - let opsMap; - const ops = []; - let opsCb; + let rustOpsMap; + const jsOpsMap = new Map(); + let jsOpsAsyncHandlers; let initialized = false; function maybeInit() { @@ -61,22 +61,15 @@ SharedQueue Binary Layout Deno.core.recv(handleAsyncMsgFromRust); } - function registerOp(op) { - ops.push(op); - } - function initOps() { const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null); - // core.print("op map bytes" + opsMapBytes + "\n"); const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); - // core.print("op map" + opsMapJson + "\n"); - opsMap = JSON.parse(opsMapJson); - // core.print("ops map parsed" + opsMap + "\n"); - const opVector = []; - - for (const [name, opId] of Object.entries(opsMap)) { - const op = ops.find(op => op.name === name); - // core.print("op, name: " + name + " op: " + op + " opId: " + opId + "\n"); + rustOpsMap = JSON.parse(opsMapJson); + const opVector = new Array(Object.keys(rustOpsMap).length); + + for (const [name, opId] of Object.entries(rustOpsMap)) { + const op = jsOpsMap.get(name); + if (!op) { continue; } @@ -85,7 +78,7 @@ SharedQueue Binary Layout opVector[opId] = op.constructor.handleAsyncMsgFromRust; } - opsCb = opVector; + jsOpsAsyncHandlers = opVector; } function assert(cond) { @@ -191,7 +184,7 @@ SharedQueue Binary Layout function handleAsyncMsgFromRust(opId, buf) { if (buf) { // This is the overflow_response case of deno::Isolate::poll(). - const cb = asyncHandler ? asyncHandler : opsCb[opId]; + const cb = asyncHandler ? asyncHandler : jsOpsAsyncHandlers[opId]; cb(opId, buf); } else { while (true) { @@ -199,7 +192,7 @@ SharedQueue Binary Layout if (opIdBuf == null) { break; } - const cb = asyncHandler ? asyncHandler : opsCb[opIdBuf[0]]; + const cb = asyncHandler ? asyncHandler : jsOpsAsyncHandlers[opIdBuf[0]]; cb(...opIdBuf); } } @@ -210,6 +203,34 @@ SharedQueue Binary Layout return Deno.core.send(opId, control, zeroCopy); } + class Op { + constructor(name) { + if (typeof jsOpsMap.get(name) !== "undefined") { + throw new Error(`Duplicate op: ${name}`); + } + + this.name = name; + this.opId = 0; + jsOpsMap.set(name, this); + } + + setOpId(opId) { + this.opId = opId; + } + + static handleAsyncMsgFromRust(_opId, _buf) { + throw new Error("Unimplemented"); + } + + static sendSync(_opId, _control, _zeroCopy = null) { + throw new Error("Unimplemented"); + } + + static sendAsync(_opId, _control, _zeroCopy = null) { + throw new Error("Unimplemented"); + } + } + const denoCore = { setAsyncHandler, dispatch, @@ -223,7 +244,7 @@ SharedQueue Binary Layout shift }, initOps, - registerOp + Op }; assert(window[GLOBAL_NAMESPACE] != null); diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index 9e51953a4569b6..c1e8798c545f2d 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -20,6 +20,27 @@ interface EvalErrorInfo { thrown: any; } +declare type OpId = number; + +declare class Op { + name: string; + opId: OpId; + + constructor(name: string); + + setOpId(opId: Opid): void; + + static handleAsyncMsgFromRust(opId: OpId, buf: Uint8Array): void; + + static sendSync(opId: OpId, control: Uint8Array, zeroCopy?: Uint8Array): void; + + static sendAsync( + opId: OpId, + control: Uint8Array, + zeroCopy?: Uint8Array + ): void; +} + declare interface DenoCore { print(s: string, isErr?: boolean); dispatch( @@ -37,8 +58,7 @@ declare interface DenoCore { shift(): Uint8Array | null; }; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - registerOp(op: any): void; + Op: Op; initOps(): void; recv(cb: MessageCallback): void; From 1c45ed95e543ee7eef1fef4a9db1348523ab95c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 21:45:24 +0200 Subject: [PATCH 09/34] move op related types to core/ops.rs --- core/isolate.rs | 23 +---------------------- core/lib.rs | 1 + core/ops.rs | 27 +++++++++++++++++++++++---- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/core/isolate.rs b/core/isolate.rs index 8c7404c00c3259..4890b9801582a2 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -13,11 +13,10 @@ use crate::libdeno::deno_buf; use crate::libdeno::deno_dyn_import_id; use crate::libdeno::deno_mod; use crate::libdeno::deno_pinned_buf; -use crate::libdeno::OpId; use crate::libdeno::PinnedBuf; use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; -use crate::ops::OpRegistry; +use crate::ops::*; use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use futures::stream::FuturesUnordered; @@ -35,28 +34,8 @@ use std::fmt; use std::ptr::null; use std::sync::{Arc, Mutex, Once}; -pub type Buf = Box<[u8]>; - -pub type OpAsyncFuture = Box + Send>; - -type PendingOpFuture = - Box + Send>; - -pub enum Op { - Sync(Buf), - Async(OpAsyncFuture), -} - -pub type CoreError = (); - -pub type CoreOp = Op; - -pub type OpResult = Result, E>; - /// Args: op_id, control_buf, zero_copy_buf type CoreDispatchFn = dyn Fn(OpId, &[u8], Option) -> CoreOp; -/// Main type describing op -pub type CoreOpHandler = dyn Fn(&[u8], Option) -> CoreOp; /// Stores a script used to initalize a Isolate pub struct Script<'a> { diff --git a/core/lib.rs b/core/lib.rs index 56d38497a874d2..42a692f1a10b53 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -23,6 +23,7 @@ pub use crate::libdeno::OpId; pub use crate::libdeno::PinnedBuf; pub use crate::module_specifier::*; pub use crate::modules::*; +pub use crate::ops::*; pub fn v8_version() -> &'static str { use std::ffi::CStr; diff --git a/core/ops.rs b/core/ops.rs index 5d1826b6b5712b..8b01a20d77c968 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -1,11 +1,30 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::CoreOp; -use crate::CoreOpHandler; -use crate::Op; -use crate::OpId; +pub use crate::libdeno::OpId; use crate::PinnedBuf; +use futures::Future; use std::collections::HashMap; +pub type Buf = Box<[u8]>; + +pub type OpAsyncFuture = Box + Send>; + +pub(crate) type PendingOpFuture = + Box + Send>; + +pub type OpResult = Result, E>; + +pub enum Op { + Sync(Buf), + Async(OpAsyncFuture), +} + +pub type CoreError = (); + +pub type CoreOp = Op; + +/// Main type describing op +pub type CoreOpHandler = dyn Fn(&[u8], Option) -> CoreOp; + #[derive(Default)] pub struct OpRegistry { pub ops: Vec>, From e33d050c5aa3f737e99cc6fb16f4ff23aec44e3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 21 Sep 2019 23:24:15 +0200 Subject: [PATCH 10/34] fixes --- core/shared_queue.rs | 2 +- deno_typescript/lib.deno_core.d.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/shared_queue.rs b/core/shared_queue.rs index 5f9554ad2140fb..dbb738f15d5be5 100644 --- a/core/shared_queue.rs +++ b/core/shared_queue.rs @@ -196,7 +196,7 @@ impl SharedQueue { #[cfg(test)] mod tests { use super::*; - use crate::isolate::Buf; + use crate::ops::Buf; #[test] fn basic() { diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index c1e8798c545f2d..1961f385e212fd 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -28,7 +28,7 @@ declare class Op { constructor(name: string); - setOpId(opId: Opid): void; + setOpId(opId: OpId): void; static handleAsyncMsgFromRust(opId: OpId, buf: Uint8Array): void; @@ -58,7 +58,7 @@ declare interface DenoCore { shift(): Uint8Array | null; }; - Op: Op; + Op: typeof Op; initOps(): void; recv(cb: MessageCallback): void; From 04e4516c3331bc2ff4922e6f57200d0fec41764b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sun, 22 Sep 2019 12:00:37 +0200 Subject: [PATCH 11/34] reset CI From 94c3bdb483095967c64debe73667958d2a5a8e37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 24 Sep 2019 18:14:12 +0200 Subject: [PATCH 12/34] Deno.core.getOps() --- core/examples/http_bench.js | 50 ++++++++++++++++---- core/shared_queue.js | 73 ++++++------------------------ deno_typescript/lib.deno_core.d.ts | 3 +- 3 files changed, 55 insertions(+), 71 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 06f74f241d3c9f..b8d6faef5e2d56 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,20 +1,47 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. -class HttpOp extends Deno.core.Op { +const registry = {}; + +function initOps(opsMap) { + for (const [name, opId] of Object.entries(opsMap)) { + const op = registry[name]; + + if (!op) { + continue; + } + + op.setOpId(opId); + } +} + +class Op { + constructor(name) { + if (typeof registry[name] !== "undefined") { + throw new Error(`Duplicate op: ${name}`); + } + + this.name = name; + this.opId = 0; + registry[name] = this; + } + + setOpId(opId) { + this.opId = opId; + } + static handleAsyncMsgFromRust(opId, buf) { const record = recordFromBuf(buf); - const { promiseId, result } = record; + const { promiseId } = record; const p = promiseMap.get(promiseId); promiseMap.delete(promiseId); - p.resolve(result); + p.resolve(record); } /** Returns i32 number */ static sendSync(opId, arg, zeroCopy) { const buf = send(0, opId, arg, zeroCopy); - const record = recordFromBuf(buf); - return record.result; + return recordFromBuf(buf); } /** Returns Promise */ @@ -25,13 +52,17 @@ class HttpOp extends Deno.core.Op { send(promiseId, opId, arg, zeroCopy); return p; } +} +class HttpOp extends Op { sendSync(arg, zeroCopy = null) { - return HttpOp.sendSync(this.opId, arg, zeroCopy); + const res = HttpOp.sendSync(this.opId, arg, zeroCopy); + return res.result; } - sendAsync(arg, zeroCopy = null) { - return HttpOp.sendAsync(this.opId, arg, zeroCopy); + async sendAsync(arg, zeroCopy = null) { + const res = await HttpOp.sendAsync(this.opId, arg, zeroCopy); + return res.result; } } @@ -132,7 +163,8 @@ async function serve(rid) { } async function main() { - Deno.core.initOps(); + Deno.core.setAsyncHandler(Op.handleAsyncMsgFromRust); + initOps(Deno.core.getOps()); Deno.core.print("http_bench.js start\n"); const listenerRid = listen(); diff --git a/core/shared_queue.js b/core/shared_queue.js index e3c7f39995ddff..4aa1b11a9dd337 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -38,9 +38,6 @@ SharedQueue Binary Layout let sharedBytes; let shared32; - let rustOpsMap; - const jsOpsMap = new Map(); - let jsOpsAsyncHandlers; let initialized = false; function maybeInit() { @@ -61,24 +58,10 @@ SharedQueue Binary Layout Deno.core.recv(handleAsyncMsgFromRust); } - function initOps() { + function getOps() { const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null); const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); - rustOpsMap = JSON.parse(opsMapJson); - const opVector = new Array(Object.keys(rustOpsMap).length); - - for (const [name, opId] of Object.entries(rustOpsMap)) { - const op = jsOpsMap.get(name); - - if (!op) { - continue; - } - - op.setOpId(opId); - opVector[opId] = op.constructor.handleAsyncMsgFromRust; - } - - jsOpsAsyncHandlers = opVector; + return JSON.parse(opsMapJson); } function assert(cond) { @@ -107,17 +90,18 @@ SharedQueue Binary Layout return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; } - // TODO(ry) rename to setMeta - function setMeta(index, end, opId) { + function setMeta(index, end, opId, promiseId) { shared32[INDEX_OFFSETS + 2 * index] = end; shared32[INDEX_OFFSETS + 2 * index + 1] = opId; + shared32[INDEX_OFFSETS + 2 * index + 2] = promiseId; } function getMeta(index) { if (index < numRecords()) { const buf = shared32[INDEX_OFFSETS + 2 * index]; const opId = shared32[INDEX_OFFSETS + 2 * index + 1]; - return [opId, buf]; + const promiseId = shared32[INDEX_OFFSETS + 2 * index + 2]; + return [opId, promiseId, buf]; } else { return null; } @@ -135,7 +119,7 @@ SharedQueue Binary Layout } } - function push(opId, buf) { + function push(opId, promiseId, buf) { const off = head(); const end = off + buf.byteLength; const index = numRecords(); @@ -143,7 +127,7 @@ SharedQueue Binary Layout // console.log("shared_queue.js push fail"); return false; } - setMeta(index, end, opId); + setMeta(index, end, opId, promiseId); assert(end - off == buf.byteLength); sharedBytes.set(buf, off); shared32[INDEX_NUM_RECORDS] += 1; @@ -160,7 +144,7 @@ SharedQueue Binary Layout } const off = getOffset(i); - const [opId, end] = getMeta(i); + const [opId, promiseId, end] = getMeta(i); if (size() > 1) { shared32[INDEX_NUM_SHIFTED_OFF] += 1; @@ -171,7 +155,7 @@ SharedQueue Binary Layout assert(off != null); assert(end != null); const buf = sharedBytes.subarray(off, end); - return [opId, buf]; + return [opId, promiseId, buf]; } let asyncHandler; @@ -184,16 +168,14 @@ SharedQueue Binary Layout function handleAsyncMsgFromRust(opId, buf) { if (buf) { // This is the overflow_response case of deno::Isolate::poll(). - const cb = asyncHandler ? asyncHandler : jsOpsAsyncHandlers[opId]; - cb(opId, buf); + asyncHandler(opId, buf); } else { while (true) { const opIdBuf = shift(); if (opIdBuf == null) { break; } - const cb = asyncHandler ? asyncHandler : jsOpsAsyncHandlers[opIdBuf[0]]; - cb(...opIdBuf); + asyncHandler(opId, buf); } } } @@ -203,34 +185,6 @@ SharedQueue Binary Layout return Deno.core.send(opId, control, zeroCopy); } - class Op { - constructor(name) { - if (typeof jsOpsMap.get(name) !== "undefined") { - throw new Error(`Duplicate op: ${name}`); - } - - this.name = name; - this.opId = 0; - jsOpsMap.set(name, this); - } - - setOpId(opId) { - this.opId = opId; - } - - static handleAsyncMsgFromRust(_opId, _buf) { - throw new Error("Unimplemented"); - } - - static sendSync(_opId, _control, _zeroCopy = null) { - throw new Error("Unimplemented"); - } - - static sendAsync(_opId, _control, _zeroCopy = null) { - throw new Error("Unimplemented"); - } - } - const denoCore = { setAsyncHandler, dispatch, @@ -243,8 +197,7 @@ SharedQueue Binary Layout reset, shift }, - initOps, - Op + getOps }; assert(window[GLOBAL_NAMESPACE] != null); diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index 1961f385e212fd..d8f2b5f044880b 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -58,8 +58,7 @@ declare interface DenoCore { shift(): Uint8Array | null; }; - Op: typeof Op; - initOps(): void; + getOps(): Record; recv(cb: MessageCallback): void; From 42f0630b348cabc0a3972fcfa240b7d00b7edb15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 24 Sep 2019 18:35:41 +0200 Subject: [PATCH 13/34] fix shared queue --- core/shared_queue.js | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/shared_queue.js b/core/shared_queue.js index 4aa1b11a9dd337..2cea811482c6b0 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -90,18 +90,16 @@ SharedQueue Binary Layout return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; } - function setMeta(index, end, opId, promiseId) { + function setMeta(index, end, opId) { shared32[INDEX_OFFSETS + 2 * index] = end; shared32[INDEX_OFFSETS + 2 * index + 1] = opId; - shared32[INDEX_OFFSETS + 2 * index + 2] = promiseId; } function getMeta(index) { if (index < numRecords()) { const buf = shared32[INDEX_OFFSETS + 2 * index]; const opId = shared32[INDEX_OFFSETS + 2 * index + 1]; - const promiseId = shared32[INDEX_OFFSETS + 2 * index + 2]; - return [opId, promiseId, buf]; + return [opId, buf]; } else { return null; } @@ -119,7 +117,7 @@ SharedQueue Binary Layout } } - function push(opId, promiseId, buf) { + function push(opId, buf) { const off = head(); const end = off + buf.byteLength; const index = numRecords(); @@ -127,7 +125,7 @@ SharedQueue Binary Layout // console.log("shared_queue.js push fail"); return false; } - setMeta(index, end, opId, promiseId); + setMeta(index, end, opId); assert(end - off == buf.byteLength); sharedBytes.set(buf, off); shared32[INDEX_NUM_RECORDS] += 1; @@ -144,7 +142,7 @@ SharedQueue Binary Layout } const off = getOffset(i); - const [opId, promiseId, end] = getMeta(i); + const [opId, end] = getMeta(i); if (size() > 1) { shared32[INDEX_NUM_SHIFTED_OFF] += 1; @@ -155,7 +153,7 @@ SharedQueue Binary Layout assert(off != null); assert(end != null); const buf = sharedBytes.subarray(off, end); - return [opId, promiseId, buf]; + return [opId, buf]; } let asyncHandler; @@ -175,7 +173,7 @@ SharedQueue Binary Layout if (opIdBuf == null) { break; } - asyncHandler(opId, buf); + asyncHandler(...opIdBuf); } } } From a97827b60b2870c740bc457b49d9c3d561206220 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 24 Sep 2019 22:29:37 +0200 Subject: [PATCH 14/34] cleanup --- core/examples/http_bench.js | 4 ++-- core/examples/http_bench.rs | 28 ++++++++++++++-------------- deno_typescript/lib.deno_core.d.ts | 21 --------------------- 3 files changed, 16 insertions(+), 37 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index b8d6faef5e2d56..862bc93fc8d148 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -38,13 +38,11 @@ class Op { p.resolve(record); } - /** Returns i32 number */ static sendSync(opId, arg, zeroCopy) { const buf = send(0, opId, arg, zeroCopy); return recordFromBuf(buf); } - /** Returns Promise */ static sendAsync(opId, arg, zeroCopy = null) { const promiseId = nextPromiseId++; const p = createResolvable(); @@ -55,11 +53,13 @@ class Op { } class HttpOp extends Op { + /** Returns i32 number */ sendSync(arg, zeroCopy = null) { const res = HttpOp.sendSync(this.opId, arg, zeroCopy); return res.result; } + /** Returns Promise */ async sendAsync(arg, zeroCopy = null) { const res = await HttpOp.sendAsync(this.opId, arg, zeroCopy); return res.result; diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index bf32ad8b0612c3..bed585a1eac1b3 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -98,15 +98,15 @@ fn test_record_from() { // TODO test From<&[u8]> for Record } -pub type HttpBenchOp = dyn Future + Send; +pub type HttpOp = dyn Future + Send; -pub type HttpBenchOpHandler = fn( +pub type HttpOpHandler = fn( is_sync: bool, record: Record, zero_copy_buf: Option, -) -> Box; +) -> Box; -fn serialize_http_bench_op(handler: HttpBenchOpHandler) -> Box { +fn serialize_http_op(handler: HttpOpHandler) -> Box { let serialized_op = move |control: &[u8], zero_copy_buf: Option| -> CoreOp { let record = Record::from(control); @@ -156,11 +156,11 @@ fn main() { }); let mut isolate = deno::Isolate::new(startup_data, false); - isolate.register_op("listen", serialize_http_bench_op(op_listen)); - isolate.register_op("accept", serialize_http_bench_op(op_accept)); - isolate.register_op("read", serialize_http_bench_op(op_read)); - isolate.register_op("write", serialize_http_bench_op(op_write)); - isolate.register_op("close", serialize_http_bench_op(op_close)); + isolate.register_op("listen", serialize_http_op(op_listen)); + isolate.register_op("accept", serialize_http_op(op_accept)); + isolate.register_op("read", serialize_http_op(op_read)); + isolate.register_op("write", serialize_http_op(op_write)); + isolate.register_op("close", serialize_http_op(op_close)); isolate.then(|r| { js_check(r); @@ -208,7 +208,7 @@ fn op_accept( is_sync: bool, record: Record, _zero_copy_buf: Option, -) -> Box { +) -> Box { assert!(!is_sync); let listener_rid = record.arg; debug!("accept {}", listener_rid); @@ -237,7 +237,7 @@ fn op_listen( is_sync: bool, _record: Record, _zero_copy_buf: Option, -) -> Box { +) -> Box { assert!(is_sync); debug!("listen"); Box::new(lazy(move || { @@ -255,7 +255,7 @@ fn op_close( is_sync: bool, record: Record, _zero_copy_buf: Option, -) -> Box { +) -> Box { assert!(is_sync); debug!("close"); let rid = record.arg; @@ -271,7 +271,7 @@ fn op_read( is_sync: bool, record: Record, zero_copy_buf: Option, -) -> Box { +) -> Box { assert!(!is_sync); let rid = record.arg; debug!("read rid={}", rid); @@ -298,7 +298,7 @@ fn op_write( is_sync: bool, record: Record, zero_copy_buf: Option, -) -> Box { +) -> Box { assert!(!is_sync); let rid = record.arg; debug!("write rid={}", rid); diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index d8f2b5f044880b..c1b1f7ef9c3ecf 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -20,27 +20,6 @@ interface EvalErrorInfo { thrown: any; } -declare type OpId = number; - -declare class Op { - name: string; - opId: OpId; - - constructor(name: string); - - setOpId(opId: OpId): void; - - static handleAsyncMsgFromRust(opId: OpId, buf: Uint8Array): void; - - static sendSync(opId: OpId, control: Uint8Array, zeroCopy?: Uint8Array): void; - - static sendAsync( - opId: OpId, - control: Uint8Array, - zeroCopy?: Uint8Array - ): void; -} - declare interface DenoCore { print(s: string, isErr?: boolean); dispatch( From c641e126cc0e4ffc341c8a65160381bdbf1dc7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 16:36:38 +0200 Subject: [PATCH 15/34] review --- core/examples/http_bench.js | 30 +++++++++++++----------------- core/isolate.rs | 3 ++- core/ops.rs | 19 ++++++++++++++++--- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 862bc93fc8d148..2aa96131a6a4ea 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -3,19 +3,7 @@ // exercise the event loop in a simple yet semi-realistic way. const registry = {}; -function initOps(opsMap) { - for (const [name, opId] of Object.entries(opsMap)) { - const op = registry[name]; - - if (!op) { - continue; - } - - op.setOpId(opId); - } -} - -class Op { +class HttpOp { constructor(name) { if (typeof registry[name] !== "undefined") { throw new Error(`Duplicate op: ${name}`); @@ -50,9 +38,7 @@ class Op { send(promiseId, opId, arg, zeroCopy); return p; } -} -class HttpOp extends Op { /** Returns i32 number */ sendSync(arg, zeroCopy = null) { const res = HttpOp.sendSync(this.opId, arg, zeroCopy); @@ -163,8 +149,18 @@ async function serve(rid) { } async function main() { - Deno.core.setAsyncHandler(Op.handleAsyncMsgFromRust); - initOps(Deno.core.getOps()); + Deno.core.setAsyncHandler(HttpOp.handleAsyncMsgFromRust); + const opsMap = Deno.core.getOps(); + for (const [name, opId] of Object.entries(opsMap)) { + const op = registry[name]; + + if (!op) { + continue; + } + + op.setOpId(opId); + } + Deno.core.print("http_bench.js start\n"); const listenerRid = listen(); diff --git a/core/isolate.rs b/core/isolate.rs index 4890b9801582a2..43c6f44059a8bc 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -245,7 +245,7 @@ impl Isolate { self.op_registry.register_op(name, op) } - pub fn call_op( + fn call_op( &self, op_id: OpId, control: &[u8], @@ -334,6 +334,7 @@ impl Isolate { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; let op = if let Some(ref f) = isolate.dispatch { + assert!(op_id != 0); f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { isolate.call_op( diff --git a/core/ops.rs b/core/ops.rs index 8b01a20d77c968..d549314629aa76 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -31,15 +31,21 @@ pub struct OpRegistry { pub op_map: HashMap, } -fn get_op_map(_control: &[u8], _zero_copy_buf: Option) -> CoreOp { +fn op_noop(_control: &[u8], _zero_copy_buf: Option) -> CoreOp { Op::Sync(Box::new([])) } impl OpRegistry { pub fn new() -> Self { - // TODO: this is make shift fix for get op map let mut registry = Self::default(); - registry.register_op("get_op_map", Box::new(get_op_map)); + // TODO: We should register actual "get_op_map" op here, but I couldn't + // get past borrow checker when I wanted to do: + // registry.register_op("get_op_map", Box::new(self.op_noop)); + + // Add single noop symbolizing "get_op_map" function. The actual + // handling is done in `isolate.rs`. + let op_id = registry.register_op("get_op_map", Box::new(op_noop)); + assert_eq!(op_id, 0); registry } @@ -64,3 +70,10 @@ impl OpRegistry { op_id } } + +#[test] +fn test_register_op() { + let mut op_registry = OpRegistry::new(); + let op_id = op_registry.register_op("test", Box::new(op_noop)); + assert!(op_id != 0); +} From de98a7b6c9244382f93a7a7146b61e3385c1b13b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 16:43:40 +0200 Subject: [PATCH 16/34] add comment --- core/examples/http_bench.js | 2 ++ third_party | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 2aa96131a6a4ea..46bfad9b7b7061 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -150,6 +150,8 @@ async function serve(rid) { async function main() { Deno.core.setAsyncHandler(HttpOp.handleAsyncMsgFromRust); + // Initialize ops by getting their ids from Rust + // and assign id for each of our ops. const opsMap = Deno.core.getOps(); for (const [name, opId] of Object.entries(opsMap)) { const op = registry[name]; diff --git a/third_party b/third_party index 1cceee1f4e315e..86f683ab4a58e1 160000 --- a/third_party +++ b/third_party @@ -1 +1 @@ -Subproject commit 1cceee1f4e315eaeee1bd4d1fe4cfbe7162fc4fb +Subproject commit 86f683ab4a58e1c20995d08c3b57c3aba227dbe2 From b0da81f8339d220e6d3f994e6b58c1cd9cd62db6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 16:51:48 +0200 Subject: [PATCH 17/34] comments --- core/isolate.rs | 7 +++++++ core/ops.rs | 10 ++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/core/isolate.rs b/core/isolate.rs index 43c6f44059a8bc..8d96ffdad901fa 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -234,6 +234,9 @@ impl Isolate { /// Defines the how Deno.core.dispatch() acts. /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf /// corresponds to the second argument of Deno.core.dispatch(). + /// + /// If this method is used then ops registered using `op_register` function are + /// ignored and all dispatching must be handled manually in provided callback. pub fn set_dispatch(&mut self, f: F) where F: Fn(OpId, &[u8], Option) -> CoreOp + Send + Sync + 'static, @@ -251,6 +254,10 @@ impl Isolate { control: &[u8], zero_copy_buf: Option, ) -> CoreOp { + // TODO: see TODO in `core/ops.rs` to handle this op via public API + // Op with id 0 has special meaning - it's special op that is + // always provided to retrieve op id map. + // Op id map consists of name to `OpId` mappings. if op_id == 0 { let op_map = self.op_registry.get_op_map(); let op_map_json = serde_json::to_string(&op_map).unwrap(); diff --git a/core/ops.rs b/core/ops.rs index d549314629aa76..326fd227ec889a 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -40,7 +40,7 @@ impl OpRegistry { let mut registry = Self::default(); // TODO: We should register actual "get_op_map" op here, but I couldn't // get past borrow checker when I wanted to do: - // registry.register_op("get_op_map", Box::new(self.op_noop)); + // registry.register_op("get_op_map", Box::new(registry.op_noop)); // Add single noop symbolizing "get_op_map" function. The actual // handling is done in `isolate.rs`. @@ -72,8 +72,14 @@ impl OpRegistry { } #[test] -fn test_register_op() { +fn test_op_registry() { let mut op_registry = OpRegistry::new(); let op_id = op_registry.register_op("test", Box::new(op_noop)); assert!(op_id != 0); + + let mut expected_map = HashMap::new(); + expected_map.insert("get_op_map", 0); + expected_map.insert("test", 1); + let op_map = op_registry.get_op_map(); + assert_eq!(op_map, expected_map); } From 4ccae4acc84fc2d30458e895e14943b6b9d52d72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 17:08:06 +0200 Subject: [PATCH 18/34] move call_op to OpRegistry --- core/examples/http_bench.js | 15 +++++++++++---- core/isolate.rs | 28 ++++++---------------------- core/ops.rs | 33 ++++++++++++++++++++++++++++----- 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 46bfad9b7b7061..6342fa15adf18d 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,17 +1,24 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. -const registry = {}; + +/** This structure is used to collect all ops + * and assign ids to them after we get them + * from Rust. + * + * @type {Map} + */ +const opRegistry = new Map(); class HttpOp { constructor(name) { - if (typeof registry[name] !== "undefined") { + if (typeof opRegistry.get(name) !== "undefined") { throw new Error(`Duplicate op: ${name}`); } this.name = name; this.opId = 0; - registry[name] = this; + opRegistry.set(name, this); } setOpId(opId) { @@ -154,7 +161,7 @@ async function main() { // and assign id for each of our ops. const opsMap = Deno.core.getOps(); for (const [name, opId] of Object.entries(opsMap)) { - const op = registry[name]; + const op = opRegistry.get(name); if (!op) { continue; diff --git a/core/isolate.rs b/core/isolate.rs index 8d96ffdad901fa..aef70f1b06629e 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -244,31 +244,15 @@ impl Isolate { self.dispatch = Some(Arc::new(f)); } + /// New dispatch mechanism. Requires runtime to explicitly ask for op ids + /// before using any of the ops. + /// + /// Ops added using this method are only usable if `dispatch` is not set + /// (using `set_dispatch` method). pub fn register_op(&mut self, name: &str, op: Box) -> OpId { self.op_registry.register_op(name, op) } - fn call_op( - &self, - op_id: OpId, - control: &[u8], - zero_copy_buf: Option, - ) -> CoreOp { - // TODO: see TODO in `core/ops.rs` to handle this op via public API - // Op with id 0 has special meaning - it's special op that is - // always provided to retrieve op id map. - // Op id map consists of name to `OpId` mappings. - if op_id == 0 { - let op_map = self.op_registry.get_op_map(); - let op_map_json = serde_json::to_string(&op_map).unwrap(); - let buf = op_map_json.as_bytes().to_owned().into_boxed_slice(); - return Op::Sync(buf); - } - let ops = &self.op_registry.ops; - let op_handler = &*ops.get(op_id as usize).expect("Op not found!"); - op_handler(control, zero_copy_buf) - } - pub fn set_dyn_import(&mut self, f: F) where F: Fn(deno_dyn_import_id, &str, &str) -> DynImportStream @@ -344,7 +328,7 @@ impl Isolate { assert!(op_id != 0); f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { - isolate.call_op( + isolate.op_registry.call_op( op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf), diff --git a/core/ops.rs b/core/ops.rs index 326fd227ec889a..0fd59b62fb1a31 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -35,15 +35,18 @@ fn op_noop(_control: &[u8], _zero_copy_buf: Option) -> CoreOp { Op::Sync(Box::new([])) } +fn op_get_op_map(op_registry: &OpRegistry) -> CoreOp { + let op_map = op_registry.get_op_map(); + let op_map_json = serde_json::to_string(&op_map).unwrap(); + let buf = op_map_json.as_bytes().to_owned().into_boxed_slice(); + return Op::Sync(buf); +} + impl OpRegistry { pub fn new() -> Self { let mut registry = Self::default(); - // TODO: We should register actual "get_op_map" op here, but I couldn't - // get past borrow checker when I wanted to do: - // registry.register_op("get_op_map", Box::new(registry.op_noop)); - // Add single noop symbolizing "get_op_map" function. The actual - // handling is done in `isolate.rs`. + // handling is done in `call_op` method. let op_id = registry.register_op("get_op_map", Box::new(op_noop)); assert_eq!(op_id, 0); registry @@ -69,6 +72,23 @@ impl OpRegistry { self.ops.push(serialized_op); op_id } + + pub fn call_op( + &self, + op_id: OpId, + control: &[u8], + zero_copy_buf: Option, + ) -> CoreOp { + // Op with id 0 has special meaning - it's a special op that is + // always provided to retrieve op id map. + // Op id map consists of name to `OpId` mappings. + if op_id == 0 { + return op_get_op_map(self); + } + + let op_handler = &*self.ops.get(op_id as usize).expect("Op not found!"); + op_handler(control, zero_copy_buf) + } } #[test] @@ -82,4 +102,7 @@ fn test_op_registry() { expected_map.insert("test", 1); let op_map = op_registry.get_op_map(); assert_eq!(op_map, expected_map); + + let res = op_registry.call_op(1, [], None); + assert_eq!(op_map, Op::Sync(Box::new([]))); } From fc43e8a35fa0d104e31c93c1d43226d642867f7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 17:15:23 +0200 Subject: [PATCH 19/34] fix --- core/ops.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/ops.rs b/core/ops.rs index 0fd59b62fb1a31..2931394b249eb1 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -98,11 +98,16 @@ fn test_op_registry() { assert!(op_id != 0); let mut expected_map = HashMap::new(); - expected_map.insert("get_op_map", 0); - expected_map.insert("test", 1); + expected_map.insert("get_op_map".to_string(), 0); + expected_map.insert("test".to_string(), 1); let op_map = op_registry.get_op_map(); assert_eq!(op_map, expected_map); - let res = op_registry.call_op(1, [], None); - assert_eq!(op_map, Op::Sync(Box::new([]))); + let res = op_registry.call_op(1, &[], None); + match res { + Op::Sync(buf) => { + assert_eq!(buf.len(), 0); + } + _ => panic!(), + } } From e28507fab1270519df3537d33dbd0f52a8b790ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 17:23:34 +0200 Subject: [PATCH 20/34] update third_party --- third_party | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/third_party b/third_party index 86f683ab4a58e1..1cceee1f4e315e 160000 --- a/third_party +++ b/third_party @@ -1 +1 @@ -Subproject commit 86f683ab4a58e1c20995d08c3b57c3aba227dbe2 +Subproject commit 1cceee1f4e315eaeee1bd4d1fe4cfbe7162fc4fb From 86f53b223d71b54130cad3c72a84aff157f0e96c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 18:20:46 +0200 Subject: [PATCH 21/34] clippy --- core/ops.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/ops.rs b/core/ops.rs index 2931394b249eb1..c9a44794e32175 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -39,7 +39,7 @@ fn op_get_op_map(op_registry: &OpRegistry) -> CoreOp { let op_map = op_registry.get_op_map(); let op_map_json = serde_json::to_string(&op_map).unwrap(); let buf = op_map_json.as_bytes().to_owned().into_boxed_slice(); - return Op::Sync(buf); + Op::Sync(buf) } impl OpRegistry { From 8e720eef378781281e18cae82bfde842634ae68a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 21:13:26 +0200 Subject: [PATCH 22/34] review part 2 --- core/examples/http_bench.js | 2 +- core/examples/http_bench.rs | 51 ++++++++++--------------------------- core/ops.rs | 6 +++-- 3 files changed, 18 insertions(+), 41 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 6342fa15adf18d..12eeddc8521d66 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -164,7 +164,7 @@ async function main() { const op = opRegistry.get(name); if (!op) { - continue; + throw new Error(`Unknown op: ${name}`); } op.setOpId(opId); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index bed585a1eac1b3..f9a0e0029425e1 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -100,18 +100,15 @@ fn test_record_from() { pub type HttpOp = dyn Future + Send; -pub type HttpOpHandler = fn( - is_sync: bool, - record: Record, - zero_copy_buf: Option, -) -> Box; +pub type HttpOpHandler = + fn(record: Record, zero_copy_buf: Option) -> Box; -fn serialize_http_op(handler: HttpOpHandler) -> Box { +fn http_op(handler: HttpOpHandler) -> Box { let serialized_op = move |control: &[u8], zero_copy_buf: Option| -> CoreOp { let record = Record::from(control); let is_sync = record.promise_id == 0; - let op = handler(is_sync, record.clone(), zero_copy_buf); + let op = handler(record.clone(), zero_copy_buf); let mut record_a = record.clone(); let mut record_b = record.clone(); @@ -156,11 +153,11 @@ fn main() { }); let mut isolate = deno::Isolate::new(startup_data, false); - isolate.register_op("listen", serialize_http_op(op_listen)); - isolate.register_op("accept", serialize_http_op(op_accept)); - isolate.register_op("read", serialize_http_op(op_read)); - isolate.register_op("write", serialize_http_op(op_write)); - isolate.register_op("close", serialize_http_op(op_close)); + isolate.register_op("listen", http_op(op_listen)); + isolate.register_op("accept", http_op(op_accept)); + isolate.register_op("read", http_op(op_read)); + isolate.register_op("write", http_op(op_write)); + isolate.register_op("close", http_op(op_close)); isolate.then(|r| { js_check(r); @@ -204,12 +201,7 @@ fn new_rid() -> i32 { rid as i32 } -fn op_accept( - is_sync: bool, - record: Record, - _zero_copy_buf: Option, -) -> Box { - assert!(!is_sync); +fn op_accept(record: Record, _zero_copy_buf: Option) -> Box { let listener_rid = record.arg; debug!("accept {}", listener_rid); Box::new( @@ -234,11 +226,9 @@ fn op_accept( } fn op_listen( - is_sync: bool, _record: Record, _zero_copy_buf: Option, ) -> Box { - assert!(is_sync); debug!("listen"); Box::new(lazy(move || { let addr = "127.0.0.1:4544".parse::().unwrap(); @@ -251,12 +241,7 @@ fn op_listen( })) } -fn op_close( - is_sync: bool, - record: Record, - _zero_copy_buf: Option, -) -> Box { - assert!(is_sync); +fn op_close(record: Record, _zero_copy_buf: Option) -> Box { debug!("close"); let rid = record.arg; Box::new(lazy(move || { @@ -267,12 +252,7 @@ fn op_close( })) } -fn op_read( - is_sync: bool, - record: Record, - zero_copy_buf: Option, -) -> Box { - assert!(!is_sync); +fn op_read(record: Record, zero_copy_buf: Option) -> Box { let rid = record.arg; debug!("read rid={}", rid); let mut zero_copy_buf = zero_copy_buf.unwrap(); @@ -294,12 +274,7 @@ fn op_read( ) } -fn op_write( - is_sync: bool, - record: Record, - zero_copy_buf: Option, -) -> Box { - assert!(!is_sync); +fn op_write(record: Record, zero_copy_buf: Option) -> Box { let rid = record.arg; debug!("write rid={}", rid); let zero_copy_buf = zero_copy_buf.unwrap(); diff --git a/core/ops.rs b/core/ops.rs index c9a44794e32175..f530a9e779869e 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -53,7 +53,10 @@ impl OpRegistry { } pub fn get_op_map(&self) -> HashMap { - self.op_map.clone() + let mut op_map = self.op_map.clone(); + // Don't send "get_op_map" to JS, if JS encounters unknown op it should throw. + op_map.remove("get_op_map"); + op_map } pub fn register_op( @@ -98,7 +101,6 @@ fn test_op_registry() { assert!(op_id != 0); let mut expected_map = HashMap::new(); - expected_map.insert("get_op_map".to_string(), 0); expected_map.insert("test".to_string(), 1); let op_map = op_registry.get_op_map(); assert_eq!(op_map, expected_map); From eeeb828e65a28f00359a447efc3e0990873c6b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 21:34:54 +0200 Subject: [PATCH 23/34] review part 3 --- core/isolate.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/isolate.rs b/core/isolate.rs index aef70f1b06629e..dd2e457acc5eb7 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -237,6 +237,7 @@ impl Isolate { /// /// If this method is used then ops registered using `op_register` function are /// ignored and all dispatching must be handled manually in provided callback. + // TODO: we want to deprecate and remove this API and move to `register_op` API pub fn set_dispatch(&mut self, f: F) where F: Fn(OpId, &[u8], Option) -> CoreOp + Send + Sync + 'static, @@ -250,6 +251,10 @@ impl Isolate { /// Ops added using this method are only usable if `dispatch` is not set /// (using `set_dispatch` method). pub fn register_op(&mut self, name: &str, op: Box) -> OpId { + assert!( + self.dispatch.is_none(), + "set_dispatch should not be used in conjunction with register_op" + ); self.op_registry.register_op(name, op) } From ef5500fb9b4dda7f000ee06d76a2e43dd070b877 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 25 Sep 2019 22:18:42 +0200 Subject: [PATCH 24/34] register_op --- core/ops.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/ops.rs b/core/ops.rs index f530a9e779869e..7d76f0d6d38989 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -66,11 +66,11 @@ impl OpRegistry { ) -> OpId { let op_id = self.ops.len() as u32; - self - .op_map - .entry(name.to_string()) - .and_modify(|_| panic!("Op already registered {}", op_id)) - .or_insert(op_id); + let existing = self.op_map.insert(name.to_string(), op_id); + assert!( + existing.is_none(), + format!("Op already registered: {}", name) + ); self.ops.push(serialized_op); op_id From 7db809a6cafccc790842ebbeef100cf381c0eb10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 26 Sep 2019 11:17:00 +0200 Subject: [PATCH 25/34] reset CI From c37310adbbbe8202504cd5e9125561a714d566d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 26 Sep 2019 11:40:14 +0200 Subject: [PATCH 26/34] reset CI From 08f9c4f206374e1f4bdd64318e6359160c85a899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 26 Sep 2019 13:04:30 +0200 Subject: [PATCH 27/34] Deno.core.ops --- core/examples/http_bench.js | 113 ++++++++--------------------- core/ops.rs | 6 +- core/shared_queue.js | 10 ++- deno_typescript/lib.deno_core.d.ts | 3 +- 4 files changed, 42 insertions(+), 90 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 12eeddc8521d66..a6f55a11b9e88d 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -1,70 +1,6 @@ // This is not a real HTTP server. We read blindly one time into 'requestBuf', // then write this fixed 'responseBuf'. The point of this benchmark is to // exercise the event loop in a simple yet semi-realistic way. - -/** This structure is used to collect all ops - * and assign ids to them after we get them - * from Rust. - * - * @type {Map} - */ -const opRegistry = new Map(); - -class HttpOp { - constructor(name) { - if (typeof opRegistry.get(name) !== "undefined") { - throw new Error(`Duplicate op: ${name}`); - } - - this.name = name; - this.opId = 0; - opRegistry.set(name, this); - } - - setOpId(opId) { - this.opId = opId; - } - - static handleAsyncMsgFromRust(opId, buf) { - const record = recordFromBuf(buf); - const { promiseId } = record; - const p = promiseMap.get(promiseId); - promiseMap.delete(promiseId); - p.resolve(record); - } - - static sendSync(opId, arg, zeroCopy) { - const buf = send(0, opId, arg, zeroCopy); - return recordFromBuf(buf); - } - - static sendAsync(opId, arg, zeroCopy = null) { - const promiseId = nextPromiseId++; - const p = createResolvable(); - promiseMap.set(promiseId, p); - send(promiseId, opId, arg, zeroCopy); - return p; - } - - /** Returns i32 number */ - sendSync(arg, zeroCopy = null) { - const res = HttpOp.sendSync(this.opId, arg, zeroCopy); - return res.result; - } - - /** Returns Promise */ - async sendAsync(arg, zeroCopy = null) { - const res = await HttpOp.sendAsync(this.opId, arg, zeroCopy); - return res.result; - } -} - -const OP_LISTEN = new HttpOp("listen"); -const OP_ACCEPT = new HttpOp("accept"); -const OP_READ = new HttpOp("read"); -const OP_WRITE = new HttpOp("write"); -const OP_CLOSE = new HttpOp("close"); - const requestBuf = new Uint8Array(64 * 1024); const responseBuf = new Uint8Array( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" @@ -103,6 +39,15 @@ function send(promiseId, opId, arg, zeroCopy = null) { return Deno.core.dispatch(opId, scratchBytes, zeroCopy); } +/** Returns Promise */ +function sendAsync(opId, arg, zeroCopy = null) { + const promiseId = nextPromiseId++; + const p = createResolvable(); + promiseMap.set(promiseId, p); + send(promiseId, opId, arg, zeroCopy); + return p; +} + function recordFromBuf(buf) { assert(buf.byteLength === 3 * 4); const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); @@ -113,14 +58,29 @@ function recordFromBuf(buf) { }; } +/** Returns i32 number */ +function sendSync(opId, arg) { + const buf = send(0, opId, arg); + const record = recordFromBuf(buf); + return record.result; +} + +function handleAsyncMsgFromRust(opId, buf) { + const record = recordFromBuf(buf); + const { promiseId, result } = record; + const p = promiseMap.get(promiseId); + promiseMap.delete(promiseId); + p.resolve(result); +} + /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return OP_LISTEN.sendSync(-1); + return sendSync(Deno.core.ops["listen"], -1); } /** Accepts a connection, returns rid. */ async function accept(rid) { - return await OP_ACCEPT.sendAsync(rid); + return await sendAsync(Deno.core.ops["accept"], rid); } /** @@ -128,16 +88,16 @@ async function accept(rid) { * Returns bytes read. */ async function read(rid, data) { - return await OP_READ.sendAsync(rid, data); + return await sendAsync(Deno.core.ops["read"], rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ async function write(rid, data) { - return await OP_WRITE.sendAsync(rid, data); + return await sendAsync(Deno.core.ops["write"], rid, data); } function close(rid) { - return OP_CLOSE.sendSync(rid); + return sendSync(Deno.core.ops["close"], rid); } async function serve(rid) { @@ -156,19 +116,8 @@ async function serve(rid) { } async function main() { - Deno.core.setAsyncHandler(HttpOp.handleAsyncMsgFromRust); - // Initialize ops by getting their ids from Rust - // and assign id for each of our ops. - const opsMap = Deno.core.getOps(); - for (const [name, opId] of Object.entries(opsMap)) { - const op = opRegistry.get(name); - - if (!op) { - throw new Error(`Unknown op: ${name}`); - } - - op.setOpId(opId); - } + Deno.core.setAsyncHandler(handleAsyncMsgFromRust); + Deno.core.initOps(); Deno.core.print("http_bench.js start\n"); diff --git a/core/ops.rs b/core/ops.rs index 7d76f0d6d38989..73abad1f157bde 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -53,10 +53,7 @@ impl OpRegistry { } pub fn get_op_map(&self) -> HashMap { - let mut op_map = self.op_map.clone(); - // Don't send "get_op_map" to JS, if JS encounters unknown op it should throw. - op_map.remove("get_op_map"); - op_map + self.op_map.clone() } pub fn register_op( @@ -101,6 +98,7 @@ fn test_op_registry() { assert!(op_id != 0); let mut expected_map = HashMap::new(); + expected_map.insert("get_op_map".to_string(), 0); expected_map.insert("test".to_string(), 1); let op_map = op_registry.get_op_map(); assert_eq!(op_map, expected_map); diff --git a/core/shared_queue.js b/core/shared_queue.js index 2cea811482c6b0..a27c6945ba06f6 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -39,6 +39,7 @@ SharedQueue Binary Layout let sharedBytes; let shared32; let initialized = false; + let opsMap = {}; function maybeInit() { if (!initialized) { @@ -58,10 +59,12 @@ SharedQueue Binary Layout Deno.core.recv(handleAsyncMsgFromRust); } - function getOps() { + function initOps() { const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null); const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); - return JSON.parse(opsMapJson); + for (const [key, value] of Object.entries(JSON.parse(opsMapJson))) { + opsMap[key] = value; + } } function assert(cond) { @@ -195,7 +198,8 @@ SharedQueue Binary Layout reset, shift }, - getOps + initOps, + ops: opsMap }; assert(window[GLOBAL_NAMESPACE] != null); diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index c1b1f7ef9c3ecf..5afe7e9caccc6e 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -37,7 +37,8 @@ declare interface DenoCore { shift(): Uint8Array | null; }; - getOps(): Record; + initOps(): void; + ops: Record; recv(cb: MessageCallback): void; From 62fc1836a0e2c7e65002e2e1ff51ca81ab0f4b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 26 Sep 2019 13:17:48 +0200 Subject: [PATCH 28/34] use proxy --- core/shared_queue.js | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/core/shared_queue.js b/core/shared_queue.js index a27c6945ba06f6..40db47416854fa 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -39,7 +39,7 @@ SharedQueue Binary Layout let sharedBytes; let shared32; let initialized = false; - let opsMap = {}; + const opsMap = {}; function maybeInit() { if (!initialized) { @@ -67,6 +67,23 @@ SharedQueue Binary Layout } } + const opsMapProxy = new Proxy(opsMap, { + get(target, key) { + const opId = target[key]; + + if (!opId) { + throw new Error( + `Unknown op: "${key}". Did you forget to initialize ops with Deno.core.initOps()?` + ); + } + + return opId; + }, + set() { + throw new Error("Setting op id is not allowed."); + } + }); + function assert(cond) { if (!cond) { throw Error("assert"); @@ -199,7 +216,7 @@ SharedQueue Binary Layout shift }, initOps, - ops: opsMap + ops: opsMapProxy }; assert(window[GLOBAL_NAMESPACE] != null); From 364944530998f82730dadbae894ca51f7bd2363f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 26 Sep 2019 15:32:06 +0200 Subject: [PATCH 29/34] Deno.core.ops --- core/examples/http_bench.js | 12 +++++----- core/shared_queue.js | 35 +++++++++++++----------------- deno_typescript/lib.deno_core.d.ts | 6 +++-- 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index a6f55a11b9e88d..96fae80f79901e 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -75,12 +75,12 @@ function handleAsyncMsgFromRust(opId, buf) { /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(Deno.core.ops["listen"], -1); + return sendSync(Deno.core.ops.get("listen"), -1); } /** Accepts a connection, returns rid. */ async function accept(rid) { - return await sendAsync(Deno.core.ops["accept"], rid); + return await sendAsync(Deno.core.ops.get("accept"), rid); } /** @@ -88,16 +88,16 @@ async function accept(rid) { * Returns bytes read. */ async function read(rid, data) { - return await sendAsync(Deno.core.ops["read"], rid, data); + return await sendAsync(Deno.core.ops.get("read"), rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ async function write(rid, data) { - return await sendAsync(Deno.core.ops["write"], rid, data); + return await sendAsync(Deno.core.ops.get("write"), rid, data); } function close(rid) { - return sendSync(Deno.core.ops["close"], rid); + return sendSync(Deno.core.ops.get("close"), rid); } async function serve(rid) { @@ -117,7 +117,7 @@ async function serve(rid) { async function main() { Deno.core.setAsyncHandler(handleAsyncMsgFromRust); - Deno.core.initOps(); + Deno.core.ops.init(); Deno.core.print("http_bench.js start\n"); diff --git a/core/shared_queue.js b/core/shared_queue.js index 40db47416854fa..9fa642b3437057 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -39,7 +39,7 @@ SharedQueue Binary Layout let sharedBytes; let shared32; let initialized = false; - const opsMap = {}; + let opsMap = {}; function maybeInit() { if (!initialized) { @@ -62,27 +62,20 @@ SharedQueue Binary Layout function initOps() { const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null); const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); - for (const [key, value] of Object.entries(JSON.parse(opsMapJson))) { - opsMap[key] = value; - } + opsMap = JSON.parse(opsMapJson); } - const opsMapProxy = new Proxy(opsMap, { - get(target, key) { - const opId = target[key]; - - if (!opId) { - throw new Error( - `Unknown op: "${key}". Did you forget to initialize ops with Deno.core.initOps()?` - ); - } + function getOpId(name) { + const opId = opsMap[name]; - return opId; - }, - set() { - throw new Error("Setting op id is not allowed."); + if (!opId) { + throw new Error( + `Unknown op: "${key}". Did you forget to initialize ops with Deno.core.ops.init()?` + ); } - }); + + return opId; + } function assert(cond) { if (!cond) { @@ -215,8 +208,10 @@ SharedQueue Binary Layout reset, shift }, - initOps, - ops: opsMapProxy + ops: { + init: initOps, + get: getOpId + } }; assert(window[GLOBAL_NAMESPACE] != null); diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts index 5afe7e9caccc6e..423e53578fc2d5 100644 --- a/deno_typescript/lib.deno_core.d.ts +++ b/deno_typescript/lib.deno_core.d.ts @@ -37,8 +37,10 @@ declare interface DenoCore { shift(): Uint8Array | null; }; - initOps(): void; - ops: Record; + ops: { + init(): void; + get(name: string): number; + }; recv(cb: MessageCallback): void; From 0c311c12513c91275ddc393ace5f0ce12ac7ddf8 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 26 Sep 2019 11:48:32 -0400 Subject: [PATCH 30/34] cleanup --- core/isolate.rs | 5 +++- core/ops.rs | 74 ++++++++++++++++++++++++------------------------- 2 files changed, 40 insertions(+), 39 deletions(-) diff --git a/core/isolate.rs b/core/isolate.rs index dd2e457acc5eb7..6c2e20f2f4ffb7 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -250,7 +250,10 @@ impl Isolate { /// /// Ops added using this method are only usable if `dispatch` is not set /// (using `set_dispatch` method). - pub fn register_op(&mut self, name: &str, op: Box) -> OpId { + pub fn register_op(&mut self, name: &str, op: F) -> OpId + where + F: Fn(&[u8], Option) -> CoreOp + Send + Sync + 'static, + { assert!( self.dispatch.is_none(), "set_dispatch should not be used in conjunction with register_op" diff --git a/core/ops.rs b/core/ops.rs index 73abad1f157bde..0883e2ca62e1ed 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -31,36 +31,21 @@ pub struct OpRegistry { pub op_map: HashMap, } -fn op_noop(_control: &[u8], _zero_copy_buf: Option) -> CoreOp { - Op::Sync(Box::new([])) -} - -fn op_get_op_map(op_registry: &OpRegistry) -> CoreOp { - let op_map = op_registry.get_op_map(); - let op_map_json = serde_json::to_string(&op_map).unwrap(); - let buf = op_map_json.as_bytes().to_owned().into_boxed_slice(); - Op::Sync(buf) -} - impl OpRegistry { pub fn new() -> Self { let mut registry = Self::default(); - // Add single noop symbolizing "get_op_map" function. The actual - // handling is done in `call_op` method. - let op_id = registry.register_op("get_op_map", Box::new(op_noop)); + let op_id = registry.register_op("get_op_map", |_, _| { + // get_op_map is a special op which is handled in call_op. + unreachable!() + }); assert_eq!(op_id, 0); registry } - pub fn get_op_map(&self) -> HashMap { - self.op_map.clone() - } - - pub fn register_op( - &mut self, - name: &str, - serialized_op: Box, - ) -> OpId { + pub fn register_op(&mut self, name: &str, op: F) -> OpId + where + F: Fn(&[u8], Option) -> CoreOp + Send + Sync + 'static, + { let op_id = self.ops.len() as u32; let existing = self.op_map.insert(name.to_string(), op_id); @@ -69,21 +54,26 @@ impl OpRegistry { format!("Op already registered: {}", name) ); - self.ops.push(serialized_op); + self.ops.push(Box::new(op)); op_id } + fn json_map(&self) -> Buf { + let op_map_json = serde_json::to_string(&self.op_map).unwrap(); + op_map_json.as_bytes().to_owned().into_boxed_slice() + } + pub fn call_op( &self, op_id: OpId, control: &[u8], zero_copy_buf: Option, ) -> CoreOp { - // Op with id 0 has special meaning - it's a special op that is - // always provided to retrieve op id map. - // Op id map consists of name to `OpId` mappings. + // Op with id 0 has special meaning - it's a special op that is always + // provided to retrieve op id map. The map consists of name to `OpId` + // mappings. if op_id == 0 { - return op_get_op_map(self); + return Op::Sync(self.json_map()); } let op_handler = &*self.ops.get(op_id as usize).expect("Op not found!"); @@ -93,21 +83,29 @@ impl OpRegistry { #[test] fn test_op_registry() { + use std::sync::atomic; + use std::sync::Arc; let mut op_registry = OpRegistry::new(); - let op_id = op_registry.register_op("test", Box::new(op_noop)); - assert!(op_id != 0); + + let c = Arc::new(atomic::AtomicUsize::new(0)); + let c_ = c.clone(); + + let test_id = op_registry.register_op("test", move |_, _| { + c_.fetch_add(1, atomic::Ordering::SeqCst); + CoreOp::Sync(Box::new([])) + }); + assert!(test_id != 0); let mut expected_map = HashMap::new(); expected_map.insert("get_op_map".to_string(), 0); expected_map.insert("test".to_string(), 1); - let op_map = op_registry.get_op_map(); - assert_eq!(op_map, expected_map); + assert_eq!(op_registry.op_map, expected_map); - let res = op_registry.call_op(1, &[], None); - match res { - Op::Sync(buf) => { - assert_eq!(buf.len(), 0); - } - _ => panic!(), + let res = op_registry.call_op(test_id, &[], None); + if let Op::Sync(buf) = res { + assert_eq!(buf.len(), 0); + } else { + unreachable!(); } + assert_eq!(c.load(atomic::Ordering::SeqCst), 1); } From f26d8397818db4604b4ede87cc8d339aa9ae78a4 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 26 Sep 2019 12:13:12 -0400 Subject: [PATCH 31/34] fix example --- core/examples/http_bench.rs | 67 ++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index f9a0e0029425e1..c019d8a1197f5f 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -103,40 +103,39 @@ pub type HttpOp = dyn Future + Send; pub type HttpOpHandler = fn(record: Record, zero_copy_buf: Option) -> Box; -fn http_op(handler: HttpOpHandler) -> Box { - let serialized_op = - move |control: &[u8], zero_copy_buf: Option| -> CoreOp { - let record = Record::from(control); - let is_sync = record.promise_id == 0; - let op = handler(record.clone(), zero_copy_buf); - - let mut record_a = record.clone(); - let mut record_b = record.clone(); - - let fut = Box::new( - op.and_then(move |result| { - record_a.result = result; - Ok(record_a) - }) - .or_else(|err| -> Result { - eprintln!("unexpected err {}", err); - record_b.result = -1; - Ok(record_b) - }) - .then(|result| -> Result { - let record = result.unwrap(); - Ok(record.into()) - }), - ); - - if is_sync { - Op::Sync(fut.wait().unwrap()) - } else { - Op::Async(fut) - } - }; - - Box::new(serialized_op) +fn http_op( + handler: HttpOpHandler, +) -> impl Fn(&[u8], Option) -> CoreOp { + move |control: &[u8], zero_copy_buf: Option| -> CoreOp { + let record = Record::from(control); + let is_sync = record.promise_id == 0; + let op = handler(record.clone(), zero_copy_buf); + + let mut record_a = record.clone(); + let mut record_b = record.clone(); + + let fut = Box::new( + op.and_then(move |result| { + record_a.result = result; + Ok(record_a) + }) + .or_else(|err| -> Result { + eprintln!("unexpected err {}", err); + record_b.result = -1; + Ok(record_b) + }) + .then(|result| -> Result { + let record = result.unwrap(); + Ok(record.into()) + }), + ); + + if is_sync { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } + } } fn main() { From 6d5ad043b37c31a4ffd0964c74b652756d380fb8 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 26 Sep 2019 12:20:27 -0400 Subject: [PATCH 32/34] cleanup --- core/examples/http_bench.js | 14 ++++++++------ core/shared_queue.js | 23 ++++------------------- 2 files changed, 12 insertions(+), 25 deletions(-) diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 96fae80f79901e..a7142b09dc0cc4 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -75,12 +75,12 @@ function handleAsyncMsgFromRust(opId, buf) { /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(Deno.core.ops.get("listen"), -1); + return sendSync(ops["listen"], -1); } /** Accepts a connection, returns rid. */ async function accept(rid) { - return await sendAsync(Deno.core.ops.get("accept"), rid); + return await sendAsync(ops["accept"], rid); } /** @@ -88,16 +88,16 @@ async function accept(rid) { * Returns bytes read. */ async function read(rid, data) { - return await sendAsync(Deno.core.ops.get("read"), rid, data); + return await sendAsync(ops["read"], rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ async function write(rid, data) { - return await sendAsync(Deno.core.ops.get("write"), rid, data); + return await sendAsync(ops["write"], rid, data); } function close(rid) { - return sendSync(Deno.core.ops.get("close"), rid); + return sendSync(ops["close"], rid); } async function serve(rid) { @@ -115,9 +115,11 @@ async function serve(rid) { close(rid); } +let ops; + async function main() { Deno.core.setAsyncHandler(handleAsyncMsgFromRust); - Deno.core.ops.init(); + ops = Deno.core.ops(); Deno.core.print("http_bench.js start\n"); diff --git a/core/shared_queue.js b/core/shared_queue.js index 9fa642b3437057..7eeb612550719a 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -39,7 +39,6 @@ SharedQueue Binary Layout let sharedBytes; let shared32; let initialized = false; - let opsMap = {}; function maybeInit() { if (!initialized) { @@ -59,22 +58,11 @@ SharedQueue Binary Layout Deno.core.recv(handleAsyncMsgFromRust); } - function initOps() { + function ops() { + // op id 0 is a special value to retreive the map of registered ops. const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null); const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); - opsMap = JSON.parse(opsMapJson); - } - - function getOpId(name) { - const opId = opsMap[name]; - - if (!opId) { - throw new Error( - `Unknown op: "${key}". Did you forget to initialize ops with Deno.core.ops.init()?` - ); - } - - return opId; + return JSON.parse(opsMapJson); } function assert(cond) { @@ -208,10 +196,7 @@ SharedQueue Binary Layout reset, shift }, - ops: { - init: initOps, - get: getOpId - } + ops }; assert(window[GLOBAL_NAMESPACE] != null); From e4228fc71ad73fa7104c81bff17162a748ab30ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Fri, 27 Sep 2019 11:42:28 +0200 Subject: [PATCH 33/34] assert --- core/isolate.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/isolate.rs b/core/isolate.rs index 6c2e20f2f4ffb7..6e7996d7f25e4f 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -333,7 +333,10 @@ impl Isolate { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; let op = if let Some(ref f) = isolate.dispatch { - assert!(op_id != 0); + assert!( + op_id != 0, + "op_id 0 is a special value that shouldn't be used with dispatch" + ); f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { isolate.op_registry.call_op( From 4e8009bebbda7dbc69285a24551929d3346151ca Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 30 Sep 2019 12:25:29 -0400 Subject: [PATCH 34/34] clean up --- core/isolate.rs | 4 ++-- core/ops.rs | 38 +++++++++++++++++++------------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/isolate.rs b/core/isolate.rs index 6e7996d7f25e4f..6795f25f06bfbe 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -258,7 +258,7 @@ impl Isolate { self.dispatch.is_none(), "set_dispatch should not be used in conjunction with register_op" ); - self.op_registry.register_op(name, op) + self.op_registry.register(name, op) } pub fn set_dyn_import(&mut self, f: F) @@ -339,7 +339,7 @@ impl Isolate { ); f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { - isolate.op_registry.call_op( + isolate.op_registry.call( op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf), diff --git a/core/ops.rs b/core/ops.rs index 0883e2ca62e1ed..84c15e096cb528 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -23,47 +23,47 @@ pub type CoreError = (); pub type CoreOp = Op; /// Main type describing op -pub type CoreOpHandler = dyn Fn(&[u8], Option) -> CoreOp; +type OpDispatcher = dyn Fn(&[u8], Option) -> CoreOp; #[derive(Default)] pub struct OpRegistry { - pub ops: Vec>, - pub op_map: HashMap, + dispatchers: Vec>, + name_to_id: HashMap, } impl OpRegistry { pub fn new() -> Self { let mut registry = Self::default(); - let op_id = registry.register_op("get_op_map", |_, _| { - // get_op_map is a special op which is handled in call_op. + let op_id = registry.register("ops", |_, _| { + // ops is a special op which is handled in call. unreachable!() }); assert_eq!(op_id, 0); registry } - pub fn register_op(&mut self, name: &str, op: F) -> OpId + pub fn register(&mut self, name: &str, op: F) -> OpId where F: Fn(&[u8], Option) -> CoreOp + Send + Sync + 'static, { - let op_id = self.ops.len() as u32; + let op_id = self.dispatchers.len() as u32; - let existing = self.op_map.insert(name.to_string(), op_id); + let existing = self.name_to_id.insert(name.to_string(), op_id); assert!( existing.is_none(), format!("Op already registered: {}", name) ); - self.ops.push(Box::new(op)); + self.dispatchers.push(Box::new(op)); op_id } fn json_map(&self) -> Buf { - let op_map_json = serde_json::to_string(&self.op_map).unwrap(); + let op_map_json = serde_json::to_string(&self.name_to_id).unwrap(); op_map_json.as_bytes().to_owned().into_boxed_slice() } - pub fn call_op( + pub fn call( &self, op_id: OpId, control: &[u8], @@ -76,8 +76,8 @@ impl OpRegistry { return Op::Sync(self.json_map()); } - let op_handler = &*self.ops.get(op_id as usize).expect("Op not found!"); - op_handler(control, zero_copy_buf) + let d = &*self.dispatchers.get(op_id as usize).expect("Op not found!"); + d(control, zero_copy_buf) } } @@ -90,18 +90,18 @@ fn test_op_registry() { let c = Arc::new(atomic::AtomicUsize::new(0)); let c_ = c.clone(); - let test_id = op_registry.register_op("test", move |_, _| { + let test_id = op_registry.register("test", move |_, _| { c_.fetch_add(1, atomic::Ordering::SeqCst); CoreOp::Sync(Box::new([])) }); assert!(test_id != 0); - let mut expected_map = HashMap::new(); - expected_map.insert("get_op_map".to_string(), 0); - expected_map.insert("test".to_string(), 1); - assert_eq!(op_registry.op_map, expected_map); + let mut expected = HashMap::new(); + expected.insert("ops".to_string(), 0); + expected.insert("test".to_string(), 1); + assert_eq!(op_registry.name_to_id, expected); - let res = op_registry.call_op(test_id, &[], None); + let res = op_registry.call(test_id, &[], None); if let Op::Sync(buf) = res { assert_eq!(buf.len(), 0); } else {