Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trillium 0.3.0: make handler generic over a runtime type #64

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions async-std/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ keywords = ["trillium", "framework", "async"]
categories = ["web-programming::http-server", "web-programming"]

[dependencies]
async-std = { version = "1.9.0", package = "async-std" }
log = "0.4.14"
trillium = { path = "../trillium", version = "^0.1.0" }
trillium-http = { path = "../http", version = "^0.1.0" }
trillium-server-common = { path = "../server-common", version = "0.1.0" }

[dependencies.async-std]
version = "1.9.0"
package = "async-std"
features = ["unstable"]

[target.'cfg(unix)'.dependencies]
signal-hook = "0.3.9"
signal-hook-async-std = "0.2.1"

[dev-dependencies]
async-std = { version = "1.9.0", features = ["attributes"], package = "async-std" }
async-std = { version = "1.9.0", features = [
"attributes"
], package = "async-std" }
env_logger = "0.8.4"
10 changes: 5 additions & 5 deletions async-std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod client;
pub use client::{ClientConfig, TcpConnector};

mod server;
use server::Config;
use server::AsyncStdServer;

/**
# Runs a trillium handler in a sync context with default config
Expand All @@ -52,7 +52,7 @@ This function will block the current thread until the server shuts
down
*/

pub fn run(handler: impl Handler) {
pub fn run(handler: impl Handler<AsyncStdServer<()>>) {
config().run(handler)
}

Expand All @@ -65,7 +65,7 @@ runtime with default settings. the defaults are the same as

This function will poll pending until the server shuts down.
*/
pub async fn run_async(handler: impl Handler) {
pub async fn run_async(handler: impl Handler<AsyncStdServer<()>>) {
config().run_async(handler).await
}
/**
Expand Down Expand Up @@ -101,6 +101,6 @@ trillium_async_std::config()
See [`trillium_server_common::Config`] for more details

*/
pub fn config() -> Config<()> {
Config::new()
pub fn config() -> AsyncStdServer<()> {
AsyncStdServer::new()
}
133 changes: 92 additions & 41 deletions async-std/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use async_std::{
net::{TcpListener, TcpStream},
prelude::*,
task,
task::block_on,
};
use std::{net::IpAddr, sync::Arc};
use trillium::{async_trait, Handler, Info};
use trillium_server_common::{Acceptor, ConfigExt, Server, Stopper};
use std::{
fs::Metadata,
io::Result,
net::IpAddr,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
};
use trillium::{async_trait, FileSystem, Handler, Info, Runtime};
use trillium_server_common::{Acceptor, Config, Server, Stopper};

const SERVER_DESCRIPTION: &str = concat!(
" (",
Expand Down Expand Up @@ -33,51 +40,95 @@ async fn handle_signals(stop: Stopper) {
}
}

#[derive(Debug, Clone, Copy)]
pub struct AsyncStdServer;
pub type Config<A> = trillium_server_common::Config<AsyncStdServer, A>;
#[derive(Debug, Clone)]
pub struct AsyncStdServer<A> {
acceptor: A,
config: Config,
}

#[async_trait]
impl Server for AsyncStdServer {
type Transport = TcpStream;

fn peer_ip(transport: &Self::Transport) -> Option<IpAddr> {
transport
.peer_addr()
.ok()
.map(|socket_addr| socket_addr.ip())
impl AsyncStdServer<()> {
/// constructs a new AsyncStdServer with a default noop [`Acceptor`] and default [`Config`]
pub fn new() -> Self {
Self {
config: Config::default(),
acceptor: (),
}
}
}

fn run<A: Acceptor<Self::Transport>, H: Handler>(config: Config<A>, handler: H) {
task::block_on(async move { Self::run_async(config, handler).await })
impl Default for AsyncStdServer<()> {
fn default() -> Self {
Self::new()
}
}

async fn run_async<A: Acceptor<Self::Transport>, H: Handler>(
config: Config<A>,
mut handler: H,
) {
if config.should_register_signals() {
#[cfg(unix)]
task::spawn(handle_signals(config.stopper()));
#[cfg(not(unix))]
panic!("signals handling not supported on windows yet");
}
trillium_server_common::standard_server!(
AsyncStdServer,
transport: TcpStream,
listener: TcpListener
);

let listener = config.build_listener::<TcpListener>();
let local_addr = listener.local_addr().unwrap();
let mut info = Info::from(local_addr);
*info.listener_description_mut() = format!("http://{}:{}", config.host(), config.port());
info.server_description_mut().push_str(SERVER_DESCRIPTION);
impl<A: Acceptor<TcpStream>> Server for AsyncStdServer<A> {
fn run_async(self, handler: impl Handler<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(AsyncStdServer::run_async(self, handler))
}

handler.init(&mut info).await;
let handler = Arc::new(handler);
#[cfg(unix)]
fn handle_signals(&self) {
Self::spawn(handle_signals(self.config.stopper().clone()));
}
}

let mut incoming = config.stopper().stop_stream(listener.incoming());
while let Some(Ok(stream)) = incoming.next().await {
trillium::log_error!(stream.set_nodelay(config.nodelay()));
task::spawn(config.clone().handle_stream(stream, handler.clone()));
}
impl<A> Runtime for AsyncStdServer<A>
where
A: Send + Sync + 'static,
{
fn block_on<F>(future: F) -> F::Output
where
F: Future,
{
block_on(future)
}

fn spawn<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
async_std::task::spawn(future);
}

fn spawn_with_handle<F>(future: F) -> Pin<Box<dyn Future<Output = F::Output>>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
Box::pin(async_std::task::spawn(future))
}

fn spawn_local<F>(future: F)
where
F: Future + 'static,
{
async_std::task::spawn_local(future);
}
}

#[async_trait]
impl<A> FileSystem for AsyncStdServer<A> {
type File = async_std::fs::File;

async fn canonicalize<P: AsRef<Path> + Send + Sync>(path: P) -> Result<PathBuf> {
async_std::fs::canonicalize(path.as_ref())
.await
.map(Into::into)
}

async fn metadata<P: AsRef<Path> + Send + Sync>(path: P) -> Result<Metadata> {
async_std::fs::metadata(path.as_ref()).await
}

config.graceful_shutdown().await;
async fn open<P: AsRef<Path> + Send + Sync>(path: P) -> Result<Self::File> {
Self::File::open(path.as_ref()).await
}
}
51 changes: 45 additions & 6 deletions aws-lambda/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ trillium_aws_lambda::run(|conn: trillium::Conn| async move {
use lamedh_runtime::{Context, Handler as AwsHandler};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::runtime;
use trillium::{Conn, Handler};
use trillium::{Conn, Handler, Runtime};
use trillium_http::{Conn as HttpConn, Synthetic};

mod context;
Expand All @@ -37,7 +37,7 @@ use response::{AlbMultiHeadersResponse, AlbResponse, LambdaResponse};
#[derive(Debug)]
struct HandlerWrapper<H>(Arc<H>);

impl<H: Handler> AwsHandler<LambdaRequest, LambdaResponse> for HandlerWrapper<H> {
impl<H: Handler<AwsLambdaServer>> AwsHandler<LambdaRequest, LambdaResponse> for HandlerWrapper<H> {
type Error = std::io::Error;
type Fut = Pin<Box<dyn Future<Output = Result<LambdaResponse, Self::Error>> + Send + 'static>>;

Expand All @@ -46,15 +46,18 @@ impl<H: Handler> AwsHandler<LambdaRequest, LambdaResponse> for HandlerWrapper<H>
}
}

async fn run_handler(conn: HttpConn<Synthetic>, handler: Arc<impl Handler>) -> Conn {
async fn run_handler(
conn: HttpConn<Synthetic>,
handler: Arc<impl Handler<AwsLambdaServer>>,
) -> Conn {
let conn = handler.run(conn.into()).await;
handler.before_send(conn).await
}

async fn handler_fn(
request: LambdaRequest,
context: Context,
handler: Arc<impl Handler>,
handler: Arc<impl Handler<AwsLambdaServer>>,
) -> std::io::Result<LambdaResponse> {
match request {
LambdaRequest::Alb(request) => {
Expand All @@ -79,14 +82,50 @@ async fn handler_fn(

This function will poll pending until the server shuts down.
*/
pub async fn run_async(mut handler: impl Handler) {
pub async fn run_async(mut handler: impl Handler<AwsLambdaServer>) {
let mut info = "aws lambda".into();
handler.init(&mut info).await;
lamedh_runtime::run(HandlerWrapper(Arc::new(handler)))
.await
.unwrap()
}

/// The runtime struct for this crate
#[derive(Clone, Copy, Debug)]
pub struct AwsLambdaServer;

impl Runtime for AwsLambdaServer {
fn block_on<F>(future: F) -> F::Output
where
F: Future,
{
tokio::runtime::Runtime::new().unwrap().block_on(future)
}

fn spawn<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::task::spawn(future);
}

fn spawn_with_handle<F>(future: F) -> Pin<Box<dyn Future<Output = F::Output>>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
Box::pin(async { tokio::task::spawn(future).await.unwrap() })
}

fn spawn_local<F>(future: F)
where
F: Future + 'static,
{
tokio::task::spawn_local(future);
}
}

/**
# Runs a trillium handler in a sync context

Expand All @@ -97,7 +136,7 @@ This function will block the current thread until the server shuts
down
*/

pub fn run(handler: impl Handler) {
pub fn run(handler: impl Handler<AwsLambdaServer>) {
runtime::Builder::new_current_thread()
.enable_all()
.build()
Expand Down
22 changes: 8 additions & 14 deletions cli/src/dev_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use std::{
time::Duration,
};
use structopt::StructOpt;
use trillium::Runtime;

type R = trillium_smol::Smol<()>;

#[derive(StructOpt, Debug)]
pub struct DevServer {
Expand Down Expand Up @@ -195,14 +198,14 @@ impl DevServer {
child = run.spawn().unwrap();
*child_id.lock().unwrap() = child.id();
thread::sleep(Duration::from_millis(500));
async_io::block_on(broadcaster.send(&Event::Restarted)).ok();
R::block_on(broadcaster.send(&Event::Restarted)).ok();
});
}
{
let broadcaster = broadcaster.clone();
thread::spawn(move || loop {
let event = rx.recv().unwrap();
async_io::block_on(broadcaster.send(&event)).unwrap();
R::block_on(broadcaster.send(&event)).unwrap();
match event {
Event::BinaryChanged => {
log::info!("attempting to send {}", &signal);
Expand All @@ -219,10 +222,10 @@ impl DevServer {
Ok(ok) => {
if ok.status.success() {
log::debug!("{}", String::from_utf8_lossy(&ok.stdout[..]));
async_io::block_on(broadcaster.send(&Event::BuildSuccess)).ok();
R::block_on(broadcaster.send(&Event::BuildSuccess)).ok();
} else {
io::stderr().write_all(&ok.stderr).unwrap();
async_io::block_on(
R::block_on(
broadcaster.send(&Event::CompileError {
error: ansi_to_html::convert_escaped(
&String::from_utf8_lossy(&ok.stderr),
Expand Down Expand Up @@ -252,25 +255,16 @@ mod proxy_app {
use broadcaster::BroadcastChannel;
use futures_lite::StreamExt;
use trillium::{Conn, State};
use trillium_client::Client;
use trillium_html_rewriter::{
html::{element, html_content::ContentType, Settings},
HtmlRewriter,
};
use trillium_proxy::Proxy;
use trillium_router::Router;
use trillium_smol::{ClientConfig, TcpConnector};
use trillium_websockets::WebSocket;
type HttpClient = Client<TcpConnector>;

pub fn run(proxy: String, rx: BroadcastChannel<Event>) {
static PORT: u16 = 8082;
let client = HttpClient::new()
.with_default_pool()
.with_config(ClientConfig {
nodelay: Some(true),
..Default::default()
});

trillium_smol::config()
.without_signals()
Expand All @@ -296,7 +290,7 @@ mod proxy_app {
}),
),
),
Proxy::new(&*proxy).with_client(client),
Proxy::new(&*proxy),
HtmlRewriter::new(|| Settings {
element_content_handlers: vec![element!("body", |el| {
el.append(
Expand Down
Loading