Skip to content

Commit

Permalink
Merge pull request #176 from palantir/async-service
Browse files Browse the repository at this point in the history
Make Service async fn-compatible
  • Loading branch information
sfackler authored Jan 3, 2024
2 parents 53e0231 + 5c997c9 commit 5b9fb2f
Show file tree
Hide file tree
Showing 29 changed files with 394 additions and 926 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ version: 2
jobs:
build:
docker:
- image: rust:1.65.0
- image: rust:1.75.0
environment:
RUSTFLAGS: -D warnings
steps:
Expand Down
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-176.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: break
break:
description: The `Service` trait now supports `async fn call`.
links:
- https://github.com/palantir/conjure-rust-runtime/pull/176
1 change: 0 additions & 1 deletion conjure-runtime/src/blocking/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ impl<T, B> conjure_http::client::Client for Client<T>
where
T: Service<Request<RawBody>, Response = Response<B>> + 'static + Sync + Send,
T::Error: Into<Box<dyn error::Error + Sync + Send>>,
T::Future: Send,
B: http_body::Body<Data = Bytes> + 'static + Send,
B::Error: Into<Box<dyn error::Error + Sync + Send>>,
{
Expand Down
5 changes: 1 addition & 4 deletions conjure-runtime/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ impl<T, B> AsyncClient for Client<T>
where
T: Service<http::Request<RawBody>, Response = http::Response<B>> + 'static + Sync + Send,
T::Error: Into<Box<dyn error::Error + Sync + Send>>,
T::Future: Send,
B: http_body::Body<Data = Bytes> + 'static + Send,
B::Error: Into<Box<dyn error::Error + Sync + Send>>,
{
Expand All @@ -159,8 +158,6 @@ where
&self,
request: Request<AsyncRequestBody<'_, Self::BodyWriter>>,
) -> Result<Response<Self::ResponseBody>, Error> {
// split into 2 statements to avoid holding onto the state while awaiting the future
let future = self.state.load().service.call(request);
future.await
self.state.load().service.call(request).await
}
}
44 changes: 12 additions & 32 deletions conjure-runtime/src/raw/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ use crate::service::tls_metrics::{TlsMetricsLayer, TlsMetricsService};
use crate::Builder;
use bytes::Bytes;
use conjure_error::Error;
use futures::ready;
use http::{HeaderMap, Request, Response};
use http_body::{Body, SizeHint};
use hyper::client::{HttpConnector, ResponseFuture};
use hyper::client::HttpConnector;
use hyper::Client;
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use pin_project::pin_project;
Expand All @@ -30,7 +29,6 @@ use rustls_pemfile::Item;
use std::error;
use std::fmt;
use std::fs::File;
use std::future::Future;
use std::io::BufReader;
use std::marker::PhantomPinned;
use std::path::Path;
Expand Down Expand Up @@ -162,13 +160,18 @@ pub struct DefaultRawClient(Client<ConjureConnector, RawBody>);
impl Service<Request<RawBody>> for DefaultRawClient {
type Response = Response<DefaultRawBody>;
type Error = DefaultRawError;
type Future = DefaultRawFuture;

fn call(&self, req: Request<RawBody>) -> Self::Future {
DefaultRawFuture {
future: self.0.request(req),
_p: PhantomPinned,
}
async fn call(&self, req: Request<RawBody>) -> Result<Self::Response, Self::Error> {
self.0
.request(req)
.await
.map(|r| {
r.map(|inner| DefaultRawBody {
inner,
_p: PhantomPinned,
})
})
.map_err(DefaultRawError)
}
}

Expand Down Expand Up @@ -214,29 +217,6 @@ impl Body for DefaultRawBody {
}
}

/// The future type used by `DefaultRawClient`.
#[pin_project]
pub struct DefaultRawFuture {
#[pin]
future: ResponseFuture,
#[pin]
_p: PhantomPinned,
}

impl Future for DefaultRawFuture {
type Output = Result<Response<DefaultRawBody>, DefaultRawError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let response = ready!(self.project().future.poll(cx)).map_err(DefaultRawError)?;
let response = response.map(|inner| DefaultRawBody {
inner,
_p: PhantomPinned,
});

Poll::Ready(Ok(response))
}
}

/// The error type used by `DefaultRawClient`.
#[derive(Debug)]
pub struct DefaultRawError(hyper::Error);
Expand Down
7 changes: 2 additions & 5 deletions conjure-runtime/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ pub trait Service<R> {
type Response;
/// The error type returned by the service.
type Error;
/// The future type returned by the service.
type Future: Future<Output = Result<Self::Response, Self::Error>>;

/// Asynchronously perform the request.
fn call(&self, req: R) -> Self::Future;
fn call(&self, req: R) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send;
}

impl<R, T> Service<R> for Arc<T>
Expand All @@ -66,9 +64,8 @@ where
{
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;

fn call(&self, req: R) -> Self::Future {
fn call(&self, req: R) -> impl Future<Output = Result<T::Response, T::Error>> {
(**self).call(req)
}
}
Expand Down
32 changes: 5 additions & 27 deletions conjure-runtime/src/service/gzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use http_body::{Body, SizeHint};
use once_cell::sync::Lazy;
use pin_project::pin_project;
use std::error::Error;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -50,41 +49,20 @@ pub struct GzipService<S> {

impl<S, B1, B2> Service<Request<B1>> for GzipService<S>
where
S: Service<Request<B1>, Response = Response<B2>>,
S: Service<Request<B1>, Response = Response<B2>> + Sync + Send,
B1: Sync + Send,
B2: Body<Data = Bytes>,
B2::Error: Into<Box<dyn Error + Sync + Send>>,
{
type Response = Response<DecodedBody<B2>>;
type Error = S::Error;
type Future = GzipFuture<S::Future>;

fn call(&self, mut req: Request<B1>) -> Self::Future {
async fn call(&self, mut req: Request<B1>) -> Result<Self::Response, Self::Error> {
if let Entry::Vacant(e) = req.headers_mut().entry(ACCEPT_ENCODING) {
e.insert(GZIP.clone());
}

GzipFuture {
future: self.inner.call(req),
}
}
}

#[pin_project]
pub struct GzipFuture<F> {
#[pin]
future: F,
}

impl<F, E, B> Future for GzipFuture<F>
where
F: Future<Output = Result<Response<B>, E>>,
B: Body<Data = Bytes>,
B::Error: Into<Box<dyn Error + Sync + Send>>,
{
type Output = Result<Response<DecodedBody<B>>, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let response = ready!(self.project().future.poll(cx))?;
let response = self.inner.call(req).await?;
let (mut parts, body) = response.into_parts();

let decoder = match parts.headers.get(CONTENT_ENCODING) {
Expand All @@ -102,7 +80,7 @@ where
done: false,
};

Poll::Ready(Ok(Response::from_parts(parts, body)))
Ok(Response::from_parts(parts, body))
}
}

Expand Down
Loading

0 comments on commit 5b9fb2f

Please sign in to comment.