Skip to content

Commit 74ca5ad

Browse files
Moved hyper support in a separate module. One can now use the hyper support without necessarily using our server, nor tokio net. (#14)
1 parent d22dd24 commit 74ca5ad

File tree

11 files changed

+104
-92
lines changed

11 files changed

+104
-92
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ repository = "https://github.com/restatedev/sdk-rust"
88

99
[features]
1010
default = ["http_server", "rand", "uuid"]
11-
http_server = ["hyper", "http-body-util", "hyper-util", "tokio/net", "tokio/signal", "restate-sdk-shared-core/http"]
11+
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
12+
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal"]
1213

1314
[dependencies]
1415
bytes = "1.6.1"
1516
futures = "0.3"
1617
http = "1.1.0"
1718
http-body-util = { version = "0.1", optional = true }
18-
hyper = { version = "1.4.1", optional = true, features = ["server", "http2"] }
19+
hyper = { version = "1.4.1", optional = true}
1920
hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true }
2021
pin-project-lite = "0.2"
2122
rand = { version = "0.8.5", optional = true }
@@ -25,7 +26,7 @@ restate-sdk-shared-core = { version = "0.0.5" }
2526
serde = "1.0"
2627
serde_json = "1.0"
2728
thiserror = "1.0.63"
28-
tokio = { version = "1", default-features = false, features = ["sync", "macros"] }
29+
tokio = { version = "1", default-features = false, features = ["sync"] }
2930
tower-service = "0.3"
3031
tracing = "0.1"
3132
uuid = { version = "1.10.0", optional = true }

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl Greeter for GreeterImpl {
5151
async fn main() {
5252
// To enable logging/tracing
5353
// tracing_subscriber::fmt::init();
54-
HyperServer::new(
54+
HttpServer::new(
5555
Endpoint::builder()
5656
.with_service(GreeterImpl.serve())
5757
.build(),

examples/counter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl Counter for CounterImpl {
3838
#[tokio::main]
3939
async fn main() {
4040
tracing_subscriber::fmt::init();
41-
HyperServer::new(
41+
HttpServer::new(
4242
Endpoint::builder()
4343
.with_service(CounterImpl.serve())
4444
.build(),

examples/failures.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl FailureExample for FailureExampleImpl {
3232
#[tokio::main]
3333
async fn main() {
3434
tracing_subscriber::fmt::init();
35-
HyperServer::new(
35+
HttpServer::new(
3636
Endpoint::builder()
3737
.with_service(FailureExampleImpl.serve())
3838
.build(),

examples/greeter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ impl Greeter for GreeterImpl {
1616
#[tokio::main]
1717
async fn main() {
1818
tracing_subscriber::fmt::init();
19-
HyperServer::new(
19+
HttpServer::new(
2020
Endpoint::builder()
2121
.with_service(GreeterImpl.serve())
2222
.build(),

examples/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ impl RunExample for RunExampleImpl {
3333
#[tokio::main]
3434
async fn main() {
3535
tracing_subscriber::fmt::init();
36-
HyperServer::new(
36+
HttpServer::new(
3737
Endpoint::builder()
3838
.with_service(RunExampleImpl(reqwest::Client::new()).serve())
3939
.build(),

src/http_server.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use crate::endpoint::Endpoint;
2+
use crate::hyper::HyperEndpoint;
3+
use futures::FutureExt;
4+
use hyper::server::conn::http2;
5+
use hyper_util::rt::{TokioExecutor, TokioIo};
6+
use std::future::Future;
7+
use std::net::SocketAddr;
8+
use std::time::Duration;
9+
use tokio::net::TcpListener;
10+
use tracing::{info, warn};
11+
12+
pub struct HttpServer {
13+
endpoint: Endpoint,
14+
}
15+
16+
impl From<Endpoint> for HttpServer {
17+
fn from(endpoint: Endpoint) -> Self {
18+
Self { endpoint }
19+
}
20+
}
21+
22+
impl HttpServer {
23+
pub fn new(endpoint: Endpoint) -> Self {
24+
Self { endpoint }
25+
}
26+
27+
pub async fn listen_and_serve(self, addr: SocketAddr) {
28+
let listener = TcpListener::bind(addr).await.expect("listener can bind");
29+
self.serve(listener).await;
30+
}
31+
32+
pub async fn serve(self, listener: TcpListener) {
33+
self.serve_with_cancel(listener, tokio::signal::ctrl_c().map(|_| ()))
34+
.await;
35+
}
36+
37+
pub async fn serve_with_cancel(self, listener: TcpListener, cancel_signal_future: impl Future) {
38+
let endpoint = HyperEndpoint::new(self.endpoint);
39+
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
40+
41+
// when this signal completes, start shutdown
42+
let mut signal = std::pin::pin!(cancel_signal_future);
43+
44+
info!("Starting listening on {}", listener.local_addr().unwrap());
45+
46+
// Our server accept loop
47+
loop {
48+
tokio::select! {
49+
Ok((stream, remote)) = listener.accept() => {
50+
let endpoint = endpoint.clone();
51+
52+
let conn = http2::Builder::new(TokioExecutor::default())
53+
.serve_connection(TokioIo::new(stream), endpoint);
54+
55+
let fut = graceful.watch(conn);
56+
57+
tokio::spawn(async move {
58+
if let Err(e) = fut.await {
59+
warn!("Error serving connection {remote}: {:?}", e);
60+
}
61+
});
62+
},
63+
_ = &mut signal => {
64+
info!("Shutting down");
65+
// stop the accept loop
66+
break;
67+
}
68+
}
69+
}
70+
71+
// Wait graceful shutdown
72+
tokio::select! {
73+
_ = graceful.shutdown() => {},
74+
_ = tokio::time::sleep(Duration::from_secs(10)) => {
75+
warn!("Timed out waiting for all connections to close");
76+
}
77+
}
78+
}
79+
}

src/http.rs renamed to src/hyper.rs

Lines changed: 9 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -3,98 +3,29 @@ use crate::endpoint::{Endpoint, InputReceiver, OutputSender};
33
use bytes::Bytes;
44
use futures::future::BoxFuture;
55
use futures::{FutureExt, TryStreamExt};
6+
use http::header::CONTENT_TYPE;
7+
use http::{response, Request, Response};
68
use http_body_util::{BodyExt, Either, Full};
79
use hyper::body::{Body, Frame, Incoming};
8-
use hyper::header::CONTENT_TYPE;
9-
use hyper::http::response;
10-
use hyper::server::conn::http2;
1110
use hyper::service::Service;
12-
use hyper::{Request, Response};
13-
use hyper_util::rt::{TokioExecutor, TokioIo};
1411
use restate_sdk_shared_core::ResponseHead;
1512
use std::convert::Infallible;
16-
use std::future::{ready, Future, Ready};
17-
use std::net::SocketAddr;
13+
use std::future::{ready, Ready};
1814
use std::ops::Deref;
1915
use std::pin::Pin;
2016
use std::task::{ready, Context, Poll};
21-
use std::time::Duration;
22-
use tokio::net::TcpListener;
2317
use tokio::sync::mpsc;
24-
use tracing::{info, warn};
18+
use tracing::warn;
2519

26-
pub struct HyperServer {
27-
endpoint: Endpoint,
28-
}
29-
30-
impl From<Endpoint> for HyperServer {
31-
fn from(endpoint: Endpoint) -> Self {
32-
Self { endpoint }
33-
}
34-
}
20+
#[derive(Clone)]
21+
pub struct HyperEndpoint(Endpoint);
3522

36-
impl HyperServer {
23+
impl HyperEndpoint {
3724
pub fn new(endpoint: Endpoint) -> Self {
38-
Self { endpoint }
39-
}
40-
41-
pub async fn listen_and_serve(self, addr: SocketAddr) {
42-
let listener = TcpListener::bind(addr).await.expect("listener can bind");
43-
self.serve(listener).await;
44-
}
45-
46-
pub async fn serve(self, listener: TcpListener) {
47-
self.serve_with_cancel(listener, tokio::signal::ctrl_c().map(|_| ()))
48-
.await;
49-
}
50-
51-
pub async fn serve_with_cancel(self, listener: TcpListener, cancel_signal_future: impl Future) {
52-
let endpoint = HyperEndpoint(self.endpoint);
53-
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
54-
55-
// when this signal completes, start shutdown
56-
let mut signal = std::pin::pin!(cancel_signal_future);
57-
58-
info!("Starting listening on {}", listener.local_addr().unwrap());
59-
60-
// Our server accept loop
61-
loop {
62-
tokio::select! {
63-
Ok((stream, remote)) = listener.accept() => {
64-
let endpoint = endpoint.clone();
65-
66-
let conn = http2::Builder::new(TokioExecutor::default())
67-
.serve_connection(TokioIo::new(stream), endpoint);
68-
69-
let fut = graceful.watch(conn);
70-
71-
tokio::spawn(async move {
72-
if let Err(e) = fut.await {
73-
warn!("Error serving connection {remote}: {:?}", e);
74-
}
75-
});
76-
},
77-
_ = &mut signal => {
78-
info!("Shutting down");
79-
// stop the accept loop
80-
break;
81-
}
82-
}
83-
}
84-
85-
// Wait graceful shutdown
86-
tokio::select! {
87-
_ = graceful.shutdown() => {},
88-
_ = tokio::time::sleep(Duration::from_secs(10)) => {
89-
warn!("Timed out waiting for all connections to close");
90-
}
91-
}
25+
Self(endpoint)
9226
}
9327
}
9428

95-
#[derive(Clone)]
96-
struct HyperEndpoint(Endpoint);
97-
9829
impl Service<Request<Incoming>> for HyperEndpoint {
9930
type Response = Response<Either<Full<Bytes>, BidiStreamRunner>>;
10031
type Error = endpoint::Error;
@@ -155,8 +86,7 @@ fn response_builder_from_response_head(response_head: ResponseHead) -> response:
15586
response_builder
15687
}
15788

158-
// TODO use pin_project
159-
struct BidiStreamRunner {
89+
pub struct BidiStreamRunner {
16090
fut: Option<BoxFuture<'static, Result<(), endpoint::Error>>>,
16191
output_rx: mpsc::UnboundedReceiver<Bytes>,
16292
end_stream: bool,

src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ pub mod context;
55
pub mod discovery;
66
pub mod errors;
77
#[cfg(feature = "http_server")]
8-
pub mod http;
8+
pub mod http_server;
9+
#[cfg(feature = "hyper")]
10+
pub mod hyper;
911
pub mod serde;
1012

1113
pub use restate_sdk_macros::{object, service, workflow};
1214

1315
pub mod prelude {
1416
#[cfg(feature = "http_server")]
15-
pub use crate::http::HyperServer;
17+
pub use crate::http_server::HttpServer;
1618

1719
pub use crate::context::{
1820
Context, ContextAwakeables, ContextClient, ContextPromises, ContextReadState,

test-services/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ mod non_deterministic;
1010
mod proxy;
1111
mod test_utils_service;
1212

13-
use restate_sdk::prelude::{Endpoint, HyperServer};
13+
use restate_sdk::prelude::{Endpoint, HttpServer};
1414
use std::env;
1515

1616
#[tokio::main]
@@ -77,7 +77,7 @@ async fn main() {
7777
))
7878
}
7979

80-
HyperServer::new(builder.build())
80+
HttpServer::new(builder.build())
8181
.listen_and_serve(format!("0.0.0.0:{port}").parse().unwrap())
8282
.await;
8383
}

tests/ui/shared_handler_in_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ impl SharedHandlerInService for SharedHandlerInServiceImpl {
1717
#[tokio::main]
1818
async fn main() {
1919
tracing_subscriber::fmt::init();
20-
HyperServer::new(
20+
HttpServer::new(
2121
Endpoint::builder()
2222
.with_service(SharedHandlerInServiceImpl.serve())
2323
.build(),

0 commit comments

Comments
 (0)