diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index a52712f759..c6b79d442c 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,8 +1,14 @@ # Changes ## Unreleased - 2021-xx-xx +* Rename `Server` to `ServerHandle`. [#403] +* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403] +* Remove wrapper `service::ServiceFactory` trait. [#403] +* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403] * Minimum supported Rust version (MSRV) is now 1.52. +[#403]: https://github.com/actix/actix-net/pull/403 + ## 2.0.0-beta.6 - 2021-10-11 * Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374] diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 46a0ad1d62..a4c403f154 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -38,4 +38,4 @@ actix-rt = "2.0.0" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1.5.1", features = ["io-util"] } +tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/examples/startup-fail.rs b/actix-server/examples/startup-fail.rs new file mode 100644 index 0000000000..f26f441ec1 --- /dev/null +++ b/actix-server/examples/startup-fail.rs @@ -0,0 +1,33 @@ +use std::io; + +use actix_rt::net::TcpStream; +use actix_server::Server; +use actix_service::{fn_factory, fn_service}; +use log::info; + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info")) + .init(); + + let addr = ("127.0.0.1", 8080); + info!("starting server on port: {}", &addr.0); + + Server::build() + .bind( + "startup-fail", + addr, + fn_factory(|| async move { + if 1 > 2 { + Ok(fn_service(move |mut _stream: TcpStream| async move { + Ok::(0) + })) + } else { + Err(42) + } + }), + )? + .workers(2) + .run() + .await +} diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 8b038da4c3..a6786b0833 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -19,9 +19,8 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::{fn_service, ServiceFactoryExt as _}; +use actix_service::{fn_factory, fn_service, ServiceExt as _}; use bytes::BytesMut; -use futures_util::future::ok; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -39,52 +38,65 @@ async fn main() -> io::Result<()> { // logical CPU cores as the worker count. For this reason, the closure passed to bind needs // to return a service *factory*; so it can be created once per worker. Server::build() - .bind("echo", addr, move || { - let count = Arc::clone(&count); - let num2 = Arc::clone(&count); - - fn_service(move |mut stream: TcpStream| { + .bind("echo", addr, { + fn_factory::<_, (), _, _, _, _>(move || { let count = Arc::clone(&count); async move { - let num = count.fetch_add(1, Ordering::SeqCst); - let num = num + 1; - - let mut size = 0; - let mut buf = BytesMut::new(); - - loop { - match stream.read_buf(&mut buf).await { - // end of stream; bail from loop - Ok(0) => break, - - // more bytes to process - Ok(bytes_read) => { - info!("[{}] read {} bytes", num, bytes_read); - stream.write_all(&buf[size..]).await.unwrap(); - size += bytes_read; - } + let count = Arc::clone(&count); + let count2 = Arc::clone(&count); + + let svc = fn_service(move |mut stream: TcpStream| { + let count = Arc::clone(&count); + + let num = count.fetch_add(1, Ordering::SeqCst) + 1; + + info!( + "[{}] accepting connection from: {}", + num, + stream.peer_addr().unwrap() + ); - // stream error; bail from loop with error - Err(err) => { - error!("Stream Error: {:?}", err); - return Err(()); + async move { + let mut size = 0; + let mut buf = BytesMut::new(); + + loop { + match stream.read_buf(&mut buf).await { + // end of stream; bail from loop + Ok(0) => break, + + // more bytes to process + Ok(bytes_read) => { + info!("[{}] read {} bytes", num, bytes_read); + stream.write_all(&buf[size..]).await.unwrap(); + size += bytes_read; + } + + // stream error; bail from loop with error + Err(err) => { + error!("Stream Error: {:?}", err); + return Err(()); + } + } } + + // send data down service pipeline + Ok((buf.freeze(), size)) } - } + }) + .map_err(|err| error!("Service Error: {:?}", err)) + .and_then(move |(_, size)| { + let num = count2.load(Ordering::SeqCst); + info!("[{}] total bytes read: {}", num, size); + async move { Ok(size) } + }); - // send data down service pipeline - Ok((buf.freeze(), size)) + Ok::<_, ()>(svc.clone()) } }) - .map_err(|err| error!("Service Error: {:?}", err)) - .and_then(move |(_, size)| { - let num = num2.load(Ordering::SeqCst); - info!("[{}] total bytes read: {}", num, size); - ok(size) - }) })? - .workers(1) + .workers(2) .run() .await } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5fef2fe2c4..a872853cb4 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -8,7 +8,7 @@ use actix_rt::{ use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; -use crate::server::Server; +use crate::server::ServerHandle; use crate::socket::MioListener; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; @@ -30,13 +30,13 @@ struct ServerSocketInfo { /// /// It would also listen to `ServerCommand` and push interests to `WakerQueue`. pub(crate) struct AcceptLoop { - srv: Option, + srv: Option, poll: Option, waker: WakerQueue, } impl AcceptLoop { - pub fn new(srv: Server) -> Self { + pub fn new(srv: ServerHandle) -> Self { let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); let waker = WakerQueue::new(poll.registry()) .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); @@ -74,7 +74,7 @@ struct Accept { poll: Poll, waker: WakerQueue, handles: Vec, - srv: Server, + srv: ServerHandle, next: usize, avail: Availability, paused: bool, @@ -153,7 +153,7 @@ impl Accept { poll: Poll, waker: WakerQueue, socks: Vec<(usize, MioListener)>, - srv: Server, + srv: ServerHandle, handles: Vec, ) { // Accept runs in its own thread and would want to spawn additional futures to current @@ -176,7 +176,7 @@ impl Accept { waker: WakerQueue, socks: Vec<(usize, MioListener)>, handles: Vec, - srv: Server, + srv: ServerHandle, ) -> (Accept, Vec) { let sockets = socks .into_iter() diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 8d0684c64a..4fffeb54d3 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,4 +1,5 @@ use std::{ + fmt, future::Future, io, mem, pin::Pin, @@ -7,21 +8,25 @@ use std::{ }; use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; +use actix_service::ServiceFactory; use log::{error, info}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver}, oneshot, }; -use crate::accept::AcceptLoop; -use crate::join_all; -use crate::server::{Server, ServerCommand}; -use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -use crate::signals::{Signal, Signals}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; +use crate::{ + accept::AcceptLoop, + join_all, + server::{ServerCommand, ServerHandle}, + service::{ServerServiceFactory, StreamNewService}, + signals::{Signal, Signals}, + socket::{ + MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, + }, + waker_queue::{WakerInterest, WakerQueue}, + worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}, +}; /// Server builder pub struct ServerBuilder { @@ -29,13 +34,13 @@ pub struct ServerBuilder { token: usize, backlog: u32, handles: Vec<(usize, WorkerHandleServer)>, - services: Vec>, + services: Vec>, sockets: Vec<(usize, String, MioListener)>, accept: AcceptLoop, exit: bool, no_signals: bool, cmd: UnboundedReceiver, - server: Server, + server: ServerHandle, notify: Vec>, worker_config: ServerWorkerConfig, } @@ -50,7 +55,7 @@ impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { let (tx, rx) = unbounded_channel(); - let server = Server::new(tx); + let server = ServerHandle::new(tx); ServerBuilder { threads: num_cpus::get(), @@ -114,15 +119,21 @@ impl ServerBuilder { /// Sets the maximum per-worker number of concurrent connections. /// - /// All socket listeners will stop accepting connections when this limit is - /// reached for each worker. + /// All socket listeners will stop accepting connections when this limit is reached for + /// each worker. /// - /// By default max connections is set to a 25k per worker. - pub fn maxconn(mut self, num: usize) -> Self { + /// By default max connections is set to a 25,600 per worker. + pub fn max_concurrent_connections(mut self, num: usize) -> Self { self.worker_config.max_concurrent_connections(num); self } + #[doc(hidden)] + #[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")] + pub fn maxconn(self, num: usize) -> Self { + self.max_concurrent_connections(num) + } + /// Stop Actix system. pub fn system_exit(mut self) -> Self { self.exit = true; @@ -147,91 +158,61 @@ impl ServerBuilder { self } - /// Add new service to the server. - pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result + /// Bind server to socket addresses. + /// + /// Binds to all network interface addresses that resolve from the `addr` argument. + /// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct + /// interfaces at the same time by passing a list of socket addresses. + /// + /// This fails only if all addresses fail to bind. + pub fn bind( + mut self, + name: impl AsRef, + addr: U, + factory: F, + ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, U: ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { let token = self.next_token(); + self.services.push(StreamNewService::create( name.as_ref().to_string(), token, factory.clone(), lst.local_addr()?, )); + self.sockets .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } - Ok(self) - } - /// Add new unix domain service to the server. - #[cfg(unix)] - pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result - where - F: ServiceFactory, - N: AsRef, - U: AsRef, - { - // The path must not exist when we try to bind. - // Try to remove it to avoid bind error. - if let Err(e) = std::fs::remove_file(addr.as_ref()) { - // NotFound is expected and not an issue. Anything else is. - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e); - } - } - - let lst = crate::socket::StdUnixListener::bind(addr)?; - self.listen_uds(name, lst, factory) - } - - /// Add new unix domain service to the server. - /// Useful when running as a systemd service and - /// a socket FD can be acquired using the systemd crate. - #[cfg(unix)] - pub fn listen_uds>( - mut self, - name: N, - lst: crate::socket::StdUnixListener, - factory: F, - ) -> io::Result - where - F: ServiceFactory, - { - use std::net::{IpAddr, Ipv4Addr}; - lst.set_nonblocking(true)?; - let token = self.next_token(); - let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory, - addr, - )); - self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); Ok(self) } - /// Add new service to the server. - pub fn listen>( + /// Bind server to existing TCP listener. + /// + /// Useful when running as a systemd service and a socket FD can be passed to the process. + pub fn listen( mut self, - name: N, + name: impl AsRef, lst: StdTcpListener, factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, { lst.set_nonblocking(true)?; - let addr = lst.local_addr()?; + let addr = lst.local_addr()?; let token = self.next_token(); + self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -246,7 +227,7 @@ impl ServerBuilder { } /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> Server { + pub fn run(mut self) -> ServerHandle { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { @@ -284,7 +265,7 @@ impl ServerBuilder { Signals::start(self.server.clone()); } - // start http server actor + // start http server let server = self.server.clone(); rt::spawn(self); server @@ -423,6 +404,74 @@ impl ServerBuilder { } } +/// Unix Domain Socket (UDS) support. +#[cfg(unix)] +impl ServerBuilder { + /// Add new unix domain service to the server. + pub fn bind_uds( + self, + name: impl AsRef, + addr: U, + factory: F, + ) -> io::Result + where + F: ServiceFactory + + Send + + Clone + + 'static, + U: AsRef, + InitErr: fmt::Debug + Send + 'static, + { + // The path must not exist when we try to bind. + // Try to remove it to avoid bind error. + if let Err(e) = std::fs::remove_file(addr.as_ref()) { + // NotFound is expected and not an issue. Anything else is. + if e.kind() != std::io::ErrorKind::NotFound { + return Err(e); + } + } + + let lst = crate::socket::StdUnixListener::bind(addr)?; + self.listen_uds(name, lst, factory) + } + + /// Add new unix domain service to the server. + /// + /// Useful when running as a systemd service and a socket FD can be passed to the process. + pub fn listen_uds( + mut self, + name: impl AsRef, + lst: crate::socket::StdUnixListener, + factory: F, + ) -> io::Result + where + F: ServiceFactory + + Send + + Clone + + 'static, + InitErr: fmt::Debug + Send + 'static, + { + use std::net::{IpAddr, Ipv4Addr}; + + lst.set_nonblocking(true)?; + + let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let token = self.next_token(); + + self.services.push(StreamNewService::create( + name.as_ref().to_string(), + token, + factory, + addr, + )); + + self.sockets + .push((token, name.as_ref().to_string(), MioListener::from(lst))); + + Ok(self) + } +} + impl Future for ServerBuilder { type Output = (); @@ -441,29 +490,28 @@ pub(super) fn bind_addr( backlog: u32, ) -> io::Result> { let mut err = None; - let mut succ = false; + let mut success = false; let mut sockets = Vec::new(); + for addr in addr.to_socket_addrs()? { match create_tcp_listener(addr, backlog) { Ok(lst) => { - succ = true; + success = true; sockets.push(lst); } Err(e) => err = Some(e), } } - if !succ { - if let Some(e) = err.take() { - Err(e) - } else { - Err(io::Error::new( - io::ErrorKind::Other, - "Can not bind to address.", - )) - } - } else { + if success { Ok(sockets) + } else if let Some(err) = err.take() { + Err(err) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "Can not bind to socket address", + )) } } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index b2117191d3..ca8fd05327 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -15,8 +15,7 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; -pub use self::server::Server; -pub use self::service::ServiceFactory; +pub use self::server::{Server, ServerHandle}; pub use self::test_server::TestServer; #[doc(hidden)] diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index f0dfca0b7f..c04bc8be88 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -6,8 +6,18 @@ use std::task::{Context, Poll}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; -use crate::builder::ServerBuilder; -use crate::signals::Signal; +use crate::{signals::Signal, ServerBuilder}; + +#[derive(Debug)] +#[non_exhaustive] +pub struct Server; + +impl Server { + /// Start server building process. + pub fn build() -> ServerBuilder { + ServerBuilder::default() + } +} #[derive(Debug)] pub(crate) enum ServerCommand { @@ -32,19 +42,14 @@ pub(crate) enum ServerCommand { /// /// A graceful shutdown will wait for all workers to stop first. #[derive(Debug)] -pub struct Server( +pub struct ServerHandle( UnboundedSender, Option>, ); -impl Server { +impl ServerHandle { pub(crate) fn new(tx: UnboundedSender) -> Self { - Server(tx, None) - } - - /// Start server building process - pub fn build() -> ServerBuilder { - ServerBuilder::default() + ServerHandle(tx, None) } pub(crate) fn signal(&self, sig: Signal) { @@ -91,13 +96,13 @@ impl Server { } } -impl Clone for Server { +impl Clone for ServerHandle { fn clone(&self) -> Self { Self(self.0.clone(), None) } } -impl Future for Server { +impl Future for ServerHandle { type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 28ffb4f15a..ff7f0d4c6a 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -1,26 +1,26 @@ -use std::marker::PhantomData; -use std::net::SocketAddr; -use std::task::{Context, Poll}; - -use actix_service::{Service, ServiceFactory as BaseServiceFactory}; +use std::{ + fmt, + marker::PhantomData, + net::SocketAddr, + task::{Context, Poll}, +}; + +use actix_service::{Service, ServiceFactory}; use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; -use crate::socket::{FromStream, MioStream}; -use crate::worker::WorkerCounterGuard; - -pub trait ServiceFactory: Send + Clone + 'static { - type Factory: BaseServiceFactory; - - fn create(&self) -> Self::Factory; -} +use crate::{ + socket::{FromStream, MioStream}, + worker::WorkerCounterGuard, +}; -pub(crate) trait InternalServiceFactory: Send { +pub(crate) trait ServerServiceFactory: Send { fn name(&self, token: usize) -> &str; - fn clone_factory(&self) -> Box; + fn clone_factory(&self) -> Box; + /// Initialize Mio stream handler service and return it with its service token. fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; } @@ -56,7 +56,7 @@ where { type Response = (); type Error = (); - type Future = Ready>; + type Future = Ready>; fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll> { self.service.poll_ready(ctx).map_err(|_| ()) @@ -72,25 +72,26 @@ where }); Ok(()) } - Err(e) => { - error!("Can not convert to an async tcp stream: {}", e); + Err(err) => { + error!("Can not convert Mio stream to an async TCP stream: {}", err); Err(()) } }) } } -pub(crate) struct StreamNewService, Io: FromStream> { +pub(crate) struct StreamNewService { name: String, inner: F, token: usize, addr: SocketAddr, - _t: PhantomData, + _t: PhantomData<(Io, InitErr)>, } -impl StreamNewService +impl StreamNewService where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, Io: FromStream + Send + 'static, { pub(crate) fn create( @@ -98,7 +99,7 @@ where token: usize, inner: F, addr: SocketAddr, - ) -> Box { + ) -> Box { Box::new(Self { name, token, @@ -109,16 +110,17 @@ where } } -impl InternalServiceFactory for StreamNewService +impl ServerServiceFactory for StreamNewService where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, Io: FromStream + Send + 'static, { fn name(&self, _: usize) -> &str { &self.name } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), @@ -130,28 +132,18 @@ where fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; - let fut = self.inner.create().new_service(()); + let fut = self.inner.new_service(()); Box::pin(async move { match fut.await { - Ok(inner) => { - let service = Box::new(StreamService::new(inner)) as _; + Ok(svc) => { + let service = Box::new(StreamService::new(svc)) as _; Ok((token, service)) } - Err(_) => Err(()), + Err(err) => { + error!("{:?}", err); + Err(()) + } } }) } } - -impl ServiceFactory for F -where - F: Fn() -> T + Send + Clone + 'static, - T: BaseServiceFactory, - I: FromStream, -{ - type Factory = T; - - fn create(&self) -> T { - (self)() - } -} diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index c9cdb45e81..b80fa7597e 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::server::Server; +use crate::server::ServerHandle; /// Types of process signals. #[allow(dead_code)] @@ -20,7 +20,7 @@ pub(crate) enum Signal { /// Process signal listener. pub(crate) struct Signals { - srv: Server, + srv: ServerHandle, #[cfg(not(unix))] signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, @@ -31,7 +31,7 @@ pub(crate) struct Signals { impl Signals { /// Spawns a signal listening future that is able to send commands to the `Server`. - pub(crate) fn start(srv: Server) { + pub(crate) fn start(srv: ServerHandle) { #[cfg(not(unix))] { actix_rt::spawn(Signals { diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index ad6ee8ee35..8756488d6b 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -1,9 +1,9 @@ -use std::sync::mpsc; -use std::{net, thread}; +use std::{fmt, net, sync::mpsc, thread}; use actix_rt::{net::TcpStream, System}; +use actix_service::ServiceFactory; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{Server, ServerBuilder}; /// A testing server. /// @@ -12,13 +12,13 @@ use crate::{Server, ServerBuilder, ServiceFactory}; /// /// # Examples /// ``` -/// use actix_service::fn_service; /// use actix_server::TestServer; +/// use actix_service::fn_service; /// /// #[actix_rt::main] /// async fn main() { -/// let srv = TestServer::with(|| fn_service( -/// |sock| async move { +/// let srv = TestServer::with(fn_service(|sock| +/// async move { /// println!("New connection: {:?}", sock); /// Ok::<_, ()>(()) /// } @@ -27,9 +27,10 @@ use crate::{Server, ServerBuilder, ServiceFactory}; /// println!("SOCKET: {:?}", srv.connect()); /// } /// ``` +#[non_exhaustive] pub struct TestServer; -/// Test server runtime +/// Test server runtime. pub struct TestServerRuntime { addr: net::SocketAddr, host: String, @@ -38,7 +39,7 @@ pub struct TestServerRuntime { } impl TestServer { - /// Start new server with server builder. + /// Start new server using server builder. pub fn start(mut factory: F) -> TestServerRuntime where F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static, @@ -63,8 +64,12 @@ impl TestServer { } } - /// Start new test server with application factory. - pub fn with>(factory: F) -> TestServerRuntime { + /// Start new test server with default settings using application factory. + pub fn with(factory: F) -> TestServerRuntime + where + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, + { let (tx, rx) = mpsc::channel(); // run server in separate thread diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index f8550e18f2..5b8fcd47ae 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -24,10 +24,12 @@ use tokio::sync::{ }; use crate::join_all; -use crate::service::{BoxedServerService, InternalServiceFactory}; +use crate::service::{BoxedServerService, ServerServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; +const DEFAULT_SHUTDOWN_DURATION: Duration = Duration::from_secs(30); + /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. pub(crate) struct Stop { @@ -196,7 +198,7 @@ impl WorkerHandleServer { /// Service worker. /// -/// Worker accepts Socket objects via unbounded channel and starts stream processing. +/// Worker accepts socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { // UnboundedReceiver should always be the first field. // It must be dropped as soon as ServerWorker dropping. @@ -204,7 +206,7 @@ pub(crate) struct ServerWorker { rx2: UnboundedReceiver, counter: WorkerCounter, services: Box<[WorkerService]>, - factories: Box<[Box]>, + factories: Box<[Box]>, state: WorkerState, shutdown_timeout: Duration, } @@ -244,10 +246,11 @@ impl Default for ServerWorkerConfig { fn default() -> Self { // 512 is the default max blocking thread count of tokio runtime. let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1); + Self { - shutdown_timeout: Duration::from_secs(30), + shutdown_timeout: DEFAULT_SHUTDOWN_DURATION, max_blocking_threads, - max_concurrent_connections: 25600, + max_concurrent_connections: 25_600, } } } @@ -269,7 +272,7 @@ impl ServerWorkerConfig { impl ServerWorker { pub(crate) fn start( idx: usize, - factories: Vec>, + factories: Vec>, waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { @@ -314,6 +317,7 @@ impl ServerWorker { .await .into_iter() .collect::, _>>(); + let services = match res { Ok(res) => res .into_iter() @@ -327,8 +331,9 @@ impl ServerWorker { services }) .into_boxed_slice(), - Err(e) => { - error!("Can not start worker: {:?}", e); + + Err(err) => { + error!("Can not start worker: {:?}", err); Arbiter::current().stop(); return; } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 315e3eff18..0605dba23f 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -25,9 +25,7 @@ fn test_bind() { let srv = Server::build() .workers(1) .disable_signals() - .bind("test", addr, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) - })? + .bind("test", addr, fn_service(|_| async { Ok::<_, ()>(()) }))? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -56,9 +54,7 @@ fn test_listen() { let srv = Server::build() .disable_signals() .workers(1) - .listen("test", lst, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) - })? + .listen("test", lst, fn_service(|_| async { Ok::<_, ()>(()) }))? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -94,13 +90,15 @@ fn test_start() { let srv = Server::build() .backlog(100) .disable_signals() - .bind("test", addr, move || { + .bind( + "test", + addr, fn_service(|io: TcpStream| async move { let mut f = Framed::new(io, BytesCodec); f.send(Bytes::from_static(b"test")).await.unwrap(); Ok::<_, ()>(()) - }) - })? + }), + )? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -170,10 +168,10 @@ async fn test_max_concurrent_connections() { // Set a relative higher backlog. .backlog(12) // max connection for a worker is 3. - .maxconn(max_conn) + .max_concurrent_connections(max_conn) .workers(1) .disable_signals() - .bind("test", addr, move || { + .bind("test", addr, { let counter = counter.clone(); fn_service(move |_io: TcpStream| { let counter = counter.clone(); @@ -263,14 +261,14 @@ async fn test_service_restart() { let server = Server::build() .backlog(1) .disable_signals() - .bind("addr1", addr1, move || { + .bind("addr1", addr1, { let num = num.clone(); fn_factory(move || { let num = num.clone(); async move { Ok::<_, ()>(TestService(num)) } }) })? - .bind("addr2", addr2, move || { + .bind("addr2", addr2, { let num2 = num2.clone(); fn_factory(move || { let num2 = num2.clone(); @@ -319,6 +317,7 @@ async fn worker_restart() { use futures_core::future::LocalBoxFuture; use tokio::io::{AsyncReadExt, AsyncWriteExt}; + #[derive(Debug, Clone)] struct TestServiceFactory(Arc); impl ServiceFactory for TestServiceFactory { @@ -381,7 +380,7 @@ async fn worker_restart() { actix_rt::System::new().block_on(async { let server = Server::build() .disable_signals() - .bind("addr", addr, move || TestServiceFactory(counter.clone()))? + .bind("addr", addr, TestServiceFactory(counter.clone()))? .workers(2) .run(); diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index c01fd8ddab..8a2e98a7f1 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* `fn_factory[_with_config]` types now impl `Send` even when config, service, request types do not. [#403] + +[#403]: https://github.com/actix/actix-net/pull/403 ## 2.0.1 - 2021-10-11 diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 3898007927..2a7e18d1d0 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -14,7 +14,7 @@ use super::{Service, ServiceFactory}; /// Service for the `and_then` combinator, chaining a computation onto the end of another service /// which completes successfully. /// -/// This is created by the `Pipeline::and_then` method. +/// Created by the `.and_then()` combinator. pub struct AndThenService(Rc<(A, B)>, PhantomData); impl AndThenService { @@ -116,7 +116,7 @@ where } } -/// `.and_then()` service factory combinator +/// Service factory created by the `.and_then()` combinator. pub struct AndThenServiceFactory where A: ServiceFactory, diff --git a/actix-service/src/apply_cfg.rs b/actix-service/src/apply_cfg.rs index 25fc5fc279..8b75dfb2bc 100644 --- a/actix-service/src/apply_cfg.rs +++ b/actix-service/src/apply_cfg.rs @@ -63,7 +63,7 @@ where } } -/// Convert `Fn(Config, &Server) -> Future` fn to NewService\ +/// Convert `Fn(Config, &Server) -> Future` fn to ServiceFactory. struct ApplyConfigService where S1: Service, diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index f5fe6ed11e..284b4ecca8 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -44,7 +44,7 @@ pub trait ServiceExt: Service { /// Call another service after call to this one has resolved successfully. /// /// This function can be used to chain two services together and ensure that the second service - /// isn't called until call to the fist service have finished. Result of the call to the first + /// isn't called until call to the fist service has resolved. Result of the call to the first /// service is used as an input parameter for the second service's call. /// /// Note that this function consumes the receiving service and returns a wrapped version of it. diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index f83ef81fe6..6f71e04e19 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -3,6 +3,7 @@ use core::{future::Future, marker::PhantomData}; use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory}; /// Create `ServiceFactory` for function that can act as a `Service` +// TODO: remove unnecessary Cfg type param pub fn fn_service( f: F, ) -> FnServiceFactory @@ -48,6 +49,7 @@ where /// Ok(()) /// } /// ``` +// TODO: remove unnecessary Cfg type param pub fn fn_factory( f: F, ) -> FnServiceNoConfig @@ -160,7 +162,7 @@ where Fut: Future>, { f: F, - _t: PhantomData<(Req, Cfg)>, + _t: PhantomData, } impl FnServiceFactory @@ -237,7 +239,7 @@ where Srv: Service, { f: F, - _t: PhantomData<(Fut, Cfg, Req, Srv, Err)>, + _t: PhantomData, } impl FnServiceConfig @@ -293,7 +295,7 @@ where Fut: Future>, { f: F, - _t: PhantomData<(Cfg, Req)>, + _t: PhantomData, } impl FnServiceNoConfig @@ -353,10 +355,11 @@ where mod tests { use core::task::Poll; + use alloc::rc::Rc; use futures_util::future::lazy; use super::*; - use crate::{ok, Service, ServiceFactory}; + use crate::{boxed, ok, Service, ServiceExt, ServiceFactory, ServiceFactoryExt}; #[actix_rt::test] async fn test_fn_service() { @@ -391,4 +394,142 @@ mod tests { assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv", 1)); } + + // these three properties of a service factory are usually important + fn is_static(_t: &T) {} + fn impls_clone(_t: &T) {} + fn impls_send(_t: &T) {} + + #[actix_rt::test] + async fn test_fn_factory_impl_send() { + let svc_fac = fn_factory_with_config(|cfg: usize| { + ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg)))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Cfg type is explicitly !Send + let svc_fac = fn_factory_with_config(|cfg: Rc| { + let cfg = Rc::clone(&cfg); + ok::<_, ()>(fn_service(move |_: ()| ok::<_, ()>(("srv", *cfg)))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(fn_service(move |()| ok::<_, ()>("srv"))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Req type is explicitly !Send + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(fn_service(move |_: Rc<()>| ok::<_, ()>("srv"))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Service type is explicitly !Send + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(boxed::rc_service(fn_service(move |_: ()| { + ok::<_, ()>("srv") + }))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + } + + #[actix_rt::test] + async fn test_service_combinators_impls() { + #[derive(Clone)] + struct Ident; + + impl Service for Ident { + type Response = T; + type Error = (); + type Future = Ready>; + + crate::always_ready!(); + + fn call(&self, req: T) -> Self::Future { + ok(req) + } + } + + let svc = Ident; + is_static(&svc); + impls_clone(&svc); + impls_send(&svc); + + let svc = ServiceExt::map(Ident, core::convert::identity); + impls_send(&svc); + svc.call(()).await.unwrap(); + + let svc = ServiceExt::map_err(Ident, core::convert::identity); + impls_send(&svc); + svc.call(()).await.unwrap(); + + let svc = ServiceExt::and_then(Ident, Ident); + // impls_send(&svc); // fails to compile :( + svc.call(()).await.unwrap(); + + // let svc = ServiceExt::and_then_send(Ident, Ident); + // impls_send(&svc); + // svc.call(()).await.unwrap(); + } + + #[actix_rt::test] + async fn test_factory_combinators_impls() { + #[derive(Clone)] + struct Ident; + + impl ServiceFactory for Ident { + type Response = T; + type Error = (); + type Config = (); + // explicitly !Send result service + type Service = boxed::RcService; + type InitError = (); + type Future = Ready>; + + fn new_service(&self, _cfg: Self::Config) -> Self::Future { + ok(boxed::rc_service(fn_service(ok))) + } + } + + let svc_fac = Ident; + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + let svc_fac = ServiceFactoryExt::map(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::map_err(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::map_init_err(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::and_then(Ident, Ident); + // impls_send(&svc_fac); // fails to compile :( + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + // let svc_fac = ServiceFactoryExt::and_then_send(Ident, Ident); + // impls_send(&svc_fac); + // let svc = svc_fac.new_service(()).await.unwrap(); + // svc.call(()).await.unwrap(); + } } diff --git a/actix-service/src/map.rs b/actix-service/src/map.rs index 12fd439556..025d8ce58f 100644 --- a/actix-service/src/map.rs +++ b/actix-service/src/map.rs @@ -103,7 +103,7 @@ where } } -/// `MapNewService` new service combinator +/// `MapServiceFactory` new service combinator. pub struct MapServiceFactory { a: A, f: F, diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 2c71a74bfd..dd86be6e61 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -238,8 +238,7 @@ where } } - /// Map this service's output to a different type, returning a new service - /// of the resulting type. + /// Map this service's output to a different type, returning a new service. pub fn map(self, f: F) -> PipelineFactory, Req> where Self: Sized, @@ -251,7 +250,7 @@ where } } - /// Map this service's error to a different error, returning a new service. + /// Map this service's error to a different type, returning a new service. pub fn map_err( self, f: F, @@ -266,7 +265,7 @@ where } } - /// Map this factory's init error to a different error, returning a new service. + /// Map this factory's init error to a different type, returning a new service. pub fn map_init_err(self, f: F) -> PipelineFactory, Req> where Self: Sized, diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index f347e16484..39d43b1adb 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -31,21 +31,24 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::ServiceFactoryExt as _; +use actix_service::{fn_factory, fn_service, ServiceExt as _, ServiceFactory}; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; use log::info; use rustls::{server::ServerConfig, Certificate, PrivateKey}; use rustls_pemfile::{certs, rsa_private_keys}; +const CERT_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/cert.pem"]; +const KEY_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/key.pem"]; + #[actix_rt::main] async fn main() -> io::Result<()> { env::set_var("RUST_LOG", "info"); env_logger::init(); // Load TLS key and cert files - let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap()); - let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap()); + let cert_file = &mut BufReader::new(File::open(CERT_PATH).unwrap()); + let key_file = &mut BufReader::new(File::open(KEY_PATH).unwrap()); let cert_chain = certs(cert_file) .unwrap() @@ -68,18 +71,34 @@ async fn main() -> io::Result<()> { info!("starting server on port: {}", &addr.0); Server::build() - .bind("tls-example", addr, move || { + .bind("tls-example", addr, { let count = Arc::clone(&count); // Set up TLS service factory - tls_acceptor - .clone() - .map_err(|err| println!("Rustls error: {:?}", err)) - .and_then(move |stream: TlsStream| { - let num = count.fetch_add(1, Ordering::Relaxed); - info!("[{}] Got TLS connection: {:?}", num, &*stream); - ok(()) - }) + // note: moving rustls acceptor into fn_factory scope + fn_factory(move || { + // manually call new_service so that and_then can be used from ServiceExt + // type annotation for inner stream type is required + let svc = >::new_service( + &tls_acceptor, + (), + ); + + let count = Arc::clone(&count); + + async move { + let svc = svc + .await? + .map_err(|err| println!("Rustls error: {:?}", err)) + .and_then(fn_service(move |stream: TlsStream| { + let num = count.fetch_add(1, Ordering::Relaxed) + 1; + info!("[{}] Got TLS connection: {:?}", num, &*stream); + ok(()) + })); + + Ok::<_, ()>(svc) + } + }) })? .workers(1) .run() diff --git a/actix-tls/tests/test_connect.rs b/actix-tls/tests/test_connect.rs index 564151ceeb..8317c16f0f 100755 --- a/actix-tls/tests/test_connect.rs +++ b/actix-tls/tests/test_connect.rs @@ -17,7 +17,7 @@ use actix_tls::connect::{self as actix_connect, Connect}; #[cfg(feature = "openssl")] #[actix_rt::test] async fn test_string() { - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; @@ -34,7 +34,7 @@ async fn test_string() { #[cfg(feature = "rustls")] #[actix_rt::test] async fn test_rustls_string() { - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; @@ -50,13 +50,11 @@ async fn test_rustls_string() { #[actix_rt::test] async fn test_static_str() { - let srv = TestServer::with(|| { - fn_service(|io: TcpStream| async { - let mut framed = Framed::new(io, BytesCodec); - framed.send(Bytes::from_static(b"test")).await?; - Ok::<_, io::Error>(()) - }) - }); + let srv = TestServer::with(fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + })); let conn = actix_connect::default_connector(); @@ -75,13 +73,11 @@ async fn test_static_str() { #[actix_rt::test] async fn test_new_service() { - let srv = TestServer::with(|| { - fn_service(|io: TcpStream| async { - let mut framed = Framed::new(io, BytesCodec); - framed.send(Bytes::from_static(b"test")).await?; - Ok::<_, io::Error>(()) - }) - }); + let srv = TestServer::with(fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + })); let factory = actix_connect::default_connector_factory(); @@ -98,7 +94,7 @@ async fn test_new_service() { async fn test_openssl_uri() { use std::convert::TryFrom; - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; @@ -117,7 +113,7 @@ async fn test_openssl_uri() { async fn test_rustls_uri() { use std::convert::TryFrom; - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; @@ -133,7 +129,7 @@ async fn test_rustls_uri() { #[actix_rt::test] async fn test_local_addr() { - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; diff --git a/actix-tls/tests/test_resolvers.rs b/actix-tls/tests/test_resolvers.rs index 40ee21fa6c..37958c9830 100644 --- a/actix-tls/tests/test_resolvers.rs +++ b/actix-tls/tests/test_resolvers.rs @@ -38,8 +38,9 @@ async fn custom_resolver() { async fn custom_resolver_connect() { use trust_dns_resolver::TokioAsyncResolver; - let srv = - TestServer::with(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) })); + let srv = TestServer::with(fn_service(|_io: TcpStream| async { + Ok::<_, io::Error>(()) + })); struct MyResolver { trust_dns: TokioAsyncResolver,