Skip to content

Commit

Permalink
bin/server: Use axum::serve(...).with_graceful_shutdown(...)
Browse files Browse the repository at this point in the history
  • Loading branch information
Turbo87 committed Dec 30, 2023
1 parent 1883445 commit ca6756e
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 112 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ http = "=1.0.0"
http-body = "=1.0.0"
http-body-util = "=0.1.0"
hyper = { version = "=1.1.0", features = ["client", "http1"] }
hyper-util = { version = "=0.1.2", features = ["tokio", "server-auto", "http1"] }
indexmap = { version = "=2.1.0", features = ["serde"] }
indicatif = "=0.17.7"
ipnetwork = "=0.20.0"
Expand Down
116 changes: 6 additions & 110 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ use crates_io::middleware::normalize_path::normalize_path;
use crates_io::{metrics::LogEncoder, App, Emails};
use std::{sync::Arc, time::Duration};

use axum::extract::Request;
use axum::ServiceExt;
use crates_io_github::RealGitHubClient;
use hyper::body::Incoming;
use hyper_util::rt::TokioIo;
use prometheus::Encoder;
use reqwest::Client;
use std::io::Write;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::watch;
use tower::{Layer, Service};
use tower::Layer;

const CORE_THREADS: usize = 4;

Expand Down Expand Up @@ -62,11 +58,7 @@ fn main() -> anyhow::Result<()> {

let rt = builder.build().unwrap();

let mut make_service = axum_router.into_make_service_with_connect_info::<SocketAddr>();

// to understand the following implementation,
// see https://github.com/tokio-rs/axum/blob/axum-v0.7.2/examples/graceful-shutdown/src/main.rs
// and https://github.com/tokio-rs/axum/blob/axum-v0.7.2/examples/serve-with-hyper/src/main.rs
let make_service = axum_router.into_make_service_with_connect_info::<SocketAddr>();

// Block the main thread until the server has shutdown
rt.block_on(async {
Expand All @@ -79,106 +71,10 @@ fn main() -> anyhow::Result<()> {
// the test suite :)
info!("Listening at http://{addr}");

// Create a watch channel to track tasks that are handling connections and wait for them to
// complete.
let (close_tx, close_rx) = watch::channel(());

// Continuously accept new connections.
loop {
let (socket, remote_addr) = tokio::select! {
// Either accept a new connection...
result = listener.accept() => {
result.unwrap()
}
// ...or wait to receive a shutdown signal and stop the accept loop.
_ = shutdown_signal() => {
debug!("shutdown signal received, not accepting new connections");
break;
}
};

debug!("connection {remote_addr} accepted");

// We don't need to call `poll_ready` because `IntoMakeServiceWithConnectInfo` is always
// ready.
let tower_service = make_service.call(remote_addr).await.unwrap();

// Clone the watch receiver and move it into the task.
let close_rx = close_rx.clone();

// Spawn a task to handle the connection. That way we can serve multiple connections
// concurrently.
tokio::spawn(async move {
// Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use tokio.
// `TokioIo` converts between them.
let socket = TokioIo::new(socket);

// Hyper also has its own `Service` trait and doesn't use tower. We can use
// `hyper::service::service_fn` to create a hyper `Service` that calls our app through
// `tower::Service::call`.
let hyper_service =
hyper::service::service_fn(move |request: Request<Incoming>| {
// We have to clone `tower_service` because hyper's `Service` uses `&self` whereas
// tower's `Service` requires `&mut self`.
//
// We don't need to call `poll_ready` since `Router` is always ready.
tower_service.clone().call(request.map(axum::body::Body::new))
});

// `hyper_util::server::conn::auto::Builder` supports both http1 and http2 but doesn't
// support graceful so we have to use hyper directly and unfortunately pick between
// http1 and http2.
let conn = hyper::server::conn::http1::Builder::new()
.serve_connection(socket, hyper_service)
// `with_upgrades` is required for websockets.
.with_upgrades();

// `graceful_shutdown` requires a pinned connection.
let mut conn = std::pin::pin!(conn);

loop {
tokio::select! {
// Poll the connection. This completes when the client has closed the
// connection, graceful shutdown has completed, or we encounter a TCP error.
result = conn.as_mut() => {
if let Err(err) = result {
debug!("failed to serve connection: {err:#}");
}
break;
}
// Start graceful shutdown when we receive a shutdown signal.
//
// We use a loop to continue polling the connection to allow requests to finish
// after starting graceful shutdown. Our `Router` has `TimeoutLayer` so
// requests will finish after at most 30 seconds.
_ = shutdown_signal() => {
debug!("shutdown signal received, starting graceful connection shutdown");
conn.as_mut().graceful_shutdown();
}
}
}

debug!("connection {remote_addr} closed");

// Drop the watch receiver to signal to `main` that this task is done.
drop(close_rx);
});
}

info!("Starting graceful shutdown");

// We only care about the watch receivers that were moved into the tasks so close the residual
// receiver.
drop(close_rx);

// Close the listener to stop accepting new connections.
drop(listener);

// Wait for all tasks to complete.
debug!("waiting for {} tasks to finish", close_tx.receiver_count());
close_tx.closed().await;

Ok::<(), anyhow::Error>(())
// Run the server with graceful shutdown
axum::serve(listener, make_service)
.with_graceful_shutdown(shutdown_signal())
.await
})?;

info!("Persisting remaining downloads counters");
Expand Down

0 comments on commit ca6756e

Please sign in to comment.