A struct to combine two service #813

Open
starwing opened this issue Jan 31, 2025 · 4 comments

starwing commented Jan 31, 2025

Hi,

I'm new to tower, and I often find myself writing a Service like this: given a Service<A, Response = B> and a Service<B>, I want to build a new service that calls the first and then pipes its response into the second.

I searched the tower docs for this but found nothing, so I wrote one that works for me and am sharing it here.

Is it worth merging into tower? Or is there a better way to do this?

use std::{
    future::Future,
    task::{ready, Poll},
};

use tower::Service;

pub trait ServiceExt<T>: Service<T> {
    fn chain<S>(self, outer: S) -> Chain<Self, S>
    where
        Self: Sized,
    {
        Chain::new(self, outer)
    }
}

impl<T, R> ServiceExt<R> for T where T: Service<R> {}

pub struct Chain<First, Second> {
    first: First,
    second: Second,
}

impl<First, Second> Clone for Chain<First, Second>
where
    First: Clone,
    Second: Clone,
{
    fn clone(&self) -> Self {
        Self {
            first: self.first.clone(),
            second: self.second.clone(),
        }
    }
}

impl<First, Second> Copy for Chain<First, Second>
where
    First: Copy,
    Second: Copy,
{
}

impl<First, Second> Chain<First, Second> {
    pub fn new(first: First, second: Second) -> Self {
        Self { first, second }
    }
}

impl<T, First, Second> Service<T> for Chain<First, Second>
where
    First: Service<T>,
    Second: Service<First::Response> + Clone,
    <First as Service<T>>::Error: Into<anyhow::Error>,
    <Second as Service<First::Response>>::Error: Into<anyhow::Error>,
{
    type Response = Second::Response;
    type Error = anyhow::Error;
    type Future =
        ChainFuture<First::Future, Second, First::Response, First::Error>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        if let Err(err) = ready!(self.first.poll_ready(cx)) {
            return Err(err.into()).into();
        }
        if let Err(err) = ready!(self.second.poll_ready(cx)) {
            return Err(err.into()).into();
        }
        Ok(()).into()
    }

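    fn call(&mut self, req: T) -> Self::Future {
        let fut = self.first.call(req);
        // Hand the instance that `poll_ready` drove to readiness to the
        // future and leave a fresh clone behind, because a clone is not
        // guaranteed to be ready.
        let clone = self.second.clone();
        let second = std::mem::replace(&mut self.second, clone);
        ChainFuture::new(fut, second)
    }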
}

pin_project_lite::pin_project! {
    #[project = ChainFutureProj]
    pub enum ChainFuture<Fut, Second, T, E> where
        Fut: Future<Output = Result<T, E>>,
        Second: Service<T>,
    {
        WaitingInner {
            #[pin]
            fut: Fut,
            second: Second,
        },
        WaitingOuter {
            #[pin]
            fut: Second::Future,
        },
    }
}

impl<Fut, Second, T, E> ChainFuture<Fut, Second, T, E>
where
    Fut: Future<Output = Result<T, E>>,
    Second: Service<T>,
{
    pub fn new(fut: Fut, outer: Second) -> Self {
        Self::WaitingInner { fut, second: outer }
    }
}

impl<Fut, Second, T, E> Future for ChainFuture<Fut, Second, T, E>
where
    Fut: Future<Output = Result<T, E>>,
    Second: Service<T>,
    <Second as Service<T>>::Error: Into<anyhow::Error>,
    E: Into<anyhow::Error>,
{
    type Output = anyhow::Result<Second::Response>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        loop {
            match self.as_mut().project() {
                ChainFutureProj::WaitingInner { fut, second: outer } => {
                    // Propagate the first service's error; otherwise feed
                    // its response into the second service.
                    let res = match ready!(fut.poll(cx)) {
                        Ok(res) => res,
                        Err(err) => return Poll::Ready(Err(err.into())),
                    };
                    let fut = outer.call(res);
                    self.set(ChainFuture::WaitingOuter { fut });
                }
                ChainFutureProj::WaitingOuter { fut } => {
                    return fut.poll(cx).map_err(Into::into)
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::ServiceExt as _;
    use tower::{service_fn, ServiceExt};

    #[tokio::test]
    async fn test_chain_service() {
        let svr1 = service_fn(|a: u32| async move {
            Ok::<_, anyhow::Error>(a as u64 + 1)
        });
        let svr2 = service_fn(|a: u64| async move {
            Ok::<_, anyhow::Error>(a as u32 + 2)
        });
        let fut = svr1.chain(svr2).oneshot(1);
        assert_eq!(fut.await.unwrap(), 4);
    }
}
Contributor

GlenDC commented Feb 1, 2025

Maybe I'm missing something here, but isn't this basically what layers are all about?

Author

starwing commented Feb 1, 2025

> Maybe I'm missing something here, but isn't this basically what layers are all about?

Yes! That's why I need this. I have many modules that offer a layer interface, but I keep having to write a lot of layer_fn and service_fn boilerplate in every module. So I made this to stay DRY.

After implementing this, I can write my layer in a single line (calling Chain::new() inside layer_fn).

Btw, I also added a ready_call to my own ServiceExt, so that instead of writing svc.ready().await?.call(req).await? every time, I can just write svc.ready_call(req).await?.

(The above is just an example; the real code is much longer because I have to repeat .map_err(Into::into).context(...) over and over.)

Contributor

GlenDC commented Feb 2, 2025

I see.

There's already Stack. It's generic, and currently implemented for Layers. Maybe it could also be implemented for two Services? Maybe not.

I do see where you're coming from. It makes me think that, in theory, any service could be turned into a layer with some kind of building block like this.

Author

starwing commented Feb 2, 2025

FYI, below is the ready_call I implemented. I can't believe tower-rs doesn't have a util for this... it already has ready, and even ready_oneshot...

The Chain I implemented can't go into tower-rs directly, because my Service returns anyhow::Error, and tower-rs should not depend on the anyhow crate... I think merging two error types is tricky...
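On the error-type point: one possible way to avoid anyhow is a two-variant error enum, so the combined error is either the first service's error or the second's. The sketch below is only an illustration under that assumption. `ChainError` and `chain_sync` are hypothetical names, not part of tower, and plain synchronous closures stand in for services to keep the example dependency-free.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical combined error: records which stage failed, and with what.
#[derive(Debug, PartialEq)]
pub enum ChainError<E1, E2> {
    First(E1),
    Second(E2),
}

impl<E1: fmt::Display, E2: fmt::Display> fmt::Display for ChainError<E1, E2> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChainError::First(e) => write!(f, "first service failed: {e}"),
            ChainError::Second(e) => write!(f, "second service failed: {e}"),
        }
    }
}

impl<E1, E2> Error for ChainError<E1, E2>
where
    E1: Error + 'static,
    E2: Error + 'static,
{
    // Expose the underlying error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            ChainError::First(e) => Some(e),
            ChainError::Second(e) => Some(e),
        }
    }
}

/// Synchronous stand-in for `Chain` (an assumption for illustration):
/// run `first`, then feed its output into `second`.
pub fn chain_sync<A, B, C, E1, E2>(
    mut first: impl FnMut(A) -> Result<B, E1>,
    mut second: impl FnMut(B) -> Result<C, E2>,
) -> impl FnMut(A) -> Result<C, ChainError<E1, E2>> {
    move |req| match first(req) {
        Ok(mid) => second(mid).map_err(ChainError::Second),
        Err(e) => Err(ChainError::First(e)),
    }
}
```

The enum keeps both concrete error types at the cost of a more verbose signature. The boxed alternative is tower's own `tower::BoxError` (an alias for `Box<dyn std::error::Error + Send + Sync>`), with each service's error bounded by `Into<BoxError>`.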

It is indeed possible to combine two services with Stack, because it just holds two arbitrary types: just implement Service for Stack. But IMHO it's better to add a new type for that.

I also had to write some other utils for tower:

  • ChainLayer: lets ServiceBuilder pipe requests through a series of Services, used as ServiceBuilder::new().layer(ChainLayer::new(svc1)).layer(ChainLayer::new(svc2))..., implemented with Chain.
  • ServiceCell: a container that holds a OnceLock, so the real service can be set in it later.
  • SinkService: turns a futures::Sink into a Service.
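
To make the ServiceCell idea concrete, here is a minimal sketch of a set-once container built on `std::sync::OnceLock`. The actual ServiceCell is not shown in this thread, so every name and method here (`ServiceCell::new`, `set`, `get`) is an assumption, and the tower `Service` impl that would forward to the stored service is left out to keep the example dependency-free.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical handle to a service that is installed later.
/// All clones share the same slot.
pub struct ServiceCell<S> {
    slot: Arc<OnceLock<S>>,
}

impl<S> Clone for ServiceCell<S> {
    fn clone(&self) -> Self {
        Self { slot: Arc::clone(&self.slot) }
    }
}

impl<S> Default for ServiceCell<S> {
    fn default() -> Self {
        Self::new()
    }
}

impl<S> ServiceCell<S> {
    /// Create an empty cell.
    pub fn new() -> Self {
        Self { slot: Arc::new(OnceLock::new()) }
    }

    /// Install the real service. If one was already installed, the
    /// rejected value is handed back in `Err`.
    pub fn set(&self, service: S) -> Result<(), S> {
        self.slot.set(service)
    }

    /// Borrow the installed service, if any.
    pub fn get(&self) -> Option<&S> {
        self.slot.get()
    }
}
```

Since `Service::call` takes `&mut self`, a real implementation would also have to decide how to get mutable access to the stored service, for example by cloning it per call; that choice is not covered by this sketch.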

Below is the implementation of ready_call:

use std::{future::Future, task::ready};

use tower::Service;

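pub trait ServiceExt<T>: Service<T> {
    // Wait until the service is ready, then call it, as a single future.
    fn ready_call(&mut self, req: T) -> ReadyCall<'_, Self, T>
    where
        Self: Sized,
    {
        ReadyCall::new(self, req)
    }
}

// Blanket impl so that every `Service` gets `ready_call`.
impl<S, T> ServiceExt<T> for S where S: Service<T> {}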


pin_project_lite::pin_project! {
    pub struct ReadyCall<'a, S, T>
    where
        S: Service<T>,
    {
        service: &'a mut S,
        req: Option<T>,
        #[pin]
        fut: Option<S::Future>,
    }
}

impl<'a, S, T> ReadyCall<'a, S, T>
where
    S: Service<T>,
{
    pub fn new(service: &'a mut S, req: T) -> Self {
        Self {
            service,
            req: Some(req),
            fut: None,
        }
    }
}

impl<'a, S, T> Future for ReadyCall<'a, S, T>
where
    S: Service<T>,
{
    type Output = Result<S::Response, S::Error>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        loop {
            let mut proj = self.as_mut().project();
            if let Some(fut) = proj.fut.as_mut().as_pin_mut() {
                return ready!(fut.poll(cx)).into();
            }
            if let Err(e) = ready!(proj.service.poll_ready(cx)) {
                return Err(e).into();
            }
            let req = proj.req.take().expect("req is none");
            let fut = proj.service.call(req);
            proj.fut.set(Some(fut));
        }
    }
}
