Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Minimal op registration in isolate #3002

Merged
merged 35 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f5a11a4
minimal op registration in isolate
bartlomieju Sep 20, 2019
6375869
mock getOpMap
bartlomieju Sep 20, 2019
425e83d
minimal op map fetching
bartlomieju Sep 20, 2019
ac1df44
add Deno.core.refreshOpsMap
bartlomieju Sep 20, 2019
9cc3b31
phone_book -> op_map
bartlomieju Sep 21, 2019
ef61161
minimal js ops
bartlomieju Sep 21, 2019
c64cf04
dual dispatch
bartlomieju Sep 21, 2019
6480d8a
move Op to Deno.core
bartlomieju Sep 21, 2019
1c45ed9
move op related types to core/ops.rs
bartlomieju Sep 21, 2019
e33d050
fixes
bartlomieju Sep 21, 2019
04e4516
reset CI
bartlomieju Sep 22, 2019
94c3bdb
Deno.core.getOps()
bartlomieju Sep 24, 2019
42f0630
fix shared queue
bartlomieju Sep 24, 2019
a97827b
cleanup
bartlomieju Sep 24, 2019
5f3f7ec
Merge branch 'master' into feat-register_op
ry Sep 25, 2019
c641e12
review
bartlomieju Sep 25, 2019
de98a7b
add comment
bartlomieju Sep 25, 2019
b0da81f
comments
bartlomieju Sep 25, 2019
4ccae4a
move call_op to OpRegistry
bartlomieju Sep 25, 2019
fc43e8a
fix
bartlomieju Sep 25, 2019
e28507f
update third_party
bartlomieju Sep 25, 2019
86f53b2
clippy
bartlomieju Sep 25, 2019
8e720ee
review part 2
bartlomieju Sep 25, 2019
eeeb828
review part 3
bartlomieju Sep 25, 2019
ef5500f
register_op
bartlomieju Sep 25, 2019
7db809a
reset CI
bartlomieju Sep 26, 2019
c37310a
reset CI
bartlomieju Sep 26, 2019
08f9c4f
Deno.core.ops
bartlomieju Sep 26, 2019
62fc183
use proxy
bartlomieju Sep 26, 2019
3649445
Deno.core.ops
bartlomieju Sep 26, 2019
0c311c1
cleanup
ry Sep 26, 2019
f26d839
fix example
ry Sep 26, 2019
6d5ad04
cleanup
ry Sep 26, 2019
e4228fc
assert
bartlomieju Sep 27, 2019
4e8009b
clean up
ry Sep 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 78 additions & 36 deletions core/examples/http_bench.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,77 @@
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
// then write this fixed 'responseBuf'. The point of this benchmark is to
// exercise the event loop in a simple yet semi-realistic way.
const OP_LISTEN = 1;
const OP_ACCEPT = 2;
const OP_READ = 3;
const OP_WRITE = 4;
const OP_CLOSE = 5;
const registry = {};
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved

function initOps(opsMap) {
for (const [name, opId] of Object.entries(opsMap)) {
const op = registry[name];

if (!op) {
continue;
}

op.setOpId(opId);
}
}

class Op {
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
constructor(name) {
if (typeof registry[name] !== "undefined") {
throw new Error(`Duplicate op: ${name}`);
}

this.name = name;
this.opId = 0;
registry[name] = this;
}

setOpId(opId) {
this.opId = opId;
}

static handleAsyncMsgFromRust(opId, buf) {
const record = recordFromBuf(buf);
const { promiseId } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(record);
}

static sendSync(opId, arg, zeroCopy) {
const buf = send(0, opId, arg, zeroCopy);
ry marked this conversation as resolved.
Show resolved Hide resolved
return recordFromBuf(buf);
}

static sendAsync(opId, arg, zeroCopy = null) {
const promiseId = nextPromiseId++;
const p = createResolvable();
promiseMap.set(promiseId, p);
send(promiseId, opId, arg, zeroCopy);
return p;
}
}

class HttpOp extends Op {
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
/** Returns i32 number */
sendSync(arg, zeroCopy = null) {
const res = HttpOp.sendSync(this.opId, arg, zeroCopy);
return res.result;
}

/** Returns Promise<number> */
async sendAsync(arg, zeroCopy = null) {
const res = await HttpOp.sendAsync(this.opId, arg, zeroCopy);
return res.result;
}
}

const OP_LISTEN = new HttpOp("listen");
const OP_ACCEPT = new HttpOp("accept");
const OP_READ = new HttpOp("read");
const OP_WRITE = new HttpOp("write");
const OP_CLOSE = new HttpOp("close");
ry marked this conversation as resolved.
Show resolved Hide resolved

const requestBuf = new Uint8Array(64 * 1024);
const responseBuf = new Uint8Array(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
Expand Down Expand Up @@ -44,15 +110,6 @@ function send(promiseId, opId, arg, zeroCopy = null) {
return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
}

/** Returns Promise<number> */
function sendAsync(opId, arg, zeroCopy = null) {
const promiseId = nextPromiseId++;
const p = createResolvable();
promiseMap.set(promiseId, p);
send(promiseId, opId, arg, zeroCopy);
return p;
}

function recordFromBuf(buf) {
assert(buf.byteLength === 3 * 4);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
Expand All @@ -63,46 +120,31 @@ function recordFromBuf(buf) {
};
}

/** Returns i32 number */
function sendSync(opId, arg) {
const buf = send(0, opId, arg);
const record = recordFromBuf(buf);
return record.result;
}

function handleAsyncMsgFromRust(opId, buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
}

/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
return sendSync(OP_LISTEN, -1);
return OP_LISTEN.sendSync(-1);
}

/** Accepts a connection, returns rid. */
async function accept(rid) {
return await sendAsync(OP_ACCEPT, rid);
return await OP_ACCEPT.sendAsync(rid);
}

/**
* Reads a packet from the rid, presumably an http request. data is ignored.
* Returns bytes read.
*/
async function read(rid, data) {
return await sendAsync(OP_READ, rid, data);
return await OP_READ.sendAsync(rid, data);
}

/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
async function write(rid, data) {
return await sendAsync(OP_WRITE, rid, data);
return await OP_WRITE.sendAsync(rid, data);
}

function close(rid) {
return sendSync(OP_CLOSE, rid);
return OP_CLOSE.sendSync(rid);
}

async function serve(rid) {
Expand All @@ -121,8 +163,8 @@ async function serve(rid) {
}

async function main() {
Deno.core.setAsyncHandler(handleAsyncMsgFromRust);

Deno.core.setAsyncHandler(Op.handleAsyncMsgFromRust);
initOps(Deno.core.getOps());
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
Deno.core.print("http_bench.js start\n");

const listenerRid = listen();
Expand Down
149 changes: 78 additions & 71 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ impl log::Log for Logger {
fn flush(&self) {}
}

const OP_LISTEN: OpId = 1;
const OP_ACCEPT: OpId = 2;
const OP_READ: OpId = 3;
const OP_WRITE: OpId = 4;
const OP_CLOSE: OpId = 5;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
Expand Down Expand Up @@ -104,67 +98,48 @@ fn test_record_from() {
// TODO test From<&[u8]> for Record
}

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

fn dispatch(
op_id: OpId,
control: &[u8],
pub type HttpOpHandler = fn(
is_sync: bool,
record: Record,
zero_copy_buf: Option<PinnedBuf>,
) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match op_id {
OP_LISTEN => {
assert!(is_sync);
op_listen()
}
OP_CLOSE => {
assert!(is_sync);
let rid = record.arg;
op_close(rid)
}
OP_ACCEPT => {
assert!(!is_sync);
let listener_rid = record.arg;
op_accept(listener_rid)
}
OP_READ => {
assert!(!is_sync);
let rid = record.arg;
op_read(rid, zero_copy_buf)
}
OP_WRITE => {
assert!(!is_sync);
let rid = record.arg;
op_write(rid, zero_copy_buf)
}
_ => panic!("bad op {}", op_id),
};
let mut record_a = record.clone();
let mut record_b = record.clone();

let fut = Box::new(
http_bench_op
.and_then(move |result| {
record_a.result = result;
Ok(record_a)
})
.or_else(|err| -> Result<Record, ()> {
eprintln!("unexpected err {}", err);
record_b.result = -1;
Ok(record_b)
})
.then(|result| -> Result<Buf, ()> {
let record = result.unwrap();
Ok(record.into())
}),
);
) -> Box<HttpOp>;

fn serialize_http_op(handler: HttpOpHandler) -> Box<CoreOpHandler> {
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
let serialized_op =
move |control: &[u8], zero_copy_buf: Option<PinnedBuf>| -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let op = handler(is_sync, record.clone(), zero_copy_buf);

let mut record_a = record.clone();
let mut record_b = record.clone();

let fut = Box::new(
op.and_then(move |result| {
record_a.result = result;
Ok(record_a)
})
.or_else(|err| -> Result<Record, ()> {
eprintln!("unexpected err {}", err);
record_b.result = -1;
Ok(record_b)
})
.then(|result| -> Result<Buf, ()> {
let record = result.unwrap();
Ok(record.into())
}),
);

if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)
}
};

if is_sync {
Op::Sync(fut.wait().unwrap())
} else {
Op::Async(fut)
}
Box::new(serialized_op)
}

fn main() {
Expand All @@ -181,7 +156,11 @@ fn main() {
});

let mut isolate = deno::Isolate::new(startup_data, false);
isolate.set_dispatch(dispatch);
isolate.register_op("listen", serialize_http_op(op_listen));
isolate.register_op("accept", serialize_http_op(op_accept));
isolate.register_op("read", serialize_http_op(op_read));
isolate.register_op("write", serialize_http_op(op_write));
isolate.register_op("close", serialize_http_op(op_close));
ry marked this conversation as resolved.
Show resolved Hide resolved

isolate.then(|r| {
js_check(r);
Expand Down Expand Up @@ -225,7 +204,13 @@ fn new_rid() -> i32 {
rid as i32
}

fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
fn op_accept(
is_sync: bool,
record: Record,
_zero_copy_buf: Option<PinnedBuf>,
) -> Box<HttpOp> {
assert!(!is_sync);
let listener_rid = record.arg;
debug!("accept {}", listener_rid);
Box::new(
futures::future::poll_fn(move || {
Expand All @@ -248,9 +233,13 @@ fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
)
}

fn op_listen() -> Box<HttpBenchOp> {
fn op_listen(
is_sync: bool,
_record: Record,
_zero_copy_buf: Option<PinnedBuf>,
) -> Box<HttpOp> {
assert!(is_sync);
debug!("listen");

Box::new(lazy(move || {
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
Expand All @@ -262,8 +251,14 @@ fn op_listen() -> Box<HttpBenchOp> {
}))
}

fn op_close(rid: i32) -> Box<HttpBenchOp> {
fn op_close(
is_sync: bool,
record: Record,
_zero_copy_buf: Option<PinnedBuf>,
) -> Box<HttpOp> {
assert!(is_sync);
debug!("close");
let rid = record.arg;
Box::new(lazy(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&rid);
Expand All @@ -272,7 +267,13 @@ fn op_close(rid: i32) -> Box<HttpBenchOp> {
}))
}

fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
fn op_read(
is_sync: bool,
record: Record,
zero_copy_buf: Option<PinnedBuf>,
) -> Box<HttpOp> {
assert!(!is_sync);
let rid = record.arg;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
Expand All @@ -293,7 +294,13 @@ fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
)
}

fn op_write(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
fn op_write(
is_sync: bool,
record: Record,
zero_copy_buf: Option<PinnedBuf>,
) -> Box<HttpOp> {
assert!(!is_sync);
let rid = record.arg;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
Expand Down
Loading