Skip to content

Commit

Permalink
make handler generic over a runtime type
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr committed Jul 7, 2021
1 parent 86bdc28 commit 9f2c6a5
Show file tree
Hide file tree
Showing 68 changed files with 1,393 additions and 878 deletions.
10 changes: 8 additions & 2 deletions async-std/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ keywords = ["trillium", "framework", "async"]
categories = ["web-programming::http-server", "web-programming"]

[dependencies]
async-std = { version = "1.9.0", package = "async-std" }
log = "0.4.14"
trillium = { path = "../trillium", version = "^0.1.0" }
trillium-http = { path = "../http", version = "^0.1.0" }
trillium-server-common = { path = "../server-common", version = "0.1.0" }

[dependencies.async-std]
version = "1.9.0"
package = "async-std"
features = ["unstable"]

[target.'cfg(unix)'.dependencies]
signal-hook = "0.3.9"
signal-hook-async-std = "0.2.1"

[dev-dependencies]
async-std = { version = "1.9.0", features = ["attributes"], package = "async-std" }
async-std = { version = "1.9.0", features = [
"attributes"
], package = "async-std" }
env_logger = "0.8.4"
10 changes: 5 additions & 5 deletions async-std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod client;
pub use client::{ClientConfig, TcpConnector};

mod server;
use server::Config;
use server::AsyncStdServer;

/**
# Runs a trillium handler in a sync context with default config
Expand All @@ -52,7 +52,7 @@ This function will block the current thread until the server shuts
down
*/

pub fn run(handler: impl Handler) {
pub fn run(handler: impl Handler<AsyncStdServer<()>>) {
config().run(handler)
}

Expand All @@ -65,7 +65,7 @@ runtime with default settings. the defaults are the same as
This function will poll pending until the server shuts down.
*/
pub async fn run_async(handler: impl Handler) {
pub async fn run_async(handler: impl Handler<AsyncStdServer<()>>) {
config().run_async(handler).await
}
/**
Expand Down Expand Up @@ -101,6 +101,6 @@ trillium_async_std::config()
See [`trillium_server_common::Config`] for more details
*/
pub fn config() -> Config<()> {
Config::new()
pub fn config() -> AsyncStdServer<()> {
AsyncStdServer::new()
}
127 changes: 86 additions & 41 deletions async-std/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use async_std::{
net::{TcpListener, TcpStream},
prelude::*,
task,
task::block_on,
};
use std::{net::IpAddr, sync::Arc};
use trillium::{async_trait, Handler, Info};
use trillium_server_common::{Acceptor, ConfigExt, Server, Stopper};
use std::{
fs::Metadata,
io::Result,
net::IpAddr,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
};
use trillium::{async_trait, FileSystem, Handler, Info, Runtime};
use trillium_server_common::{Acceptor, Config, Server, Stopper};

const SERVER_DESCRIPTION: &str = concat!(
" (",
Expand Down Expand Up @@ -33,51 +40,89 @@ async fn handle_signals(stop: Stopper) {
}
}

#[derive(Debug, Clone, Copy)]
pub struct AsyncStdServer;
pub type Config<A> = trillium_server_common::Config<AsyncStdServer, A>;
#[derive(Debug, Clone)]
pub struct AsyncStdServer<A> {
acceptor: A,
config: Config,
}

#[async_trait]
impl Server for AsyncStdServer {
type Transport = TcpStream;

fn peer_ip(transport: &Self::Transport) -> Option<IpAddr> {
transport
.peer_addr()
.ok()
.map(|socket_addr| socket_addr.ip())
impl AsyncStdServer<()> {
/// constructs a new AsyncStdServer with a default noop [`Acceptor`] and default [`Config`]
pub fn new() -> Self {
Self {
config: Config::default(),
acceptor: (),
}
}
}

trillium_server_common::standard_server!(
AsyncStdServer,
transport: TcpStream,
listener: TcpListener
);

fn run<A: Acceptor<Self::Transport>, H: Handler>(config: Config<A>, handler: H) {
task::block_on(async move { Self::run_async(config, handler).await })
impl<A: Acceptor<TcpStream>> Server for AsyncStdServer<A> {
fn run_async(self, handler: impl Handler<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(AsyncStdServer::run_async(self, handler))
}

async fn run_async<A: Acceptor<Self::Transport>, H: Handler>(
config: Config<A>,
mut handler: H,
) {
if config.should_register_signals() {
#[cfg(unix)]
task::spawn(handle_signals(config.stopper()));
#[cfg(not(unix))]
panic!("signals handling not supported on windows yet");
}
#[cfg(unix)]
fn handle_signals(&self) {
Self::spawn(handle_signals(self.config.stopper().clone()));
}
}

let listener = config.build_listener::<TcpListener>();
let local_addr = listener.local_addr().unwrap();
let mut info = Info::from(local_addr);
*info.listener_description_mut() = format!("http://{}:{}", config.host(), config.port());
info.server_description_mut().push_str(SERVER_DESCRIPTION);
impl<A> Runtime for AsyncStdServer<A>
where
A: Send + Sync + 'static,
{
fn block_on<F>(future: F) -> F::Output
where
F: Future,
{
block_on(future)
}

handler.init(&mut info).await;
let handler = Arc::new(handler);
fn spawn<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
async_std::task::spawn(future);
}

let mut incoming = config.stopper().stop_stream(listener.incoming());
while let Some(Ok(stream)) = incoming.next().await {
trillium::log_error!(stream.set_nodelay(config.nodelay()));
task::spawn(config.clone().handle_stream(stream, handler.clone()));
}
fn spawn_with_handle<F>(future: F) -> Pin<Box<dyn Future<Output = F::Output>>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
Box::pin(async_std::task::spawn(future))
}

fn spawn_local<F>(future: F)
where
F: Future + 'static,
{
async_std::task::spawn_local(future);
}
}

#[async_trait]
impl<A> FileSystem for AsyncStdServer<A> {
type File = async_std::fs::File;

async fn canonicalize<P: AsRef<Path> + Send + Sync>(path: P) -> Result<PathBuf> {
async_std::fs::canonicalize(path.as_ref())
.await
.map(Into::into)
}

async fn metadata<P: AsRef<Path> + Send + Sync>(path: P) -> Result<Metadata> {
async_std::fs::metadata(path.as_ref()).await
}

config.graceful_shutdown().await;
async fn open<P: AsRef<Path> + Send + Sync>(path: P) -> Result<Self::File> {
Self::File::open(path.as_ref()).await
}
}
51 changes: 45 additions & 6 deletions aws-lambda/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ trillium_aws_lambda::run(|conn: trillium::Conn| async move {
use lamedh_runtime::{Context, Handler as AwsHandler};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::runtime;
use trillium::{Conn, Handler};
use trillium::{Conn, Handler, Runtime};
use trillium_http::{Conn as HttpConn, Synthetic};

mod context;
Expand All @@ -37,7 +37,7 @@ use response::{AlbMultiHeadersResponse, AlbResponse, LambdaResponse};
#[derive(Debug)]
struct HandlerWrapper<H>(Arc<H>);

impl<H: Handler> AwsHandler<LambdaRequest, LambdaResponse> for HandlerWrapper<H> {
impl<H: Handler<AwsLambdaServer>> AwsHandler<LambdaRequest, LambdaResponse> for HandlerWrapper<H> {
type Error = std::io::Error;
type Fut = Pin<Box<dyn Future<Output = Result<LambdaResponse, Self::Error>> + Send + 'static>>;

Expand All @@ -46,15 +46,18 @@ impl<H: Handler> AwsHandler<LambdaRequest, LambdaResponse> for HandlerWrapper<H>
}
}

async fn run_handler(conn: HttpConn<Synthetic>, handler: Arc<impl Handler>) -> Conn {
async fn run_handler(
conn: HttpConn<Synthetic>,
handler: Arc<impl Handler<AwsLambdaServer>>,
) -> Conn {
let conn = handler.run(conn.into()).await;
handler.before_send(conn).await
}

async fn handler_fn(
request: LambdaRequest,
context: Context,
handler: Arc<impl Handler>,
handler: Arc<impl Handler<AwsLambdaServer>>,
) -> std::io::Result<LambdaResponse> {
match request {
LambdaRequest::Alb(request) => {
Expand All @@ -79,14 +82,50 @@ async fn handler_fn(
This function will poll pending until the server shuts down.
*/
pub async fn run_async(mut handler: impl Handler) {
pub async fn run_async(mut handler: impl Handler<AwsLambdaServer>) {
let mut info = "aws lambda".into();
handler.init(&mut info).await;
lamedh_runtime::run(HandlerWrapper(Arc::new(handler)))
.await
.unwrap()
}

/// The runtime struct for this crate
#[derive(Clone, Copy, Debug)]
pub struct AwsLambdaServer;

impl Runtime for AwsLambdaServer {
fn block_on<F>(future: F) -> F::Output
where
F: Future,
{
tokio::runtime::Runtime::new().unwrap().block_on(future)
}

fn spawn<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::task::spawn(future);
}

fn spawn_with_handle<F>(future: F) -> Pin<Box<dyn Future<Output = F::Output>>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
Box::pin(async { tokio::task::spawn(future).await.unwrap() })
}

fn spawn_local<F>(future: F)
where
F: Future + 'static,
{
tokio::task::spawn_local(future);
}
}

/**
# Runs a trillium handler in a sync context
Expand All @@ -97,7 +136,7 @@ This function will block the current thread until the server shuts
down
*/

pub fn run(handler: impl Handler) {
pub fn run(handler: impl Handler<AwsLambdaServer>) {
runtime::Builder::new_current_thread()
.enable_all()
.build()
Expand Down
22 changes: 8 additions & 14 deletions cli/src/dev_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use std::{
time::Duration,
};
use structopt::StructOpt;
use trillium::Runtime;

type R = trillium_smol::Smol<()>;

#[derive(StructOpt, Debug)]
pub struct DevServer {
Expand Down Expand Up @@ -195,14 +198,14 @@ impl DevServer {
child = run.spawn().unwrap();
*child_id.lock().unwrap() = child.id();
thread::sleep(Duration::from_millis(500));
async_io::block_on(broadcaster.send(&Event::Restarted)).ok();
R::block_on(broadcaster.send(&Event::Restarted)).ok();
});
}
{
let broadcaster = broadcaster.clone();
thread::spawn(move || loop {
let event = rx.recv().unwrap();
async_io::block_on(broadcaster.send(&event)).unwrap();
R::block_on(broadcaster.send(&event)).unwrap();
match event {
Event::BinaryChanged => {
log::info!("attempting to send {}", &signal);
Expand All @@ -219,10 +222,10 @@ impl DevServer {
Ok(ok) => {
if ok.status.success() {
log::debug!("{}", String::from_utf8_lossy(&ok.stdout[..]));
async_io::block_on(broadcaster.send(&Event::BuildSuccess)).ok();
R::block_on(broadcaster.send(&Event::BuildSuccess)).ok();
} else {
io::stderr().write_all(&ok.stderr).unwrap();
async_io::block_on(
R::block_on(
broadcaster.send(&Event::CompileError {
error: ansi_to_html::convert_escaped(
&String::from_utf8_lossy(&ok.stderr),
Expand Down Expand Up @@ -252,25 +255,16 @@ mod proxy_app {
use broadcaster::BroadcastChannel;
use futures_lite::StreamExt;
use trillium::{Conn, State};
use trillium_client::Client;
use trillium_html_rewriter::{
html::{element, html_content::ContentType, Settings},
HtmlRewriter,
};
use trillium_proxy::Proxy;
use trillium_router::Router;
use trillium_smol::{ClientConfig, TcpConnector};
use trillium_websockets::WebSocket;
type HttpClient = Client<TcpConnector>;

pub fn run(proxy: String, rx: BroadcastChannel<Event>) {
static PORT: u16 = 8082;
let client = HttpClient::new()
.with_default_pool()
.with_config(ClientConfig {
nodelay: Some(true),
..Default::default()
});

trillium_smol::config()
.without_signals()
Expand All @@ -296,7 +290,7 @@ mod proxy_app {
}),
),
),
Proxy::new(&*proxy).with_client(client),
Proxy::new(&*proxy),
HtmlRewriter::new(|| Settings {
element_content_handlers: vec![element!("body", |el| {
el.append(
Expand Down
Loading

0 comments on commit 9f2c6a5

Please sign in to comment.