From 305d0e9d8a8b6f53ac96de23db5fb07799ec5f9d Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 22 Oct 2021 18:34:11 +0100 Subject: [PATCH 01/11] rename server to serverhandle --- actix-server/CHANGES.md | 3 +++ actix-server/src/accept.rs | 12 ++++++------ actix-server/src/builder.rs | 19 +++++++++++++++---- actix-server/src/lib.rs | 4 ++-- actix-server/src/server.rs | 16 +++++----------- actix-server/src/signals.rs | 6 +++--- actix-tls/examples/tcp-rustls.rs | 4 ++-- 7 files changed, 36 insertions(+), 28 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index a52712f759..a99e87adb9 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,8 +1,11 @@ # Changes ## Unreleased - 2021-xx-xx +* Rename `Server` to `ServerHandle`. [#???] * Minimum supported Rust version (MSRV) is now 1.52. +[#???]: https://github.com/actix/actix-net/pull/??? + ## 2.0.0-beta.6 - 2021-10-11 * Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374] diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5fef2fe2c4..a872853cb4 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -8,7 +8,7 @@ use actix_rt::{ use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; -use crate::server::Server; +use crate::server::ServerHandle; use crate::socket::MioListener; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; @@ -30,13 +30,13 @@ struct ServerSocketInfo { /// /// It would also listen to `ServerCommand` and push interests to `WakerQueue`. pub(crate) struct AcceptLoop { - srv: Option, + srv: Option, poll: Option, waker: WakerQueue, } impl AcceptLoop { - pub fn new(srv: Server) -> Self { + pub fn new(srv: ServerHandle) -> Self { let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); let waker = WakerQueue::new(poll.registry()) .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); @@ -74,7 +74,7 @@ struct Accept { poll: Poll, waker: WakerQueue, handles: Vec, - srv: Server, + srv: ServerHandle, next: usize, avail: Availability, paused: bool, @@ -153,7 +153,7 @@ impl Accept { poll: Poll, waker: WakerQueue, socks: Vec<(usize, MioListener)>, - srv: Server, + srv: ServerHandle, handles: Vec, ) { // Accept runs in its own thread and would want to spawn additional futures to current @@ -176,7 +176,7 @@ impl Accept { waker: WakerQueue, socks: Vec<(usize, MioListener)>, handles: Vec, - srv: Server, + srv: ServerHandle, ) -> (Accept, Vec) { let sockets = socks .into_iter() diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 8d0684c64a..4c8f279e3c 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -15,7 +15,7 @@ use tokio::sync::{ use crate::accept::AcceptLoop; use crate::join_all; -use crate::server::{Server, ServerCommand}; +use crate::server::{ServerCommand, ServerHandle}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; @@ -23,6 +23,17 @@ use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; +#[derive(Debug)] +#[non_exhaustive] +pub struct Server; + +impl Server { + /// Start server building process. + pub fn build() -> ServerBuilder { + ServerBuilder::default() + } +} + /// Server builder pub struct ServerBuilder { threads: usize, @@ -35,7 +46,7 @@ pub struct ServerBuilder { exit: bool, no_signals: bool, cmd: UnboundedReceiver, - server: Server, + server: ServerHandle, notify: Vec>, worker_config: ServerWorkerConfig, } @@ -50,7 +61,7 @@ impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { let (tx, rx) = unbounded_channel(); - let server = Server::new(tx); + let server = ServerHandle::new(tx); ServerBuilder { threads: num_cpus::get(), @@ -246,7 +257,7 @@ impl ServerBuilder { } /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> Server { + pub fn run(mut self) -> ServerHandle { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index b2117191d3..6893d9c13c 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -14,8 +14,8 @@ mod test_server; mod waker_queue; mod worker; -pub use self::builder::ServerBuilder; -pub use self::server::Server; +pub use self::builder::{Server, ServerBuilder}; +pub use self::server::{ServerHandle}; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index f0dfca0b7f..c515164c7b 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -6,7 +6,6 @@ use std::task::{Context, Poll}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; -use crate::builder::ServerBuilder; use crate::signals::Signal; #[derive(Debug)] @@ -32,19 +31,14 @@ pub(crate) enum ServerCommand { /// /// A graceful shutdown will wait for all workers to stop first. #[derive(Debug)] -pub struct Server( +pub struct ServerHandle( UnboundedSender, Option>, ); -impl Server { +impl ServerHandle { pub(crate) fn new(tx: UnboundedSender) -> Self { - Server(tx, None) - } - - /// Start server building process - pub fn build() -> ServerBuilder { - ServerBuilder::default() + ServerHandle(tx, None) } pub(crate) fn signal(&self, sig: Signal) { @@ -91,13 +85,13 @@ impl Server { } } -impl Clone for Server { +impl Clone for ServerHandle { fn clone(&self) -> Self { Self(self.0.clone(), None) } } -impl Future for Server { +impl Future for ServerHandle { type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index c9cdb45e81..b80fa7597e 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::server::Server; +use crate::server::ServerHandle; /// Types of process signals. #[allow(dead_code)] @@ -20,7 +20,7 @@ pub(crate) enum Signal { /// Process signal listener. pub(crate) struct Signals { - srv: Server, + srv: ServerHandle, #[cfg(not(unix))] signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, @@ -31,7 +31,7 @@ pub(crate) struct Signals { impl Signals { /// Spawns a signal listening future that is able to send commands to the `Server`. - pub(crate) fn start(srv: Server) { + pub(crate) fn start(srv: ServerHandle) { #[cfg(not(unix))] { actix_rt::spawn(Signals { diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index f347e16484..3ef34e35c7 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -30,7 +30,7 @@ use std::{ }; use actix_rt::net::TcpStream; -use actix_server::Server; +use actix_server::ServerHandle; use actix_service::ServiceFactoryExt as _; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; @@ -67,7 +67,7 @@ async fn main() -> io::Result<()> { let addr = ("127.0.0.1", 8443); info!("starting server on port: {}", &addr.0); - Server::build() + ServerHandle::build() .bind("tls-example", addr, move || { let count = Arc::clone(&count); From 81421c2ba9b59cbe5a44d4b4dc300a18d6bd6b0b Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 22 Oct 2021 21:01:50 +0100 Subject: [PATCH 02/11] rename maxconn => max_concurrent_connections --- actix-server/CHANGES.md | 1 + actix-server/src/builder.rs | 149 +++++++++++++++++------------- actix-server/src/worker.rs | 2 +- actix-server/tests/test_server.rs | 2 +- 4 files changed, 88 insertions(+), 66 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index a99e87adb9..a52b6faf55 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased - 2021-xx-xx * Rename `Server` to `ServerHandle`. [#???] +* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#???] * Minimum supported Rust version (MSRV) is now 1.52. [#???]: https://github.com/actix/actix-net/pull/??? diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 4c8f279e3c..f6c5f3c161 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -125,15 +125,21 @@ impl ServerBuilder { /// Sets the maximum per-worker number of concurrent connections. /// - /// All socket listeners will stop accepting connections when this limit is - /// reached for each worker. + /// All socket listeners will stop accepting connections when this limit is reached for + /// each worker. /// - /// By default max connections is set to a 25k per worker. - pub fn maxconn(mut self, num: usize) -> Self { + /// By default max connections is set to a 25,600 per worker. + pub fn max_concurrent_connections(mut self, num: usize) -> Self { self.worker_config.max_concurrent_connections(num); self } + #[doc(hidden)] + #[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")] + pub fn maxconn(self, num: usize) -> Self { + self.max_concurrent_connections(num) + } + /// Stop Actix system. pub fn system_exit(mut self) -> Self { self.exit = true; @@ -158,7 +164,11 @@ impl ServerBuilder { self } - /// Add new service to the server. + /// Bind server to socket addresses. + /// + /// Binds to all network interface addresses that resolve from the `addr` argument. + /// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct + /// interfaces at the same time by passing a list of socket addresses. pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where F: ServiceFactory, @@ -180,56 +190,9 @@ impl ServerBuilder { Ok(self) } - /// Add new unix domain service to the server. - #[cfg(unix)] - pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result - where - F: ServiceFactory, - N: AsRef, - U: AsRef, - { - // The path must not exist when we try to bind. - // Try to remove it to avoid bind error. - if let Err(e) = std::fs::remove_file(addr.as_ref()) { - // NotFound is expected and not an issue. Anything else is. - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e); - } - } - - let lst = crate::socket::StdUnixListener::bind(addr)?; - self.listen_uds(name, lst, factory) - } - - /// Add new unix domain service to the server. - /// Useful when running as a systemd service and - /// a socket FD can be acquired using the systemd crate. - #[cfg(unix)] - pub fn listen_uds>( - mut self, - name: N, - lst: crate::socket::StdUnixListener, - factory: F, - ) -> io::Result - where - F: ServiceFactory, - { - use std::net::{IpAddr, Ipv4Addr}; - lst.set_nonblocking(true)?; - let token = self.next_token(); - let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory, - addr, - )); - self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); - Ok(self) - } - - /// Add new service to the server. + /// Bind server to existing TCP listener. + /// + /// Useful when running as a systemd service and a socket FD can be passed to the process. pub fn listen>( mut self, name: N, @@ -240,9 +203,10 @@ impl ServerBuilder { F: ServiceFactory, { lst.set_nonblocking(true)?; - let addr = lst.local_addr()?; + let addr = lst.local_addr()?; let token = self.next_token(); + self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -434,6 +398,62 @@ impl ServerBuilder { } } +/// Unix Domain Socket (UDS) support. +#[cfg(unix)] +impl ServerBuilder { + /// Add new unix domain service to the server. + pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result + where + F: ServiceFactory, + N: AsRef, + U: AsRef, + { + // The path must not exist when we try to bind. + // Try to remove it to avoid bind error. + if let Err(e) = std::fs::remove_file(addr.as_ref()) { + // NotFound is expected and not an issue. Anything else is. + if e.kind() != std::io::ErrorKind::NotFound { + return Err(e); + } + } + + let lst = crate::socket::StdUnixListener::bind(addr)?; + self.listen_uds(name, lst, factory) + } + + /// Add new unix domain service to the server. + /// + /// Useful when running as a systemd service and a socket FD can be passed to the process. + pub fn listen_uds>( + mut self, + name: N, + lst: crate::socket::StdUnixListener, + factory: F, + ) -> io::Result + where + F: ServiceFactory, + { + use std::net::{IpAddr, Ipv4Addr}; + + lst.set_nonblocking(true)?; + + let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let token = self.next_token(); + + self.services.push(StreamNewService::create( + name.as_ref().to_string(), + token, + factory, + addr, + )); + + self.sockets + .push((token, name.as_ref().to_string(), MioListener::from(lst))); + + Ok(self) + } +} + impl Future for ServerBuilder { type Output = (); @@ -452,29 +472,30 @@ pub(super) fn bind_addr( backlog: u32, ) -> io::Result> { let mut err = None; - let mut succ = false; + let mut success = false; let mut sockets = Vec::new(); + for addr in addr.to_socket_addrs()? { match create_tcp_listener(addr, backlog) { Ok(lst) => { - succ = true; + success = true; sockets.push(lst); } Err(e) => err = Some(e), } } - if !succ { - if let Some(e) = err.take() { - Err(e) + if success { + Ok(sockets) + } else { + if let Some(err) = err.take() { + Err(err) } else { Err(io::Error::new( io::ErrorKind::Other, - "Can not bind to address.", + "Can not bind to socket address", )) } - } else { - Ok(sockets) } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index f8550e18f2..a3faf1724d 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -247,7 +247,7 @@ impl Default for ServerWorkerConfig { Self { shutdown_timeout: Duration::from_secs(30), max_blocking_threads, - max_concurrent_connections: 25600, + max_concurrent_connections: 25_600, } } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 315e3eff18..5919438bc8 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -170,7 +170,7 @@ async fn test_max_concurrent_connections() { // Set a relative higher backlog. .backlog(12) // max connection for a worker is 3. - .maxconn(max_conn) + .max_concurrent_connections(max_conn) .workers(1) .disable_signals() .bind("test", addr, move || { From 4c0eaca5811b20b1295f16474047ba7324985650 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 25 Oct 2021 18:03:52 +0100 Subject: [PATCH 03/11] convert Server::bind to accept a normal service factory --- actix-server/CHANGES.md | 8 ++- actix-server/Cargo.toml | 2 +- actix-server/examples/startup-fail.rs | 25 ++++++++ actix-server/examples/tcp-echo.rs | 15 +++-- actix-server/src/builder.rs | 80 +++++++++++++++---------- actix-server/src/lib.rs | 5 +- actix-server/src/server.rs | 13 +++- actix-server/src/service.rs | 59 ++++++++----------- actix-server/src/test_server.rs | 11 +++- actix-server/src/worker.rs | 11 +++- actix-server/tests/test_server.rs | 25 ++++---- actix-service/CHANGES.md | 1 + actix-service/src/and_then.rs | 85 +++++++++++++++++++++++++++ actix-service/src/ext.rs | 24 +++++--- actix-service/src/pipeline.rs | 12 ++-- actix-tls/tests/test_connect.rs | 26 ++++---- actix-tls/tests/test_resolvers.rs | 5 +- 17 files changed, 278 insertions(+), 129 deletions(-) create mode 100644 actix-server/examples/startup-fail.rs diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index a52b6faf55..c6b79d442c 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,11 +1,13 @@ # Changes ## Unreleased - 2021-xx-xx -* Rename `Server` to `ServerHandle`. [#???] -* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#???] +* Rename `Server` to `ServerHandle`. [#403] +* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403] +* Remove wrapper `service::ServiceFactory` trait. [#403] +* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403] * Minimum supported Rust version (MSRV) is now 1.52. -[#???]: https://github.com/actix/actix-net/pull/??? +[#403]: https://github.com/actix/actix-net/pull/403 ## 2.0.0-beta.6 - 2021-10-11 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 46a0ad1d62..a4c403f154 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -38,4 +38,4 @@ actix-rt = "2.0.0" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1.5.1", features = ["io-util"] } +tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/examples/startup-fail.rs b/actix-server/examples/startup-fail.rs new file mode 100644 index 0000000000..9631b2d1bc --- /dev/null +++ b/actix-server/examples/startup-fail.rs @@ -0,0 +1,25 @@ +use std::io; + +use actix_rt::net::TcpStream; +use actix_server::Server; +use actix_service::fn_service; +use log::info; + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info")) + .init(); + + let addr = ("127.0.0.1", 8080); + info!("starting server on port: {}", &addr.0); + + Server::build() + .bind( + "startup-fail", + addr, + fn_service(move |mut _stream: TcpStream| async move { Ok::(42) }), + )? + .workers(2) + .run() + .await +} diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 8b038da4c3..e5e288fc6a 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -21,7 +21,6 @@ use actix_rt::net::TcpStream; use actix_server::Server; use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; -use futures_util::future::ok; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -39,11 +38,11 @@ async fn main() -> io::Result<()> { // logical CPU cores as the worker count. For this reason, the closure passed to bind needs // to return a service *factory*; so it can be created once per worker. Server::build() - .bind("echo", addr, move || { + .bind("echo", addr, { let count = Arc::clone(&count); let num2 = Arc::clone(&count); - fn_service(move |mut stream: TcpStream| { + let svc = fn_service(move |mut stream: TcpStream| { let count = Arc::clone(&count); async move { @@ -78,11 +77,15 @@ async fn main() -> io::Result<()> { } }) .map_err(|err| error!("Service Error: {:?}", err)) - .and_then(move |(_, size)| { + .and_then_send(move |(_, size)| { let num = num2.load(Ordering::SeqCst); info!("[{}] total bytes read: {}", num, size); - ok(size) - }) + async move { Ok(size) } + }); + + let svc2 = svc.clone(); + + svc2 })? .workers(1) .run() diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index f6c5f3c161..9eddb7765f 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,4 +1,5 @@ use std::{ + fmt, future::Future, io, mem, pin::Pin, @@ -7,32 +8,25 @@ use std::{ }; use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; +use actix_service::ServiceFactory; use log::{error, info}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver}, oneshot, }; -use crate::accept::AcceptLoop; -use crate::join_all; -use crate::server::{ServerCommand, ServerHandle}; -use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -use crate::signals::{Signal, Signals}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; - -#[derive(Debug)] -#[non_exhaustive] -pub struct Server; - -impl Server { - /// Start server building process. - pub fn build() -> ServerBuilder { - ServerBuilder::default() - } -} +use crate::{ + accept::AcceptLoop, + join_all, + server::{ServerCommand, ServerHandle}, + service::{InternalServiceFactory, StreamNewService}, + signals::{Signal, Signals}, + socket::{ + MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, + }, + waker_queue::{WakerInterest, WakerQueue}, + worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}, +}; /// Server builder pub struct ServerBuilder { @@ -169,38 +163,48 @@ impl ServerBuilder { /// Binds to all network interface addresses that resolve from the `addr` argument. /// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct /// interfaces at the same time by passing a list of socket addresses. - pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result + pub fn bind( + mut self, + name: impl AsRef, + addr: U, + factory: F, + ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, U: ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { let token = self.next_token(); + self.services.push(StreamNewService::create( name.as_ref().to_string(), token, factory.clone(), lst.local_addr()?, )); + self.sockets .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } + Ok(self) } /// Bind server to existing TCP listener. /// /// Useful when running as a systemd service and a socket FD can be passed to the process. - pub fn listen>( + pub fn listen( mut self, - name: N, + name: impl AsRef, lst: StdTcpListener, factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, { lst.set_nonblocking(true)?; @@ -259,7 +263,7 @@ impl ServerBuilder { Signals::start(self.server.clone()); } - // start http server actor + // start http server let server = self.server.clone(); rt::spawn(self); server @@ -402,11 +406,19 @@ impl ServerBuilder { #[cfg(unix)] impl ServerBuilder { /// Add new unix domain service to the server. - pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result + pub fn bind_uds( + self, + name: impl AsRef, + addr: U, + factory: F, + ) -> io::Result where - F: ServiceFactory, - N: AsRef, + F: ServiceFactory + + Send + + Clone + + 'static, U: AsRef, + InitErr: fmt::Debug + Send + 'static, { // The path must not exist when we try to bind. // Try to remove it to avoid bind error. @@ -424,14 +436,18 @@ impl ServerBuilder { /// Add new unix domain service to the server. /// /// Useful when running as a systemd service and a socket FD can be passed to the process. - pub fn listen_uds>( + pub fn listen_uds( mut self, - name: N, + name: impl AsRef, lst: crate::socket::StdUnixListener, factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory + + Send + + Clone + + 'static, + InitErr: fmt::Debug + Send + 'static, { use std::net::{IpAddr, Ipv4Addr}; diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 6893d9c13c..ca8fd05327 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -14,9 +14,8 @@ mod test_server; mod waker_queue; mod worker; -pub use self::builder::{Server, ServerBuilder}; -pub use self::server::{ServerHandle}; -pub use self::service::ServiceFactory; +pub use self::builder::ServerBuilder; +pub use self::server::{Server, ServerHandle}; pub use self::test_server::TestServer; #[doc(hidden)] diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index c515164c7b..c04bc8be88 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -6,7 +6,18 @@ use std::task::{Context, Poll}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; -use crate::signals::Signal; +use crate::{signals::Signal, ServerBuilder}; + +#[derive(Debug)] +#[non_exhaustive] +pub struct Server; + +impl Server { + /// Start server building process. + pub fn build() -> ServerBuilder { + ServerBuilder::default() + } +} #[derive(Debug)] pub(crate) enum ServerCommand { diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 28ffb4f15a..77b31afa69 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -1,20 +1,19 @@ -use std::marker::PhantomData; -use std::net::SocketAddr; -use std::task::{Context, Poll}; - -use actix_service::{Service, ServiceFactory as BaseServiceFactory}; +use std::{ + fmt, + marker::PhantomData, + net::SocketAddr, + task::{Context, Poll}, +}; + +use actix_service::{Service, ServiceFactory}; use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; -use crate::socket::{FromStream, MioStream}; -use crate::worker::WorkerCounterGuard; - -pub trait ServiceFactory: Send + Clone + 'static { - type Factory: BaseServiceFactory; - - fn create(&self) -> Self::Factory; -} +use crate::{ + socket::{FromStream, MioStream}, + worker::WorkerCounterGuard, +}; pub(crate) trait InternalServiceFactory: Send { fn name(&self, token: usize) -> &str; @@ -80,17 +79,18 @@ where } } -pub(crate) struct StreamNewService, Io: FromStream> { +pub(crate) struct StreamNewService { name: String, inner: F, token: usize, addr: SocketAddr, - _t: PhantomData, + _t: PhantomData<(Io, InitErr)>, } -impl StreamNewService +impl StreamNewService where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, Io: FromStream + Send + 'static, { pub(crate) fn create( @@ -109,9 +109,10 @@ where } } -impl InternalServiceFactory for StreamNewService +impl InternalServiceFactory for StreamNewService where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, Io: FromStream + Send + 'static, { fn name(&self, _: usize) -> &str { @@ -130,28 +131,18 @@ where fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; - let fut = self.inner.create().new_service(()); + let fut = self.inner.new_service(()); Box::pin(async move { match fut.await { Ok(inner) => { let service = Box::new(StreamService::new(inner)) as _; Ok((token, service)) } - Err(_) => Err(()), + Err(err) => { + error!("{:?}", err); + Err(()) + } } }) } } - -impl ServiceFactory for F -where - F: Fn() -> T + Send + Clone + 'static, - T: BaseServiceFactory, - I: FromStream, -{ - type Factory = T; - - fn create(&self) -> T { - (self)() - } -} diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index ad6ee8ee35..77ddad3ed4 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -1,9 +1,10 @@ use std::sync::mpsc; -use std::{net, thread}; +use std::{fmt, net, thread}; use actix_rt::{net::TcpStream, System}; +use actix_service::ServiceFactory; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{Server, ServerBuilder}; /// A testing server. /// @@ -64,7 +65,11 @@ impl TestServer { } /// Start new test server with application factory. - pub fn with>(factory: F) -> TestServerRuntime { + pub fn with(factory: F) -> TestServerRuntime + where + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, + { let (tx, rx) = mpsc::channel(); // run server in separate thread diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index a3faf1724d..89d93b1630 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -28,6 +28,8 @@ use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; +const DEFAULT_SHUTDOWN_DURATION: Duration = Duration::from_secs(30); + /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. pub(crate) struct Stop { @@ -244,8 +246,9 @@ impl Default for ServerWorkerConfig { fn default() -> Self { // 512 is the default max blocking thread count of tokio runtime. let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1); + Self { - shutdown_timeout: Duration::from_secs(30), + shutdown_timeout: DEFAULT_SHUTDOWN_DURATION, max_blocking_threads, max_concurrent_connections: 25_600, } @@ -314,6 +317,7 @@ impl ServerWorker { .await .into_iter() .collect::, _>>(); + let services = match res { Ok(res) => res .into_iter() @@ -327,8 +331,9 @@ impl ServerWorker { services }) .into_boxed_slice(), - Err(e) => { - error!("Can not start worker: {:?}", e); + + Err(err) => { + error!("Can not start worker: {:?}", err); Arbiter::current().stop(); return; } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 5919438bc8..0605dba23f 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -25,9 +25,7 @@ fn test_bind() { let srv = Server::build() .workers(1) .disable_signals() - .bind("test", addr, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) - })? + .bind("test", addr, fn_service(|_| async { Ok::<_, ()>(()) }))? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -56,9 +54,7 @@ fn test_listen() { let srv = Server::build() .disable_signals() .workers(1) - .listen("test", lst, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) - })? + .listen("test", lst, fn_service(|_| async { Ok::<_, ()>(()) }))? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -94,13 +90,15 @@ fn test_start() { let srv = Server::build() .backlog(100) .disable_signals() - .bind("test", addr, move || { + .bind( + "test", + addr, fn_service(|io: TcpStream| async move { let mut f = Framed::new(io, BytesCodec); f.send(Bytes::from_static(b"test")).await.unwrap(); Ok::<_, ()>(()) - }) - })? + }), + )? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -173,7 +171,7 @@ async fn test_max_concurrent_connections() { .max_concurrent_connections(max_conn) .workers(1) .disable_signals() - .bind("test", addr, move || { + .bind("test", addr, { let counter = counter.clone(); fn_service(move |_io: TcpStream| { let counter = counter.clone(); @@ -263,14 +261,14 @@ async fn test_service_restart() { let server = Server::build() .backlog(1) .disable_signals() - .bind("addr1", addr1, move || { + .bind("addr1", addr1, { let num = num.clone(); fn_factory(move || { let num = num.clone(); async move { Ok::<_, ()>(TestService(num)) } }) })? - .bind("addr2", addr2, move || { + .bind("addr2", addr2, { let num2 = num2.clone(); fn_factory(move || { let num2 = num2.clone(); @@ -319,6 +317,7 @@ async fn worker_restart() { use futures_core::future::LocalBoxFuture; use tokio::io::{AsyncReadExt, AsyncWriteExt}; + #[derive(Debug, Clone)] struct TestServiceFactory(Arc); impl ServiceFactory for TestServiceFactory { @@ -381,7 +380,7 @@ async fn worker_restart() { actix_rt::System::new().block_on(async { let server = Server::build() .disable_signals() - .bind("addr", addr, move || TestServiceFactory(counter.clone()))? + .bind("addr", addr, TestServiceFactory(counter.clone()))? .workers(2) .run(); diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index c01fd8ddab..5aaffc0600 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,6 +1,7 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `.and_then_send()` & `AndThenSendServiceFactory` for creating `Send`able chained services. [#403] ## 2.0.1 - 2021-10-11 diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 3898007927..fdf219f060 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -261,6 +261,91 @@ where } } +/// `.and_then_send()` service factory combinator +pub struct AndThenSendServiceFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory< + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, +{ + inner_a: A, + inner_b: B, + _phantom: PhantomData, +} + +impl AndThenSendServiceFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory< + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, +{ + /// Create new `AndThenFactory` combinator + pub(crate) fn new(a: A, b: B) -> Self { + Self { + inner_a: a, + inner_b: b, + _phantom: PhantomData, + } + } +} + +impl ServiceFactory for AndThenSendServiceFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory< + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, +{ + type Response = B::Response; + type Error = A::Error; + + type Config = A::Config; + type Service = AndThenService; + type InitError = A::InitError; + type Future = AndThenServiceFactoryResponse; + + fn new_service(&self, cfg: A::Config) -> Self::Future { + AndThenServiceFactoryResponse::new( + self.inner_a.new_service(cfg.clone()), + self.inner_b.new_service(cfg), + ) + } +} + +impl Clone for AndThenSendServiceFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory< + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, +{ + fn clone(&self) -> Self { + Self { + inner_a: self.inner_a.clone(), + inner_b: self.inner_b.clone(), + _phantom: PhantomData, + } + } +} + #[cfg(test)] mod tests { use alloc::rc::Rc; diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index f5fe6ed11e..48ddb12755 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -1,10 +1,4 @@ -use crate::{ - and_then::{AndThenService, AndThenServiceFactory}, - map::Map, - map_err::MapErr, - transform_err::TransformMapInitErr, - IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, -}; +use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, and_then::{AndThenSendServiceFactory, AndThenService, AndThenServiceFactory}, map::Map, map_err::MapErr, transform_err::TransformMapInitErr}; /// An extension trait for [`Service`]s that provides a variety of convenient adapters. pub trait ServiceExt: Service { @@ -105,6 +99,22 @@ pub trait ServiceFactoryExt: ServiceFactory { { AndThenServiceFactory::new(self, factory.into_factory()) } + + /// Call another service after call to this one has resolved successfully. + fn and_then_send(self, factory: I) -> AndThenSendServiceFactory + where + Self: Sized, + Self::Config: Clone, + I: IntoServiceFactory, + SF1: ServiceFactory< + Self::Response, + Config = Self::Config, + Error = Self::Error, + InitError = Self::InitError, + >, + { + AndThenSendServiceFactory::new(self, factory.into_factory()) + } } impl ServiceFactoryExt for SF where SF: ServiceFactory {} diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 2c71a74bfd..b3cb72f295 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -175,13 +175,13 @@ where factory: I, ) -> PipelineFactory< impl ServiceFactory< - Req, - Response = SF1::Response, - Error = SF::Error, - Config = SF::Config, - InitError = SF::InitError, + Req, + Response = SF1::Response, + Error = SF::Error, + Config = SF::Config, + InitError = SF::InitError, Service = impl Service + Clone, - > + Clone, + > + Clone, Req, > where diff --git a/actix-tls/tests/test_connect.rs b/actix-tls/tests/test_connect.rs index 564151ceeb..9738d9e06d 100755 --- a/actix-tls/tests/test_connect.rs +++ b/actix-tls/tests/test_connect.rs @@ -50,13 +50,11 @@ async fn test_rustls_string() { #[actix_rt::test] async fn test_static_str() { - let srv = TestServer::with(|| { - fn_service(|io: TcpStream| async { - let mut framed = Framed::new(io, BytesCodec); - framed.send(Bytes::from_static(b"test")).await?; - Ok::<_, io::Error>(()) - }) - }); + let srv = TestServer::with(fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + })); let conn = actix_connect::default_connector(); @@ -75,13 +73,11 @@ async fn test_static_str() { #[actix_rt::test] async fn test_new_service() { - let srv = TestServer::with(|| { - fn_service(|io: TcpStream| async { - let mut framed = Framed::new(io, BytesCodec); - framed.send(Bytes::from_static(b"test")).await?; - Ok::<_, io::Error>(()) - }) - }); + let srv = TestServer::with(fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + })); let factory = actix_connect::default_connector_factory(); @@ -133,7 +129,7 @@ async fn test_rustls_uri() { #[actix_rt::test] async fn test_local_addr() { - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; diff --git a/actix-tls/tests/test_resolvers.rs b/actix-tls/tests/test_resolvers.rs index 40ee21fa6c..37958c9830 100644 --- a/actix-tls/tests/test_resolvers.rs +++ b/actix-tls/tests/test_resolvers.rs @@ -38,8 +38,9 @@ async fn custom_resolver() { async fn custom_resolver_connect() { use trust_dns_resolver::TokioAsyncResolver; - let srv = - TestServer::with(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) })); + let srv = TestServer::with(fn_service(|_io: TcpStream| async { + Ok::<_, io::Error>(()) + })); struct MyResolver { trust_dns: TokioAsyncResolver, From 9b9869f1ddafc0b9d86fb3aeed3874e44b44510c Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 25 Oct 2021 18:13:08 +0100 Subject: [PATCH 04/11] fix startup fail example --- actix-server/examples/startup-fail.rs | 12 ++++++++++-- actix-server/examples/tcp-echo.rs | 2 +- actix-service/src/ext.rs | 8 +++++++- actix-service/src/pipeline.rs | 12 ++++++------ 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/actix-server/examples/startup-fail.rs b/actix-server/examples/startup-fail.rs index 9631b2d1bc..f26f441ec1 100644 --- a/actix-server/examples/startup-fail.rs +++ b/actix-server/examples/startup-fail.rs @@ -2,7 +2,7 @@ use std::io; use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::fn_service; +use actix_service::{fn_factory, fn_service}; use log::info; #[actix_rt::main] @@ -17,7 +17,15 @@ async fn main() -> io::Result<()> { .bind( "startup-fail", addr, - fn_service(move |mut _stream: TcpStream| async move { Ok::(42) }), + fn_factory(|| async move { + if 1 > 2 { + Ok(fn_service(move |mut _stream: TcpStream| async move { + Ok::(0) + })) + } else { + Err(42) + } + }), )? .workers(2) .run() diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index e5e288fc6a..dfe5e01e3b 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -87,7 +87,7 @@ async fn main() -> io::Result<()> { svc2 })? - .workers(1) + .workers(2) .run() .await } diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index 48ddb12755..622b0f5556 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -1,4 +1,10 @@ -use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, and_then::{AndThenSendServiceFactory, AndThenService, AndThenServiceFactory}, map::Map, map_err::MapErr, transform_err::TransformMapInitErr}; +use crate::{ + and_then::{AndThenSendServiceFactory, AndThenService, AndThenServiceFactory}, + map::Map, + map_err::MapErr, + transform_err::TransformMapInitErr, + IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, +}; /// An extension trait for [`Service`]s that provides a variety of convenient adapters. pub trait ServiceExt: Service { diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index b3cb72f295..2c71a74bfd 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -175,13 +175,13 @@ where factory: I, ) -> PipelineFactory< impl ServiceFactory< - Req, - Response = SF1::Response, - Error = SF::Error, - Config = SF::Config, - InitError = SF::InitError, + Req, + Response = SF1::Response, + Error = SF::Error, + Config = SF::Config, + InitError = SF::InitError, Service = impl Service + Clone, - > + Clone, + > + Clone, Req, > where From 448626d5431050a1ef61c418ad834b902d2bd1f7 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 25 Oct 2021 18:42:23 +0100 Subject: [PATCH 05/11] fix tls examples --- actix-server/src/builder.rs | 4 ++-- actix-server/src/service.rs | 21 +++++++++++---------- actix-server/src/worker.rs | 8 ++++---- actix-tls/examples/tcp-rustls.rs | 8 ++++---- actix-tls/tests/test_connect.rs | 8 ++++---- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 9eddb7765f..6128898dca 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -19,7 +19,7 @@ use crate::{ accept::AcceptLoop, join_all, server::{ServerCommand, ServerHandle}, - service::{InternalServiceFactory, StreamNewService}, + service::{ServerServiceFactory, StreamNewService}, signals::{Signal, Signals}, socket::{ MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, @@ -34,7 +34,7 @@ pub struct ServerBuilder { token: usize, backlog: u32, handles: Vec<(usize, WorkerHandleServer)>, - services: Vec>, + services: Vec>, sockets: Vec<(usize, String, MioListener)>, accept: AcceptLoop, exit: bool, diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 77b31afa69..ff7f0d4c6a 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -15,11 +15,12 @@ use crate::{ worker::WorkerCounterGuard, }; -pub(crate) trait InternalServiceFactory: Send { +pub(crate) trait ServerServiceFactory: Send { fn name(&self, token: usize) -> &str; - fn clone_factory(&self) -> Box; + fn clone_factory(&self) -> Box; + /// Initialize Mio stream handler service and return it with its service token. fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; } @@ -55,7 +56,7 @@ where { type Response = (); type Error = (); - type Future = Ready>; + type Future = Ready>; fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll> { self.service.poll_ready(ctx).map_err(|_| ()) @@ -71,8 +72,8 @@ where }); Ok(()) } - Err(e) => { - error!("Can not convert to an async tcp stream: {}", e); + Err(err) => { + error!("Can not convert Mio stream to an async TCP stream: {}", err); Err(()) } }) @@ -98,7 +99,7 @@ where token: usize, inner: F, addr: SocketAddr, - ) -> Box { + ) -> Box { Box::new(Self { name, token, @@ -109,7 +110,7 @@ where } } -impl InternalServiceFactory for StreamNewService +impl ServerServiceFactory for StreamNewService where F: ServiceFactory + Send + Clone + 'static, InitErr: fmt::Debug + Send + 'static, @@ -119,7 +120,7 @@ where &self.name } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), @@ -134,8 +135,8 @@ where let fut = self.inner.new_service(()); Box::pin(async move { match fut.await { - Ok(inner) => { - let service = Box::new(StreamService::new(inner)) as _; + Ok(svc) => { + let service = Box::new(StreamService::new(svc)) as _; Ok((token, service)) } Err(err) => { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 89d93b1630..5b8fcd47ae 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -24,7 +24,7 @@ use tokio::sync::{ }; use crate::join_all; -use crate::service::{BoxedServerService, InternalServiceFactory}; +use crate::service::{BoxedServerService, ServerServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; @@ -198,7 +198,7 @@ impl WorkerHandleServer { /// Service worker. /// -/// Worker accepts Socket objects via unbounded channel and starts stream processing. +/// Worker accepts socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { // UnboundedReceiver should always be the first field. // It must be dropped as soon as ServerWorker dropping. @@ -206,7 +206,7 @@ pub(crate) struct ServerWorker { rx2: UnboundedReceiver, counter: WorkerCounter, services: Box<[WorkerService]>, - factories: Box<[Box]>, + factories: Box<[Box]>, state: WorkerState, shutdown_timeout: Duration, } @@ -272,7 +272,7 @@ impl ServerWorkerConfig { impl ServerWorker { pub(crate) fn start( idx: usize, - factories: Vec>, + factories: Vec>, waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index 3ef34e35c7..7cf56e6d60 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -30,7 +30,7 @@ use std::{ }; use actix_rt::net::TcpStream; -use actix_server::ServerHandle; +use actix_server::Server; use actix_service::ServiceFactoryExt as _; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; @@ -67,15 +67,15 @@ async fn main() -> io::Result<()> { let addr = ("127.0.0.1", 8443); info!("starting server on port: {}", &addr.0); - ServerHandle::build() - .bind("tls-example", addr, move || { + Server::build() + .bind("tls-example", addr, { let count = Arc::clone(&count); // Set up TLS service factory tls_acceptor .clone() .map_err(|err| println!("Rustls error: {:?}", err)) - .and_then(move |stream: TlsStream| { + .and_then_send(move |stream: TlsStream| { let num = count.fetch_add(1, Ordering::Relaxed); info!("[{}] Got TLS connection: {:?}", num, &*stream); ok(()) diff --git a/actix-tls/tests/test_connect.rs b/actix-tls/tests/test_connect.rs index 9738d9e06d..8317c16f0f 100755 --- a/actix-tls/tests/test_connect.rs +++ b/actix-tls/tests/test_connect.rs @@ -17,7 +17,7 @@ use actix_tls::connect::{self as actix_connect, Connect}; #[cfg(feature = "openssl")] #[actix_rt::test] async fn test_string() { - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; @@ -34,7 +34,7 @@ async fn test_string() { #[cfg(feature = "rustls")] #[actix_rt::test] async fn test_rustls_string() { - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; @@ -94,7 +94,7 @@ async fn test_new_service() { async fn test_openssl_uri() { use std::convert::TryFrom; - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; @@ -113,7 +113,7 @@ async fn test_openssl_uri() { async fn test_rustls_uri() { use std::convert::TryFrom; - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; From 336e98e95007b1ea3d4007235bdf8334afdd3e14 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 28 Oct 2021 20:57:01 +0100 Subject: [PATCH 06/11] fix doc test --- actix-server/src/test_server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 77ddad3ed4..1d6f0fdc9f 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -18,8 +18,8 @@ use crate::{Server, ServerBuilder}; /// /// #[actix_rt::main] /// async fn main() { -/// let srv = TestServer::with(|| fn_service( -/// |sock| async move { +/// let srv = TestServer::with(fn_service(|sock| +/// async move { /// println!("New connection: {:?}", sock); /// Ok::<_, ()>(()) /// } From 75a877b631af621a704191b5ad6dec4b12bdd575 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 28 Oct 2021 20:57:36 +0100 Subject: [PATCH 07/11] fmt --- actix-server/src/test_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 1d6f0fdc9f..0d8c0766fb 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -13,8 +13,8 @@ use crate::{Server, ServerBuilder}; /// /// # Examples /// ``` -/// use actix_service::fn_service; /// use actix_server::TestServer; +/// use actix_service::fn_service; /// /// #[actix_rt::main] /// async fn main() { From e49fedbfe7504657de0272dd6c9ebc52419a0489 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 28 Oct 2021 20:59:44 +0100 Subject: [PATCH 08/11] doc --- actix-server/src/test_server.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 0d8c0766fb..8756488d6b 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -1,5 +1,4 @@ -use std::sync::mpsc; -use std::{fmt, net, thread}; +use std::{fmt, net, sync::mpsc, thread}; use actix_rt::{net::TcpStream, System}; use actix_service::ServiceFactory; @@ -28,9 +27,10 @@ use crate::{Server, ServerBuilder}; /// println!("SOCKET: {:?}", srv.connect()); /// } /// ``` +#[non_exhaustive] pub struct TestServer; -/// Test server runtime +/// Test server runtime. pub struct TestServerRuntime { addr: net::SocketAddr, host: String, @@ -39,7 +39,7 @@ pub struct TestServerRuntime { } impl TestServer { - /// Start new server with server builder. + /// Start new server using server builder. pub fn start(mut factory: F) -> TestServerRuntime where F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static, @@ -64,7 +64,7 @@ impl TestServer { } } - /// Start new test server with application factory. + /// Start new test server with default settings using application factory. pub fn with(factory: F) -> TestServerRuntime where F: ServiceFactory + Send + Clone + 'static, From f7985c585a504ee98dcdb3054a2db21c911d35cb Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 1 Nov 2021 00:36:53 +0000 Subject: [PATCH 09/11] Update actix-server/src/builder.rs Co-authored-by: Ali MJ Al-Nasrawy --- actix-server/src/builder.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 6128898dca..f0720126e9 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -163,6 +163,8 @@ impl ServerBuilder { /// Binds to all network interface addresses that resolve from the `addr` argument. /// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct /// interfaces at the same time by passing a list of socket addresses. + /// + /// This fails only if all addresses fail to bind. pub fn bind( mut self, name: impl AsRef, From 3c6f586b892f9eaa5eb3c9cb05995c0596359be2 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 1 Nov 2021 02:19:20 +0000 Subject: [PATCH 10/11] doc tweaks --- actix-server/src/builder.rs | 14 ++++++-------- actix-service/src/and_then.rs | 18 +++++++++--------- actix-service/src/apply_cfg.rs | 2 +- actix-service/src/ext.rs | 2 +- actix-service/src/map.rs | 2 +- actix-service/src/pipeline.rs | 7 +++---- 6 files changed, 21 insertions(+), 24 deletions(-) diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 6128898dca..b1657d97e7 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -503,15 +503,13 @@ pub(super) fn bind_addr( if success { Ok(sockets) + } else if let Some(err) = err.take() { + Err(err) } else { - if let Some(err) = err.take() { - Err(err) - } else { - Err(io::Error::new( - io::ErrorKind::Other, - "Can not bind to socket address", - )) - } + Err(io::Error::new( + io::ErrorKind::Other, + "Can not bind to socket address", + )) } } diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index fdf219f060..6a5d3acee2 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -14,7 +14,7 @@ use super::{Service, ServiceFactory}; /// Service for the `and_then` combinator, chaining a computation onto the end of another service /// which completes successfully. /// -/// This is created by the `Pipeline::and_then` method. +/// Created by the `.and_then()` combinator. pub struct AndThenService(Rc<(A, B)>, PhantomData); impl AndThenService { @@ -116,7 +116,7 @@ where } } -/// `.and_then()` service factory combinator +/// Service factory created by the `.and_then()` combinator. pub struct AndThenServiceFactory where A: ServiceFactory, @@ -326,16 +326,16 @@ where } } -impl Clone for AndThenSendServiceFactory +impl Clone for AndThenSendServiceFactory where - A: ServiceFactory, + A: ServiceFactory + Clone, A::Config: Clone, B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - >, + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + > + Clone, { fn clone(&self) -> Self { Self { diff --git a/actix-service/src/apply_cfg.rs b/actix-service/src/apply_cfg.rs index 25fc5fc279..8b75dfb2bc 100644 --- a/actix-service/src/apply_cfg.rs +++ b/actix-service/src/apply_cfg.rs @@ -63,7 +63,7 @@ where } } -/// Convert `Fn(Config, &Server) -> Future` fn to NewService\ +/// Convert `Fn(Config, &Server) -> Future` fn to ServiceFactory. struct ApplyConfigService where S1: Service, diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index 622b0f5556..0f82dc1ba6 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -44,7 +44,7 @@ pub trait ServiceExt: Service { /// Call another service after call to this one has resolved successfully. /// /// This function can be used to chain two services together and ensure that the second service - /// isn't called until call to the fist service have finished. Result of the call to the first + /// isn't called until call to the fist service has resolved. Result of the call to the first /// service is used as an input parameter for the second service's call. /// /// Note that this function consumes the receiving service and returns a wrapped version of it. diff --git a/actix-service/src/map.rs b/actix-service/src/map.rs index 12fd439556..025d8ce58f 100644 --- a/actix-service/src/map.rs +++ b/actix-service/src/map.rs @@ -103,7 +103,7 @@ where } } -/// `MapNewService` new service combinator +/// `MapServiceFactory` new service combinator. pub struct MapServiceFactory { a: A, f: F, diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 2c71a74bfd..dd86be6e61 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -238,8 +238,7 @@ where } } - /// Map this service's output to a different type, returning a new service - /// of the resulting type. + /// Map this service's output to a different type, returning a new service. pub fn map(self, f: F) -> PipelineFactory, Req> where Self: Sized, @@ -251,7 +250,7 @@ where } } - /// Map this service's error to a different error, returning a new service. + /// Map this service's error to a different type, returning a new service. pub fn map_err( self, f: F, @@ -266,7 +265,7 @@ where } } - /// Map this factory's init error to a different error, returning a new service. + /// Map this factory's init error to a different type, returning a new service. pub fn map_init_err(self, f: F) -> PipelineFactory, Req> where Self: Sized, From 5097b12b7c816b8d40b151a57987ae18a42da29f Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 1 Nov 2021 03:19:32 +0000 Subject: [PATCH 11/11] remove and_then_send --- actix-server/examples/tcp-echo.rs | 87 +++++++++-------- actix-service/CHANGES.md | 4 +- actix-service/src/and_then.rs | 85 ----------------- actix-service/src/ext.rs | 18 +--- actix-service/src/fn_service.rs | 149 +++++++++++++++++++++++++++++- actix-tls/examples/tcp-rustls.rs | 41 +++++--- 6 files changed, 227 insertions(+), 157 deletions(-) diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index dfe5e01e3b..a6786b0833 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -19,7 +19,7 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::{fn_service, ServiceFactoryExt as _}; +use actix_service::{fn_factory, fn_service, ServiceExt as _}; use bytes::BytesMut; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -39,53 +39,62 @@ async fn main() -> io::Result<()> { // to return a service *factory*; so it can be created once per worker. Server::build() .bind("echo", addr, { - let count = Arc::clone(&count); - let num2 = Arc::clone(&count); - - let svc = fn_service(move |mut stream: TcpStream| { + fn_factory::<_, (), _, _, _, _>(move || { let count = Arc::clone(&count); async move { - let num = count.fetch_add(1, Ordering::SeqCst); - let num = num + 1; - - let mut size = 0; - let mut buf = BytesMut::new(); - - loop { - match stream.read_buf(&mut buf).await { - // end of stream; bail from loop - Ok(0) => break, - - // more bytes to process - Ok(bytes_read) => { - info!("[{}] read {} bytes", num, bytes_read); - stream.write_all(&buf[size..]).await.unwrap(); - size += bytes_read; - } + let count = Arc::clone(&count); + let count2 = Arc::clone(&count); + + let svc = fn_service(move |mut stream: TcpStream| { + let count = Arc::clone(&count); + + let num = count.fetch_add(1, Ordering::SeqCst) + 1; + + info!( + "[{}] accepting connection from: {}", + num, + stream.peer_addr().unwrap() + ); + + async move { + let mut size = 0; + let mut buf = BytesMut::new(); - // stream error; bail from loop with error - Err(err) => { - error!("Stream Error: {:?}", err); - return Err(()); + loop { + match stream.read_buf(&mut buf).await { + // end of stream; bail from loop + Ok(0) => break, + + // more bytes to process + Ok(bytes_read) => { + info!("[{}] read {} bytes", num, bytes_read); + stream.write_all(&buf[size..]).await.unwrap(); + size += bytes_read; + } + + // stream error; bail from loop with error + Err(err) => { + error!("Stream Error: {:?}", err); + return Err(()); + } + } } + + // send data down service pipeline + Ok((buf.freeze(), size)) } - } + }) + .map_err(|err| error!("Service Error: {:?}", err)) + .and_then(move |(_, size)| { + let num = count2.load(Ordering::SeqCst); + info!("[{}] total bytes read: {}", num, size); + async move { Ok(size) } + }); - // send data down service pipeline - Ok((buf.freeze(), size)) + Ok::<_, ()>(svc.clone()) } }) - .map_err(|err| error!("Service Error: {:?}", err)) - .and_then_send(move |(_, size)| { - let num = num2.load(Ordering::SeqCst); - info!("[{}] total bytes read: {}", num, size); - async move { Ok(size) } - }); - - let svc2 = svc.clone(); - - svc2 })? .workers(2) .run() diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 5aaffc0600..8a2e98a7f1 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,7 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx -* Add `.and_then_send()` & `AndThenSendServiceFactory` for creating `Send`able chained services. [#403] +* `fn_factory[_with_config]` types now impl `Send` even when config, service, request types do not. [#403] + +[#403]: https://github.com/actix/actix-net/pull/403 ## 2.0.1 - 2021-10-11 diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 6a5d3acee2..2a7e18d1d0 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -261,91 +261,6 @@ where } } -/// `.and_then_send()` service factory combinator -pub struct AndThenSendServiceFactory -where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - >, -{ - inner_a: A, - inner_b: B, - _phantom: PhantomData, -} - -impl AndThenSendServiceFactory -where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - >, -{ - /// Create new `AndThenFactory` combinator - pub(crate) fn new(a: A, b: B) -> Self { - Self { - inner_a: a, - inner_b: b, - _phantom: PhantomData, - } - } -} - -impl ServiceFactory for AndThenSendServiceFactory -where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - >, -{ - type Response = B::Response; - type Error = A::Error; - - type Config = A::Config; - type Service = AndThenService; - type InitError = A::InitError; - type Future = AndThenServiceFactoryResponse; - - fn new_service(&self, cfg: A::Config) -> Self::Future { - AndThenServiceFactoryResponse::new( - self.inner_a.new_service(cfg.clone()), - self.inner_b.new_service(cfg), - ) - } -} - -impl Clone for AndThenSendServiceFactory -where - A: ServiceFactory + Clone, - A::Config: Clone, - B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - > + Clone, -{ - fn clone(&self) -> Self { - Self { - inner_a: self.inner_a.clone(), - inner_b: self.inner_b.clone(), - _phantom: PhantomData, - } - } -} - #[cfg(test)] mod tests { use alloc::rc::Rc; diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index 0f82dc1ba6..284b4ecca8 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -1,5 +1,5 @@ use crate::{ - and_then::{AndThenSendServiceFactory, AndThenService, AndThenServiceFactory}, + and_then::{AndThenService, AndThenServiceFactory}, map::Map, map_err::MapErr, transform_err::TransformMapInitErr, @@ -105,22 +105,6 @@ pub trait ServiceFactoryExt: ServiceFactory { { AndThenServiceFactory::new(self, factory.into_factory()) } - - /// Call another service after call to this one has resolved successfully. - fn and_then_send(self, factory: I) -> AndThenSendServiceFactory - where - Self: Sized, - Self::Config: Clone, - I: IntoServiceFactory, - SF1: ServiceFactory< - Self::Response, - Config = Self::Config, - Error = Self::Error, - InitError = Self::InitError, - >, - { - AndThenSendServiceFactory::new(self, factory.into_factory()) - } } impl ServiceFactoryExt for SF where SF: ServiceFactory {} diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index f83ef81fe6..6f71e04e19 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -3,6 +3,7 @@ use core::{future::Future, marker::PhantomData}; use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory}; /// Create `ServiceFactory` for function that can act as a `Service` +// TODO: remove unnecessary Cfg type param pub fn fn_service( f: F, ) -> FnServiceFactory @@ -48,6 +49,7 @@ where /// Ok(()) /// } /// ``` +// TODO: remove unnecessary Cfg type param pub fn fn_factory( f: F, ) -> FnServiceNoConfig @@ -160,7 +162,7 @@ where Fut: Future>, { f: F, - _t: PhantomData<(Req, Cfg)>, + _t: PhantomData, } impl FnServiceFactory @@ -237,7 +239,7 @@ where Srv: Service, { f: F, - _t: PhantomData<(Fut, Cfg, Req, Srv, Err)>, + _t: PhantomData, } impl FnServiceConfig @@ -293,7 +295,7 @@ where Fut: Future>, { f: F, - _t: PhantomData<(Cfg, Req)>, + _t: PhantomData, } impl FnServiceNoConfig @@ -353,10 +355,11 @@ where mod tests { use core::task::Poll; + use alloc::rc::Rc; use futures_util::future::lazy; use super::*; - use crate::{ok, Service, ServiceFactory}; + use crate::{boxed, ok, Service, ServiceExt, ServiceFactory, ServiceFactoryExt}; #[actix_rt::test] async fn test_fn_service() { @@ -391,4 +394,142 @@ mod tests { assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv", 1)); } + + // these three properties of a service factory are usually important + fn is_static(_t: &T) {} + fn impls_clone(_t: &T) {} + fn impls_send(_t: &T) {} + + #[actix_rt::test] + async fn test_fn_factory_impl_send() { + let svc_fac = fn_factory_with_config(|cfg: usize| { + ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg)))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Cfg type is explicitly !Send + let svc_fac = fn_factory_with_config(|cfg: Rc| { + let cfg = Rc::clone(&cfg); + ok::<_, ()>(fn_service(move |_: ()| ok::<_, ()>(("srv", *cfg)))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(fn_service(move |()| ok::<_, ()>("srv"))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Req type is explicitly !Send + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(fn_service(move |_: Rc<()>| ok::<_, ()>("srv"))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Service type is explicitly !Send + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(boxed::rc_service(fn_service(move |_: ()| { + ok::<_, ()>("srv") + }))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + } + + #[actix_rt::test] + async fn test_service_combinators_impls() { + #[derive(Clone)] + struct Ident; + + impl Service for Ident { + type Response = T; + type Error = (); + type Future = Ready>; + + crate::always_ready!(); + + fn call(&self, req: T) -> Self::Future { + ok(req) + } + } + + let svc = Ident; + is_static(&svc); + impls_clone(&svc); + impls_send(&svc); + + let svc = ServiceExt::map(Ident, core::convert::identity); + impls_send(&svc); + svc.call(()).await.unwrap(); + + let svc = ServiceExt::map_err(Ident, core::convert::identity); + impls_send(&svc); + svc.call(()).await.unwrap(); + + let svc = ServiceExt::and_then(Ident, Ident); + // impls_send(&svc); // fails to compile :( + svc.call(()).await.unwrap(); + + // let svc = ServiceExt::and_then_send(Ident, Ident); + // impls_send(&svc); + // svc.call(()).await.unwrap(); + } + + #[actix_rt::test] + async fn test_factory_combinators_impls() { + #[derive(Clone)] + struct Ident; + + impl ServiceFactory for Ident { + type Response = T; + type Error = (); + type Config = (); + // explicitly !Send result service + type Service = boxed::RcService; + type InitError = (); + type Future = Ready>; + + fn new_service(&self, _cfg: Self::Config) -> Self::Future { + ok(boxed::rc_service(fn_service(ok))) + } + } + + let svc_fac = Ident; + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + let svc_fac = ServiceFactoryExt::map(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::map_err(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::map_init_err(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::and_then(Ident, Ident); + // impls_send(&svc_fac); // fails to compile :( + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + // let svc_fac = ServiceFactoryExt::and_then_send(Ident, Ident); + // impls_send(&svc_fac); + // let svc = svc_fac.new_service(()).await.unwrap(); + // svc.call(()).await.unwrap(); + } } diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index 7cf56e6d60..39d43b1adb 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -31,21 +31,24 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::ServiceFactoryExt as _; +use actix_service::{fn_factory, fn_service, ServiceExt as _, ServiceFactory}; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; use log::info; use rustls::{server::ServerConfig, Certificate, PrivateKey}; use rustls_pemfile::{certs, rsa_private_keys}; +const CERT_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/cert.pem"]; +const KEY_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/key.pem"]; + #[actix_rt::main] async fn main() -> io::Result<()> { env::set_var("RUST_LOG", "info"); env_logger::init(); // Load TLS key and cert files - let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap()); - let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap()); + let cert_file = &mut BufReader::new(File::open(CERT_PATH).unwrap()); + let key_file = &mut BufReader::new(File::open(KEY_PATH).unwrap()); let cert_chain = certs(cert_file) .unwrap() @@ -72,14 +75,30 @@ async fn main() -> io::Result<()> { let count = Arc::clone(&count); // Set up TLS service factory - tls_acceptor - .clone() - .map_err(|err| println!("Rustls error: {:?}", err)) - .and_then_send(move |stream: TlsStream| { - let num = count.fetch_add(1, Ordering::Relaxed); - info!("[{}] Got TLS connection: {:?}", num, &*stream); - ok(()) - }) + // note: moving rustls acceptor into fn_factory scope + fn_factory(move || { + // manually call new_service so that and_then can be used from ServiceExt + // type annotation for inner stream type is required + let svc = >::new_service( + &tls_acceptor, + (), + ); + + let count = Arc::clone(&count); + + async move { + let svc = svc + .await? + .map_err(|err| println!("Rustls error: {:?}", err)) + .and_then(fn_service(move |stream: TlsStream| { + let num = count.fetch_add(1, Ordering::Relaxed) + 1; + info!("[{}] Got TLS connection: {:?}", num, &*stream); + ok(()) + })); + + Ok::<_, ()>(svc) + } + }) })? .workers(1) .run()