Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Minimal op registration in isolate #3002

Merged
merged 35 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f5a11a4
minimal op registration in isolate
bartlomieju Sep 20, 2019
6375869
mock getOpMap
bartlomieju Sep 20, 2019
425e83d
minimal op map fetching
bartlomieju Sep 20, 2019
ac1df44
add Deno.core.refreshOpsMap
bartlomieju Sep 20, 2019
9cc3b31
phone_book -> op_map
bartlomieju Sep 21, 2019
ef61161
minimal js ops
bartlomieju Sep 21, 2019
c64cf04
dual dispatch
bartlomieju Sep 21, 2019
6480d8a
move Op to Deno.core
bartlomieju Sep 21, 2019
1c45ed9
move op related types to core/ops.rs
bartlomieju Sep 21, 2019
e33d050
fixes
bartlomieju Sep 21, 2019
04e4516
reset CI
bartlomieju Sep 22, 2019
94c3bdb
Deno.core.getOps()
bartlomieju Sep 24, 2019
42f0630
fix shared queue
bartlomieju Sep 24, 2019
a97827b
cleanup
bartlomieju Sep 24, 2019
5f3f7ec
Merge branch 'master' into feat-register_op
ry Sep 25, 2019
c641e12
review
bartlomieju Sep 25, 2019
de98a7b
add comment
bartlomieju Sep 25, 2019
b0da81f
comments
bartlomieju Sep 25, 2019
4ccae4a
move call_op to OpRegistry
bartlomieju Sep 25, 2019
fc43e8a
fix
bartlomieju Sep 25, 2019
e28507f
update third_party
bartlomieju Sep 25, 2019
86f53b2
clippy
bartlomieju Sep 25, 2019
8e720ee
review part 2
bartlomieju Sep 25, 2019
eeeb828
review part 3
bartlomieju Sep 25, 2019
ef5500f
register_op
bartlomieju Sep 25, 2019
7db809a
reset CI
bartlomieju Sep 26, 2019
c37310a
reset CI
bartlomieju Sep 26, 2019
08f9c4f
Deno.core.ops
bartlomieju Sep 26, 2019
62fc183
use proxy
bartlomieju Sep 26, 2019
3649445
Deno.core.ops
bartlomieju Sep 26, 2019
0c311c1
cleanup
ry Sep 26, 2019
f26d839
fix example
ry Sep 26, 2019
6d5ad04
cleanup
ry Sep 26, 2019
e4228fc
assert
bartlomieju Sep 27, 2019
4e8009b
clean up
ry Sep 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions core/examples/http_bench.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
// then write this fixed 'responseBuf'. The point of this benchmark is to
// exercise the event loop in a simple yet semi-realistic way.
const OP_LISTEN = 1;
const OP_ACCEPT = 2;
const OP_READ = 3;
const OP_WRITE = 4;
const OP_CLOSE = 5;
const requestBuf = new Uint8Array(64 * 1024);
const responseBuf = new Uint8Array(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
Expand Down Expand Up @@ -80,29 +75,29 @@ function handleAsyncMsgFromRust(opId, buf) {

/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
return sendSync(OP_LISTEN, -1);
return sendSync(ops["listen"], -1);
}

/** Accepts a connection, returns rid. */
async function accept(rid) {
return await sendAsync(OP_ACCEPT, rid);
return await sendAsync(ops["accept"], rid);
}

/**
* Reads a packet from the rid, presumably an http request. data is ignored.
* Returns bytes read.
*/
async function read(rid, data) {
return await sendAsync(OP_READ, rid, data);
return await sendAsync(ops["read"], rid, data);
}

/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
async function write(rid, data) {
return await sendAsync(OP_WRITE, rid, data);
return await sendAsync(ops["write"], rid, data);
}

function close(rid) {
return sendSync(OP_CLOSE, rid);
return sendSync(ops["close"], rid);
}

async function serve(rid) {
Expand All @@ -120,8 +115,11 @@ async function serve(rid) {
close(rid);
}

let ops;

async function main() {
Deno.core.setAsyncHandler(handleAsyncMsgFromRust);
ops = Deno.core.ops();

Deno.core.print("http_bench.js start\n");

Expand Down
99 changes: 40 additions & 59 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ impl log::Log for Logger {
fn flush(&self) {}
}

const OP_LISTEN: OpId = 1;
const OP_ACCEPT: OpId = 2;
const OP_READ: OpId = 3;
const OP_WRITE: OpId = 4;
const OP_CLOSE: OpId = 5;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
Expand Down Expand Up @@ -104,48 +98,24 @@ fn test_record_from() {
// TODO test From<&[u8]> for Record
}

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

fn dispatch(
op_id: OpId,
control: &[u8],
zero_copy_buf: Option<PinnedBuf>,
) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match op_id {
OP_LISTEN => {
assert!(is_sync);
op_listen()
}
OP_CLOSE => {
assert!(is_sync);
let rid = record.arg;
op_close(rid)
}
OP_ACCEPT => {
assert!(!is_sync);
let listener_rid = record.arg;
op_accept(listener_rid)
}
OP_READ => {
assert!(!is_sync);
let rid = record.arg;
op_read(rid, zero_copy_buf)
}
OP_WRITE => {
assert!(!is_sync);
let rid = record.arg;
op_write(rid, zero_copy_buf)
}
_ => panic!("bad op {}", op_id),
};
let mut record_a = record.clone();
let mut record_b = record.clone();
pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

let fut = Box::new(
http_bench_op
.and_then(move |result| {
pub type HttpOpHandler =
fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp>;

fn http_op(
handler: HttpOpHandler,
) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
move |control: &[u8], zero_copy_buf: Option<PinnedBuf>| -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let op = handler(record.clone(), zero_copy_buf);

let mut record_a = record.clone();
let mut record_b = record.clone();

let fut = Box::new(
op.and_then(move |result| {
record_a.result = result;
Ok(record_a)
})
Expand All @@ -158,12 +128,13 @@ fn dispatch(
let record = result.unwrap();
Ok(record.into())
}),
);
);

if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)
if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)
}
}
}

Expand All @@ -181,7 +152,11 @@ fn main() {
});

let mut isolate = deno::Isolate::new(startup_data, false);
isolate.set_dispatch(dispatch);
isolate.register_op("listen", http_op(op_listen));
isolate.register_op("accept", http_op(op_accept));
isolate.register_op("read", http_op(op_read));
isolate.register_op("write", http_op(op_write));
isolate.register_op("close", http_op(op_close));

isolate.then(|r| {
js_check(r);
Expand Down Expand Up @@ -225,7 +200,8 @@ fn new_rid() -> i32 {
rid as i32
}

fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let listener_rid = record.arg;
debug!("accept {}", listener_rid);
Box::new(
futures::future::poll_fn(move || {
Expand All @@ -248,9 +224,11 @@ fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
)
}

fn op_listen() -> Box<HttpBenchOp> {
fn op_listen(
_record: Record,
_zero_copy_buf: Option<PinnedBuf>,
) -> Box<HttpOp> {
debug!("listen");

Box::new(lazy(move || {
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
Expand All @@ -262,8 +240,9 @@ fn op_listen() -> Box<HttpBenchOp> {
}))
}

fn op_close(rid: i32) -> Box<HttpBenchOp> {
fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
debug!("close");
let rid = record.arg;
Box::new(lazy(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&rid);
Expand All @@ -272,7 +251,8 @@ fn op_close(rid: i32) -> Box<HttpBenchOp> {
}))
}

fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let rid = record.arg;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
Expand All @@ -293,7 +273,8 @@ fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
)
}

fn op_write(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let rid = record.arg;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
Expand Down
52 changes: 32 additions & 20 deletions core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use crate::libdeno::deno_buf;
use crate::libdeno::deno_dyn_import_id;
use crate::libdeno::deno_mod;
use crate::libdeno::deno_pinned_buf;
use crate::libdeno::OpId;
use crate::libdeno::PinnedBuf;
use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::stream::FuturesUnordered;
Expand All @@ -34,24 +34,6 @@ use std::fmt;
use std::ptr::null;
use std::sync::{Arc, Mutex, Once};

pub type Buf = Box<[u8]>;

pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;

type PendingOpFuture =
Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;

pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
}

pub type CoreError = ();

pub type CoreOp = Op<CoreError>;

pub type OpResult<E> = Result<Op<E>, E>;

/// Args: op_id, control_buf, zero_copy_buf
type CoreDispatchFn = dyn Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp;

Expand Down Expand Up @@ -179,6 +161,7 @@ pub struct Isolate {
pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
op_registry: OpRegistry,
}

unsafe impl Send for Isolate {}
Expand Down Expand Up @@ -244,19 +227,40 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
op_registry: OpRegistry::new(),
}
}

/// Defines the how Deno.core.dispatch() acts.
/// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
/// corresponds to the second argument of Deno.core.dispatch().
///
/// If this method is used then ops registered using `op_register` function are
/// ignored and all dispatching must be handled manually in provided callback.
// TODO: we want to deprecate and remove this API and move to `register_op` API
pub fn set_dispatch<F>(&mut self, f: F)
where
F: Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved

/// New dispatch mechanism. Requires runtime to explicitly ask for op ids
/// before using any of the ops.
///
/// Ops added using this method are only usable if `dispatch` is not set
/// (using `set_dispatch` method).
pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
where
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
assert!(
self.dispatch.is_none(),
"set_dispatch should not be used in conjunction with register_op"
);
self.op_registry.register(name, op)
}

pub fn set_dyn_import<F>(&mut self, f: F)
where
F: Fn(deno_dyn_import_id, &str, &str) -> DynImportStream
Expand Down Expand Up @@ -329,9 +333,17 @@ impl Isolate {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };

let op = if let Some(ref f) = isolate.dispatch {
assert!(
op_id != 0,
"op_id 0 is a special value that shouldn't be used with dispatch"
);
f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
} else {
panic!("isolate.dispatch not set")
isolate.op_registry.call(
op_id,
control_buf.as_ref(),
PinnedBuf::new(zero_copy_buf),
)
};

debug_assert_eq!(isolate.shared.size(), 0);
Expand Down
2 changes: 2 additions & 0 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod js_errors;
mod libdeno;
mod module_specifier;
mod modules;
mod ops;
mod shared_queue;

pub use crate::any_error::*;
Expand All @@ -22,6 +23,7 @@ pub use crate::libdeno::OpId;
pub use crate::libdeno::PinnedBuf;
pub use crate::module_specifier::*;
pub use crate::modules::*;
pub use crate::ops::*;

pub fn v8_version() -> &'static str {
use std::ffi::CStr;
Expand Down
Loading