From 12bc93af7810fab9c313aaf8c7285cc7bf98b305 Mon Sep 17 00:00:00 2001 From: gbaranski Date: Thu, 30 Mar 2023 14:43:07 +0200 Subject: [PATCH] wip: add support for warp --- Cargo.toml | 3 + examples/chat-server-warp/Cargo.toml | 14 +++ examples/chat-server-warp/src/main.rs | 156 ++++++++++++++++++++++++++ src/lib.rs | 3 + src/warp.rs | 25 +++++ 5 files changed, 201 insertions(+) create mode 100644 examples/chat-server-warp/Cargo.toml create mode 100644 examples/chat-server-warp/src/main.rs create mode 100644 src/warp.rs diff --git a/Cargo.toml b/Cargo.toml index ae74ba0..8dbfeb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ url = "2.2.2" cfg-if = "1.0.0" axum_crate = { package = "axum", version = "0.6.1", features = ["ws"], optional = true } +warp_crate = { package = "warp", version = "0.3.3", optional = true } tokio-tungstenite = { version = "0.18.0", optional = true } tokio-rustls = { version = "0.23.4", optional = true } tokio-native-tls = { version = "0.3.1", optional = true } @@ -35,6 +36,7 @@ client = ["tokio-tungstenite"] server = [] tungstenite = ["server", "tokio-tungstenite"] axum = ["server", "axum_crate"] +warp = ["server", "warp_crate"] tls = [] native-tls = ["tls", "tokio-native-tls", "tokio-tungstenite/native-tls"] @@ -56,6 +58,7 @@ members = [ "examples/chat-client", "examples/chat-server", "examples/chat-server-axum", + "examples/chat-server-warp", "examples/echo-server", "examples/echo-server-native-tls", "examples/simple-client", diff --git a/examples/chat-server-warp/Cargo.toml b/examples/chat-server-warp/Cargo.toml new file mode 100644 index 0000000..345888e --- /dev/null +++ b/examples/chat-server-warp/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "ezsockets-chat-server-warp" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1.52" +ezsockets = { path = "../../", features = ["warp"] } +tokio = { version = "1.17.0", features = ["full"] } +tracing = "0.1.32" +tracing-subscriber = "0.3.9" +warp = "0.3.3" diff --git a/examples/chat-server-warp/src/main.rs b/examples/chat-server-warp/src/main.rs new file mode 100644 index 0000000..e25b4e3 --- /dev/null +++ b/examples/chat-server-warp/src/main.rs @@ -0,0 +1,156 @@ +use async_trait::async_trait; +use axum::extract::Extension; +use axum::response::IntoResponse; +use axum::routing::get; +use axum::Router; +use ezsockets::axum::Upgrade; +use ezsockets::Error; +use ezsockets::Server; +use ezsockets::Socket; +use std::collections::HashMap; +use std::io::BufRead; +use std::net::SocketAddr; + +type SessionID = u16; +type Session = ezsockets::Session; + +#[derive(Debug)] +enum ChatMessage { + Send { from: SessionID, text: String }, +} + +struct ChatServer { + sessions: HashMap, + handle: Server, +} + +#[async_trait] +impl ezsockets::ServerExt for ChatServer { + type Session = ChatSession; + type Call = ChatMessage; + + async fn on_connect( + &mut self, + socket: Socket, + _address: SocketAddr, + _args: ::Args, + ) -> Result { + let id = (0..).find(|i| !self.sessions.contains_key(i)).unwrap_or(0); + let session = Session::create( + |_| ChatSession { + id, + server: self.handle.clone(), + }, + id, + socket, + ); + self.sessions.insert(id, session.clone()); + Ok(session) + } + + async fn on_disconnect( + &mut self, + id: ::ID, + ) -> Result<(), Error> { + assert!(self.sessions.remove(&id).is_some()); + Ok(()) + } + + async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> { + match call { + ChatMessage::Send { text, from } => { + let sessions = self.sessions.iter().filter(|(id, _)| from != **id); + let text = format!("from {from}: {text}"); + for (id, handle) in sessions { + tracing::info!("sending {text} to {id}"); + handle.text(text.clone()); + } + } + }; + Ok(()) + } +} + +struct ChatSession { + id: SessionID, + server: Server, +} + +#[async_trait] +impl ezsockets::SessionExt for ChatSession { + type ID = SessionID; + type Args = (); + type Call = (); + + fn id(&self) -> &Self::ID { + &self.id + } + async fn on_text(&mut self, text: String) -> Result<(), Error> { + tracing::info!("received: {text}"); + self.server.call(ChatMessage::Send { + from: self.id, + text, + }); + Ok(()) + } + + async fn on_binary(&mut self, _bytes: Vec) -> Result<(), Error> { + unimplemented!() + } + + async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> { + let () = call; + Ok(()) + } +} + +async fn websocket_handler( + Extension(server): Extension>, + ezsocket: Upgrade, +) -> impl IntoResponse { + ezsocket.on_upgrade(server, ()) +} + + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let (server, _) = Server::create(|handle| ChatServer { + sessions: HashMap::new(), + handle, + }); + + let routes = warp::path("echo") + // The `ws()` filter will prepare the Websocket handshake. + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { + // And then our closure will be called when it completes... + ws.on_upgrade(|websocket| { + // Just echo all messages back... + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + if let Err(e) = result { + eprintln!("websocket error: {:?}", e); + } + }) + }) + }); + + + tokio::spawn(async move { + tracing::debug!("listening on {}", address); + warp::serve(routes).run(([127, 0, 0, 1], 8080)).await; + }); + + let stdin = std::io::stdin(); + let lines = stdin.lock().lines(); + for line in lines { + let line = line.unwrap(); + server.call(ChatMessage::Send { + text: line, + from: SessionID::MAX, // reserve some ID for the server + }); + } + + +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 9f37645..1ad0ae1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,9 @@ pub use socket::Stream; #[cfg(feature = "axum")] pub mod axum; +#[cfg(feature = "warp")] +pub mod warp; + #[cfg(feature = "tokio-tungstenite")] pub mod tungstenite; diff --git a/src/warp.rs b/src/warp.rs new file mode 100644 index 0000000..011a035 --- /dev/null +++ b/src/warp.rs @@ -0,0 +1,25 @@ +//! Websockets Filters + +use warp_crate as warp; + +use std::borrow::Cow; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use warp::filters::header; +use warp::Filter; +use warp::reject::Rejection; +use warp::reply::{Reply, Response}; +use http; + +pub fn ws() -> impl Filter + Copy { + let ws = warp::filters::ws::ws(); + todo!() + // return ; +} + +pub struct Ws { + inner: warp::filters::ws::Ws, +}