Skip to content

Commit

Permalink
fix: ack api to allow backend to handle differently (#383)
Browse files Browse the repository at this point in the history
* fix: ack api to allow backend to handle differently

* fix: related to storage tests

* fix: calculate status for postgres
  • Loading branch information
geofmureithi authored Jul 25, 2024
1 parent cc58602 commit be1674f
Show file tree
Hide file tree
Showing 21 changed files with 365 additions and 311 deletions.
1 change: 1 addition & 0 deletions examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async fn main() -> Result<()> {
WorkerBuilder::new("tasty-avocado")
.layer(TraceLayer::new())
.backend(storage)
// .chain(|svc|svc.map_err(|e| Box::new(e)))
.build_fn(send_email)
})
.run_with_signal(signal::ctrl_c());
Expand Down
33 changes: 18 additions & 15 deletions examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::{marker::PhantomData, time::Duration};
use std::{fmt::Debug, marker::PhantomData, time::Duration};

use anyhow::Result;
use apalis::{layers::tracing::TraceLayer, prelude::*};

use apalis_redis::{self, Config, RedisCodec, RedisJob};

use apalis_core::{
codec::json::JsonCodec,
layers::{Ack, AckLayer, AckResponse},
layers::{Ack, AckLayer},
};
use email_service::{send_email, Email};
use futures::{channel::mpsc, SinkExt};
Expand All @@ -34,15 +33,15 @@ impl<T> Clone for RedisMq<T> {
}
}

impl<M: Send + 'static> Backend<Request<M>> for RedisMq<M> {
impl<M: Send + 'static, Res> Backend<Request<M>, Res> for RedisMq<M> {
type Stream = RequestStream<Request<M>>;

type Layer = AckLayer<Self, M>;
type Layer = AckLayer<Self, M, Res>;

fn poll(mut self, worker_id: WorkerId) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(mut self, _worker_id: WorkerId) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.get_buffer_size());
let stream: RequestStream<Request<M>> = Box::pin(rx);
let layer = AckLayer::new(self.clone(), worker_id);
let layer = AckLayer::new(self.clone());
let heartbeat = async move {
loop {
sleep(*self.config.get_poll_interval()).await;
Expand All @@ -63,14 +62,18 @@ impl<M: Send + 'static> Backend<Request<M>> for RedisMq<M> {
}
}

impl<T: Send> Ack<T> for RedisMq<T> {
type Acknowledger = String;
impl<T: Send, Res: Debug + Send + Sync> Ack<T, Res> for RedisMq<T> {
type Context = String;

type Error = RsmqError;
type AckError = RsmqError;

async fn ack(&mut self, ack: AckResponse<String>) -> Result<(), Self::Error> {
println!("Attempting to ACK {}", ack.acknowledger);
self.conn.delete_message("email", &ack.acknowledger).await?;
async fn ack(
&mut self,
ctx: &Self::Context,
res: &Result<Res, apalis_core::error::Error>,
) -> Result<(), Self::AckError> {
println!("Attempting to ACK {:?}", res);
self.conn.delete_message("email", &ctx).await?;
Ok(())
}
}
Expand Down Expand Up @@ -105,7 +108,7 @@ impl<Message: Send + 'static> MessageQueue<Message> for RedisMq<Message> {
}
}

async fn produce_jobs(mq: &mut RedisMq<Email>) -> Result<()> {
async fn produce_jobs(mq: &mut RedisMq<Email>) -> anyhow::Result<()> {
for index in 0..1 {
mq.enqueue(Email {
to: index.to_string(),
Expand All @@ -118,7 +121,7 @@ async fn produce_jobs(mq: &mut RedisMq<Email>) -> Result<()> {
}

#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "debug");

tracing_subscriber::fmt::init();
Expand Down
17 changes: 12 additions & 5 deletions packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ impl<J, M, Serv> WorkerBuilder<J, (), M, Serv> {
}

/// Set the source to a backend that implements [Backend]
pub fn backend<NS: Backend<Request<NJ>>, NJ>(
pub fn backend<NB: Backend<Request<NJ>, Res>, NJ, Res: Send>(
self,
backend: NS,
) -> WorkerBuilder<NJ, NS, M, Serv> {
backend: NB,
) -> WorkerBuilder<NJ, NB, M, Serv>
where
Serv: Service<Request<NJ>, Response = Res>,
{
WorkerBuilder {
request: PhantomData,
layer: self.layer,
Expand Down Expand Up @@ -131,8 +134,12 @@ impl<Request, Stream, M, Serv> WorkerBuilder<Request, Stream, M, Serv> {
}
}

impl<Req: Send + 'static + Sync, P: Backend<Request<Req>> + 'static, M: 'static, S>
WorkerFactory<Req, S> for WorkerBuilder<Req, P, M, S>
impl<
Req: Send + 'static + Sync,
P: Backend<Request<Req>, S::Response> + 'static,
M: 'static,
S,
> WorkerFactory<Req, S> for WorkerBuilder<Req, P, M, S>
where
S: Service<Request<Req>> + Send + 'static + Clone + Sync,
S::Future: Send,
Expand Down
130 changes: 59 additions & 71 deletions packages/apalis-core/src/layers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::task::attempt::Attempt;
use crate::{request::Request, worker::WorkerId};
use crate::error::{BoxDynError, Error};
use crate::request::Request;
use futures::channel::mpsc::{SendError, Sender};
use futures::SinkExt;
use futures::{future::BoxFuture, Future, FutureExt};
use serde::{Deserialize, Serialize};
use serde::Serialize;
use std::marker::PhantomData;
use std::{fmt, sync::Arc};
pub use tower::{
Expand Down Expand Up @@ -157,148 +157,136 @@ pub mod extensions {
/// A trait for acknowledging successful processing
/// This trait is called even when a task fails.
/// This is a way of a [`Backend`] to save the result of a job or message
pub trait Ack<Task> {
pub trait Ack<Task, Response> {
/// The data to fetch from context to allow acknowledgement
type Acknowledger;
type Context;
/// The error returned by the ack
type Error: std::error::Error;
type AckError: std::error::Error;

/// Acknowledges successful processing of the given request
fn ack(
&mut self,
response: AckResponse<Self::Acknowledger>,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
}

/// ACK response
#[derive(Debug, Serialize, Deserialize)]
pub struct AckResponse<A> {
/// The worker id
pub worker: WorkerId,
/// The acknowledger
pub acknowledger: A,
/// The stringified result
pub result: Result<String, String>,
/// The number of attempts made by the request
pub attempts: Attempt,
ctx: &Self::Context,
result: &Result<Response, Error>,
) -> impl Future<Output = Result<(), Self::AckError>> + Send;
}

/// A generic stream that emits (worker_id, task_id)
#[derive(Debug, Clone)]
pub struct AckStream<A>(pub Sender<AckResponse<A>>);

impl<J, A: Send + Clone + 'static> Ack<J> for AckStream<A> {
type Acknowledger = A;
type Error = SendError;
fn ack(
impl<T, Res: Clone + Send + Sync, Ctx: Clone + Send + Sync> Ack<T, Res>
for Sender<(Ctx, Result<Res, Error>)>
{
type AckError = SendError;
type Context = Ctx;
async fn ack(
&mut self,
response: AckResponse<A>,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
self.0.send(response).boxed()
ctx: &Self::Context,
result: &Result<Res, Error>,
) -> Result<(), Self::AckError> {
let ctx = ctx.clone();
self.send((ctx, result.clone())).await.unwrap();
Ok(())
}
}

/// A layer that acknowledges a job completed successfully
#[derive(Debug)]
pub struct AckLayer<A: Ack<J>, J> {
pub struct AckLayer<A, J, Res> {
ack: A,
job_type: PhantomData<J>,
worker_id: WorkerId,
res: PhantomData<Res>,
}

impl<A: Ack<J>, J> AckLayer<A, J> {
impl<A, J, Res> AckLayer<A, J, Res> {
/// Build a new [AckLayer] for a job
pub fn new(ack: A, worker_id: WorkerId) -> Self {
pub fn new(ack: A) -> Self {
Self {
ack,
job_type: PhantomData,
worker_id,
res: PhantomData,
}
}
}

impl<A, J, S> Layer<S> for AckLayer<A, J>
impl<A, J, S, Res> Layer<S> for AckLayer<A, J, Res>
where
S: Service<Request<J>> + Send + 'static,
S::Error: std::error::Error + Send + Sync + 'static,
S::Future: Send + 'static,
A: Ack<J> + Clone + Send + Sync + 'static,
A: Ack<J, S::Response> + Clone + Send + Sync + 'static,
{
type Service = AckService<S, A, J>;
type Service = AckService<S, A, J, S::Response>;

fn layer(&self, service: S) -> Self::Service {
AckService {
service,
ack: self.ack.clone(),
job_type: PhantomData,
worker_id: self.worker_id.clone(),
res: PhantomData,
}
}
}

/// The underlying service for an [AckLayer]
#[derive(Debug)]
pub struct AckService<SV, A, J> {
pub struct AckService<SV, A, J, Res> {
service: SV,
ack: A,
job_type: PhantomData<J>,
worker_id: WorkerId,
res: PhantomData<Res>,
}

impl<Sv: Clone, A: Clone, J> Clone for AckService<Sv, A, J> {
impl<Sv: Clone, A: Clone, J, Res> Clone for AckService<Sv, A, J, Res> {
fn clone(&self) -> Self {
Self {
ack: self.ack.clone(),
job_type: PhantomData,
worker_id: self.worker_id.clone(),
service: self.service.clone(),
res: PhantomData,
}
}
}

impl<SV, A, T> Service<Request<T>> for AckService<SV, A, T>
impl<SV, A, T, Res> Service<Request<T>> for AckService<SV, A, T, Res>
where
SV: Service<Request<T>> + Send + Sync + 'static,
SV::Error: std::error::Error + Send + Sync + 'static,
<SV as Service<Request<T>>>::Error: Into<BoxDynError> + Send + Sync + 'static,
<SV as Service<Request<T>>>::Future: std::marker::Send + 'static,
A: Ack<T> + Send + 'static + Clone + Send + Sync,
T: 'static,
<SV as Service<Request<T>>>::Response: std::marker::Send + fmt::Debug + Sync,
<A as Ack<T>>::Acknowledger: Sync + Send + Clone,
A: Ack<T, <SV as Service<Request<T>>>::Response> + Send + 'static + Clone + Send + Sync,
T: 'static + Send,
<SV as Service<Request<T>>>::Response: std::marker::Send + fmt::Debug + Sync + Serialize,
<A as Ack<T, SV::Response>>::Context: Sync + Send + Clone,
{
type Response = SV::Response;
type Error = SV::Error;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
self.service
.poll_ready(cx)
.map_err(|e| Error::Failed(Arc::new(e.into())))
}

fn call(&mut self, request: Request<T>) -> Self::Future {
let mut ack = self.ack.clone();
let worker_id = self.worker_id.clone();
let data = request.get::<<A as Ack<T>>::Acknowledger>().cloned();
let attempts = request.get::<Attempt>().cloned().unwrap_or_default();
let data = request
.get::<<A as Ack<T, SV::Response>>::Context>()
.cloned();

let fut = self.service.call(request);
let fut_with_ack = async move {
let res = fut.await;
let result = res
.as_ref()
.map(|ok| format!("{ok:?}"))
.map_err(|e| e.to_string());
if let Some(task_id) = data {
if let Err(_e) = ack
.ack(AckResponse {
worker: worker_id,
acknowledger: task_id,
result,
attempts,
})
.await
{
let res = fut.await.map_err(|err| {
let e: BoxDynError = err.into();
// Try to downcast the error to see if it is already of type `Error`
if let Some(custom_error) = e.downcast_ref::<Error>() {
return custom_error.clone();
}
Error::Failed(Arc::new(e))
});

if let Some(ctx) = data {
if let Err(_e) = ack.ack(&ctx, &res).await {
// TODO: Implement tracing in apalis core
// tracing::error!("Acknowledgement Failed: {}", e);
}
Expand Down
Loading

0 comments on commit be1674f

Please sign in to comment.