Support hyper 1.0 #1307
If you need hyper 1.0 support you should be able to use https://crates.io/crates/tower-hyper-http-body-compat
Are there any examples with
My current calling code:
let svc = tls::grpc_connector(address, root_cert)?;
let svc = tower_hyper_http_body_compat::Hyper1HttpServiceAsTowerService03HttpService::new(svc);
let client = GrpcServiceClient::with_interceptor(svc, auth);
Fails with
While my Service (hyper 1.0 service) is:
type BoxBody1 = http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, tonic::Status>;
impl hyper::service::Service<hyper::http::Request<BoxBody1>> for TlsGrpcChannel {
    type Response = hyper::http::Response<hyper::body::Incoming>;
    type Error = hyper_util::client::legacy::Error;
    type Future = ResponseFuture;
    fn call(&mut self, mut req: Request<BoxBody1>) -> Self::Future {
        let uri = Uri::builder()
            .scheme(self.uri.scheme().unwrap().to_owned())
            .authority(self.uri.authority().unwrap().to_owned())
            .path_and_query(req.uri().path_and_query().unwrap().to_owned())
            .build()
            .unwrap();
        *req.uri_mut() = uri;
        self.client.request(req)
    }
}
Struggling with bridging the body type, I think.
I did get it working for gRPC services, just not for gRPC clients. Here is the full code if it helps: https://github.com/howardjohn/ztunnel/blob/e7ca3033da71574d668f699fba824685dc7115c8/src/identity/caclient.rs#L46-L50
I'm not sure. Haven't actually done it myself. I tried getting a small example working but kept running into connection issues:
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use http::{Request, Response};
use hyper::{
    body::Incoming,
    client::conn::{self, http2::SendRequest},
};
use proto::GreetRequest;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tower::{Service, ServiceExt};
use tower_hyper_http_body_compat::{HttpBody04ToHttpBody1, HttpBody1ToHttpBody04};
use crate::proto::{
    greeter_server::{Greeter, GreeterServer},
    GreetReply,
};
pub mod proto {
    tonic::include_proto!("greeter");
}
#[tokio::main]
async fn main() {
    // run a tonic server
    let listener = TcpListener::bind("0.0.0.0:3030").await.unwrap();
    println!("server listening on {}", listener.local_addr().unwrap());
    tokio::spawn(async move {
        let greeter = MyGreeter::default();
        tonic::transport::Server::builder()
            .add_service(GreeterServer::new(greeter))
            .serve_with_incoming(TcpListenerStream::new(listener))
            .await
            .unwrap();
    });
    let mut connector = hyper_util::client::connect::HttpConnector::new();
    let target_stream = connector
        .ready()
        .await
        .unwrap()
        .call(http::Uri::from_static("http://0.0.0.0:3030"))
        .await
        .unwrap();
    let (request_sender, connection) = conn::http2::handshake(TokioExec, target_stream)
        .await
        .unwrap();
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Error in connection: {}", e);
        }
    });
    let svc = SendRequestService {
        sender: request_sender,
    };
    let mut client = proto::greeter_client::GreeterClient::new(svc);
    let res = client
        .greet(GreetRequest {
            name: "Alice".to_owned(),
        })
        .await
        .unwrap();
    dbg!(&res);
}
#[derive(Default)]
pub struct MyGreeter {}
#[tonic::async_trait]
impl Greeter for MyGreeter {
    async fn greet(
        &self,
        req: tonic::Request<GreetRequest>,
    ) -> Result<tonic::Response<GreetReply>, tonic::Status> {
        let GreetRequest { name } = req.into_inner();
        Ok(tonic::Response::new(GreetReply {
            greeting: format!("Hi {name}"),
        }))
    }
}
#[derive(Copy, Clone)]
struct TokioExec;
impl<Fut> hyper::rt::Executor<Fut> for TokioExec
where
    Fut: Future + Send + 'static,
    Fut::Output: Send,
{
    fn execute(&self, fut: Fut) {
        tokio::spawn(fut);
    }
}
struct SendRequestService<B> {
    sender: SendRequest<B>,
}
impl<B> tower::Service<Request<B>> for SendRequestService<HttpBody04ToHttpBody1<B>>
where
    B: http_body::Body + Send + 'static,
    HttpBody04ToHttpBody1<B>: hyper::body::Body,
{
    type Response = Response<HttpBody1ToHttpBody04<Incoming>>;
    type Error = hyper::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.sender.poll_ready(cx)
    }
    fn call(&mut self, req: Request<B>) -> Self::Future {
        let req = req.map(HttpBody04ToHttpBody1::new);
        let future = self.sender.send_request(req);
        Box::pin(async move {
            let res = future.await?;
            Ok(res.map(HttpBody1ToHttpBody04::new))
        })
    }
}
This fails with
Not sure why.
Nice! That got things working for me. The one thing I did have to do was stop using interceptor, which requires body to implement Default, and
See hyperium/tonic#1307. Tonic interceptors require a Default on the body. This makes it easier to use an adapter type with tonic. FWIW, the default type I am using:

```rust
#[derive(Default)]
pub enum DefaultIncoming {
    Some(Incoming),
    #[default]
    Empty,
}

impl Body for DefaultIncoming {
    type Data = Bytes;
    type Error = hyper::Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.get_mut() {
            DefaultIncoming::Some(ref mut i) => Pin::new(i).poll_frame(cx),
            DefaultIncoming::Empty => Pin::new(&mut http_body_util::Empty::<Bytes>::new())
                .poll_frame(cx)
                .map_err(|_| unreachable!()),
        }
    }
}
```
Is there any timeline for when hyper 1.0 will be supported?
I believe we will need to refactor the server to support hyper 1.0's new API. If someone is up for taking on this work, feel free to post here and tag me in the PR. I currently don't have time on my plate to get this done.
We will probably follow what axum does since we use their router pretty heavily.
Axum is currently using Hyper 1.1. It looks like this was the PR with the shift: tokio-rs/axum#1882
for anyone else landing here when looking for this error that @davidpdrsn encountered:
it's likely because your request has a uri that doesn't contain a scheme. http2's rfc says:
in my case i was re-using an http1 incoming request when calling an http2 endpoint, but this request had
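To illustrate the point about the missing scheme: HTTP/2 requests carry `:scheme` and `:path` pseudo-headers, so a request URI that is only a path (as is typical for an incoming HTTP/1 request) has to be turned into an absolute URI before it is sent over an HTTP/2 connection. Below is a minimal std-only sketch of that step. The `ensure_absolute` helper is hypothetical and not part of hyper, tonic, or the `http` crate, and it uses a simplified rule (anything starting with `/` is treated as path-only). In real code the `Uri::builder()` approach shown earlier in this thread is likely the better fit.

```rust
/// Hypothetical helper (not a hyper/tonic API): returns an absolute-form URI.
/// If `uri` is path-only (starts with '/'), the given scheme and authority
/// are prepended; otherwise `uri` is assumed to be absolute already.
fn ensure_absolute(scheme: &str, authority: &str, uri: &str) -> String {
    if uri.starts_with('/') {
        // Origin-form such as "/greeter.Greeter/Greet": add scheme + authority.
        format!("{scheme}://{authority}{uri}")
    } else {
        // Assumed to be absolute-form already: leave it untouched.
        uri.to_owned()
    }
}

fn main() {
    // A path-only URI, as seen on an incoming HTTP/1 request.
    let fixed = ensure_absolute("http", "127.0.0.1:3030", "/greeter.Greeter/Greet");
    println!("{fixed}");
}
```

The scheme and authority here would normally come from the endpoint the client is connected to, not from the incoming request.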
Is this now closed by #1670?
I believe this can be closed, yes: Line 82 in 255a885
https://github.com/hyperium/tonic/releases/tag/v0.12.0
Hyper currently has a 1.0-rc3 build out and 1.0.0 is coming later on (https://hyper.rs/contrib/roadmap/). Given tonic depends on hyper, it would be desirable to have tonic support hyper 1.0. I didn't see any issue tracking this so I thought I'd start one.