Skip to content

Commit

Permalink
use ResourceTable in deno_core_http_bench
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju committed Oct 19, 2019
1 parent b555ecc commit df91327
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 74 deletions.
92 changes: 36 additions & 56 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ extern crate lazy_static;

use deno::*;
use futures::future::lazy;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use tokio::prelude::*;
use std::sync::MutexGuard;

static LOGGER: Logger = Logger;
struct Logger;
Expand Down Expand Up @@ -184,41 +182,35 @@ fn main() {
}
}

enum Repr {
TcpListener(tokio::net::TcpListener),
TcpStream(tokio::net::TcpStream),
}
struct TcpListener(tokio::net::TcpListener);

impl Resource for TcpListener {}

struct TcpStream(tokio::net::TcpStream);

impl Resource for TcpStream {}

type ResourceTable = HashMap<i32, Repr>;
lazy_static! {
static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new(HashMap::new());
static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3);
static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new(ResourceTable::default());
}

fn new_rid() -> i32 {
let rid = NEXT_RID.fetch_add(1, Ordering::SeqCst);
rid as i32
fn lock_table<'a>() -> MutexGuard<'a, ResourceTable> {
RESOURCE_TABLE.lock().unwrap()
}

fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let listener_rid = record.arg;
debug!("accept {}", listener_rid);
let rid = record.arg as u32;
debug!("accept {}", rid);
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&listener_rid);
match maybe_repr {
Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(),
_ => panic!("bad rid {}", listener_rid),
}
let mut table = lock_table();
let listener = table.get_mut::<TcpListener>(&rid)?;
listener.0.poll_accept()
})
.and_then(move |(stream, addr)| {
debug!("accept success {}", addr);
let rid = new_rid();

let mut guard = RESOURCE_TABLE.lock().unwrap();
guard.insert(rid, Repr::TcpStream(stream));

let mut table = lock_table();
let rid = table.add(Box::new(TcpStream(stream)));
Ok(rid as i32)
}),
)
Expand All @@ -232,39 +224,32 @@ fn op_listen(
Box::new(lazy(move || {
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
let rid = new_rid();

let mut guard = RESOURCE_TABLE.lock().unwrap();
guard.insert(rid, Repr::TcpListener(listener));
futures::future::ok(rid)
let mut table = lock_table();
let rid = table.add(Box::new(TcpListener(listener)));
futures::future::ok(rid as i32)
}))
}

fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
debug!("close");
let rid = record.arg;
Box::new(lazy(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&rid);
let result = if r.is_some() { 0 } else { -1 };
futures::future::ok(result)
}))
let rid = record.arg as u32;
let mut table = lock_table();
let fut = match table.close(&rid) {
Ok(_) => futures::future::ok(0),
Err(e) => futures::future::err(e)
};
Box::new(fut)
}

fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let rid = record.arg;
let rid = record.arg as u32;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&rid);
match maybe_repr {
Some(Repr::TcpStream(ref mut stream)) => {
stream.poll_read(&mut zero_copy_buf)
}
_ => panic!("bad rid"),
}
let mut table = lock_table();
let stream = table.get_mut::<TcpStream>(&rid)?;
stream.0.poll_read(&mut zero_copy_buf)
})
.and_then(move |nread| {
debug!("read success {}", nread);
Expand All @@ -274,19 +259,14 @@ fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
}

fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
let rid = record.arg;
let rid = record.arg as u32;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
futures::future::poll_fn(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&rid);
match maybe_repr {
Some(Repr::TcpStream(ref mut stream)) => {
stream.poll_write(&zero_copy_buf)
}
_ => panic!("bad rid"),
}
let mut table = lock_table();
let stream = table.get_mut::<TcpStream>(&rid)?;
stream.0.poll_write(&zero_copy_buf)
})
.and_then(move |nwritten| {
debug!("write success {}", nwritten);
Expand Down
25 changes: 7 additions & 18 deletions core/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
// descriptors". This module implements a global resource table. Ops (AKA
// handlers) look up resources by their integer id here.

use crate::ErrBox;
use downcast_rs::Downcast;
use std;
use std::any::Any;
use std::collections::BTreeMap;
use std::error::Error;
use std::fmt;
use std::io::Error;
use std::io::ErrorKind;

/// Also referred to as rid.
pub type ResourceId = u32;
Expand All @@ -30,7 +29,7 @@ pub struct ResourceTable {
}

impl ResourceTable {
pub fn get<T: Resource>(&self, rid: &ResourceId) -> Result<&T, ErrBox> {
pub fn get<T: Resource>(&self, rid: &ResourceId) -> Result<&T, Error> {
let resource = self.map.get(&rid).ok_or_else(bad_resource)?;
let resource = &resource.downcast_ref::<T>().ok_or_else(bad_resource)?;
Ok(resource)
Expand All @@ -39,7 +38,7 @@ impl ResourceTable {
pub fn get_mut<T: Resource>(
&mut self,
rid: &ResourceId,
) -> Result<&mut T, ErrBox> {
) -> Result<&mut T, Error> {
let resource = self.map.get_mut(&rid).ok_or_else(bad_resource)?;
let resource = resource.downcast_mut::<T>().ok_or_else(bad_resource)?;
Ok(resource)
Expand All @@ -61,7 +60,7 @@ impl ResourceTable {

// close(2) is done by dropping the value. Therefore we just need to remove
// the resource from the RESOURCE_TABLE.
pub fn close(&mut self, rid: &ResourceId) -> Result<(), ErrBox> {
pub fn close(&mut self, rid: &ResourceId) -> Result<(), Error> {
let repr = self.map.remove(rid).ok_or_else(bad_resource)?;
// Give resource a chance to cleanup (notify tasks, etc.)
repr.close();
Expand All @@ -80,17 +79,7 @@ pub trait Resource: Downcast + Any + Send {
}
impl_downcast!(Resource);

#[derive(Debug)]
struct StaticError(&'static str);

impl Error for StaticError {}

impl fmt::Display for StaticError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad(self.0)
}
}

pub fn bad_resource() -> ErrBox {
StaticError("bad resource id").into()
pub fn bad_resource() -> Error {
Error::new(ErrorKind::NotFound, "bad resource id")
}

0 comments on commit df91327

Please sign in to comment.