Skip to content

Commit

Permalink
feat: op registration in core (#3002)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju authored and ry committed Sep 30, 2019
1 parent ae26a9c commit ffbf0c2
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 92 deletions.
18 changes: 8 additions & 10 deletions core/examples/http_bench.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
// then write this fixed 'responseBuf'. The point of this benchmark is to
// exercise the event loop in a simple yet semi-realistic way.
const OP_LISTEN = 1;
const OP_ACCEPT = 2;
const OP_READ = 3;
const OP_WRITE = 4;
const OP_CLOSE = 5;
const requestBuf = new Uint8Array(64 * 1024);
const responseBuf = new Uint8Array(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
Expand Down Expand Up @@ -80,29 +75,29 @@ function handleAsyncMsgFromRust(opId, buf) {

/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
return sendSync(OP_LISTEN, -1);
return sendSync(ops["listen"], -1);
}

/** Accepts a connection, returns rid. */
async function accept(rid) {
return await sendAsync(OP_ACCEPT, rid);
return await sendAsync(ops["accept"], rid);
}

/**
* Reads a packet from the rid, presumably an http request. data is ignored.
* Returns bytes read.
*/
async function read(rid, data) {
return await sendAsync(OP_READ, rid, data);
return await sendAsync(ops["read"], rid, data);
}

/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
async function write(rid, data) {
return await sendAsync(OP_WRITE, rid, data);
return await sendAsync(ops["write"], rid, data);
}

function close(rid) {
return sendSync(OP_CLOSE, rid);
return sendSync(ops["close"], rid);
}

async function serve(rid) {
Expand All @@ -120,8 +115,11 @@ async function serve(rid) {
close(rid);
}

let ops;

async function main() {
Deno.core.setAsyncHandler(handleAsyncMsgFromRust);
ops = Deno.core.ops();

Deno.core.print("http_bench.js start\n");

Expand Down
99 changes: 40 additions & 59 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ impl log::Log for Logger {
fn flush(&self) {}
}

const OP_LISTEN: OpId = 1;
const OP_ACCEPT: OpId = 2;
const OP_READ: OpId = 3;
const OP_WRITE: OpId = 4;
const OP_CLOSE: OpId = 5;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
Expand Down Expand Up @@ -104,48 +98,24 @@ fn test_record_from() {
// TODO test From<&[u8]> for Record
}

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

fn dispatch(
op_id: OpId,
control: &[u8],
zero_copy_buf: Option<PinnedBuf>,
) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match op_id {
OP_LISTEN => {
assert!(is_sync);
op_listen()
}
OP_CLOSE => {
assert!(is_sync);
let rid = record.arg;
op_close(rid)
}
OP_ACCEPT => {
assert!(!is_sync);
let listener_rid = record.arg;
op_accept(listener_rid)
}
OP_READ => {
assert!(!is_sync);
let rid = record.arg;
op_read(rid, zero_copy_buf)
}
OP_WRITE => {
assert!(!is_sync);
let rid = record.arg;
op_write(rid, zero_copy_buf)
}
_ => panic!("bad op {}", op_id),
};
let mut record_a = record.clone();
let mut record_b = record.clone();
pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

let fut = Box::new(
http_bench_op
.and_then(move |result| {
pub type HttpOpHandler =
fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp>;

fn http_op(
handler: HttpOpHandler,
) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
move |control: &[u8], zero_copy_buf: Option<PinnedBuf>| -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let op = handler(record.clone(), zero_copy_buf);

let mut record_a = record.clone();
let mut record_b = record.clone();

let fut = Box::new(
op.and_then(move |result| {
record_a.result = result;
Ok(record_a)
})
Expand All @@ -158,12 +128,13 @@ fn dispatch(
let record = result.unwrap();
Ok(record.into())
}),
);
);

if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)
if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)
}
}
}

Expand All @@ -181,7 +152,11 @@ fn main() {
});

let mut isolate = deno::Isolate::new(startup_data, false);
isolate.set_dispatch(dispatch);
isolate.register_op("listen", http_op(op_listen));
isolate.register_op("accept", http_op(op_accept));
isolate.register_op("read", http_op(op_read));
isolate.register_op("write", http_op(op_write));
isolate.register_op("close", http_op(op_close));

isolate.then(|r| {
js_check(r);
Expand Down Expand Up @@ -225,7 +200,8 @@ fn new_rid() -> i32 {
rid as i32
}

fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let listener_rid = record.arg;
debug!("accept {}", listener_rid);
Box::new(
futures::future::poll_fn(move || {
Expand All @@ -248,9 +224,11 @@ fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
)
}

fn op_listen() -> Box<HttpBenchOp> {
fn op_listen(
_record: Record,
_zero_copy_buf: Option<PinnedBuf>,
) -> Box<HttpOp> {
debug!("listen");

Box::new(lazy(move || {
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
Expand All @@ -262,8 +240,9 @@ fn op_listen() -> Box<HttpBenchOp> {
}))
}

fn op_close(rid: i32) -> Box<HttpBenchOp> {
fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
debug!("close");
let rid = record.arg;
Box::new(lazy(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&rid);
Expand All @@ -272,7 +251,8 @@ fn op_close(rid: i32) -> Box<HttpBenchOp> {
}))
}

fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let rid = record.arg;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
Expand All @@ -293,7 +273,8 @@ fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
)
}

fn op_write(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let rid = record.arg;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
Expand Down
52 changes: 32 additions & 20 deletions core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use crate::libdeno::deno_buf;
use crate::libdeno::deno_dyn_import_id;
use crate::libdeno::deno_mod;
use crate::libdeno::deno_pinned_buf;
use crate::libdeno::OpId;
use crate::libdeno::PinnedBuf;
use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::stream::FuturesUnordered;
Expand All @@ -34,24 +34,6 @@ use std::fmt;
use std::ptr::null;
use std::sync::{Arc, Mutex, Once};

pub type Buf = Box<[u8]>;

pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;

type PendingOpFuture =
Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;

pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
}

pub type CoreError = ();

pub type CoreOp = Op<CoreError>;

pub type OpResult<E> = Result<Op<E>, E>;

/// Args: op_id, control_buf, zero_copy_buf
type CoreDispatchFn = dyn Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp;

Expand Down Expand Up @@ -179,6 +161,7 @@ pub struct Isolate {
pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
op_registry: OpRegistry,
}

unsafe impl Send for Isolate {}
Expand Down Expand Up @@ -244,19 +227,40 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
op_registry: OpRegistry::new(),
}
}

/// Defines the how Deno.core.dispatch() acts.
/// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
/// corresponds to the second argument of Deno.core.dispatch().
///
/// If this method is used then ops registered using `op_register` function are
/// ignored and all dispatching must be handled manually in provided callback.
// TODO: we want to deprecate and remove this API and move to `register_op` API
pub fn set_dispatch<F>(&mut self, f: F)
where
F: Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}

/// New dispatch mechanism. Requires runtime to explicitly ask for op ids
/// before using any of the ops.
///
/// Ops added using this method are only usable if `dispatch` is not set
/// (using `set_dispatch` method).
pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
where
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
assert!(
self.dispatch.is_none(),
"set_dispatch should not be used in conjunction with register_op"
);
self.op_registry.register(name, op)
}

pub fn set_dyn_import<F>(&mut self, f: F)
where
F: Fn(deno_dyn_import_id, &str, &str) -> DynImportStream
Expand Down Expand Up @@ -329,9 +333,17 @@ impl Isolate {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };

let op = if let Some(ref f) = isolate.dispatch {
assert!(
op_id != 0,
"op_id 0 is a special value that shouldn't be used with dispatch"
);
f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
} else {
panic!("isolate.dispatch not set")
isolate.op_registry.call(
op_id,
control_buf.as_ref(),
PinnedBuf::new(zero_copy_buf),
)
};

debug_assert_eq!(isolate.shared.size(), 0);
Expand Down
2 changes: 2 additions & 0 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod js_errors;
mod libdeno;
mod module_specifier;
mod modules;
mod ops;
mod shared_queue;

pub use crate::any_error::*;
Expand All @@ -22,6 +23,7 @@ pub use crate::libdeno::OpId;
pub use crate::libdeno::PinnedBuf;
pub use crate::module_specifier::*;
pub use crate::modules::*;
pub use crate::ops::*;

pub fn v8_version() -> &'static str {
use std::ffi::CStr;
Expand Down
Loading

0 comments on commit ffbf0c2

Please sign in to comment.