add: generic retry persist check #498

Open · wants to merge 2 commits into base: main
169 changes: 127 additions & 42 deletions packages/apalis-core/src/lib.rs
@@ -125,20 +125,23 @@
/// Test utilities that allow you to test backends
pub mod test_utils {
use crate::backend::Backend;
use crate::builder::WorkerBuilder;
use crate::error::BoxDynError;
use crate::request::Request;
use crate::task::task_id::TaskId;
use crate::worker::{Worker, WorkerId};

Check warning on line 132 in packages/apalis-core/src/lib.rs (GitHub Actions / Check and Clippy): unused imports: `WorkerId` and `Worker`
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::future::BoxFuture;
use futures::stream::{Stream, StreamExt};
use futures::{Future, FutureExt, SinkExt};
use futures::{Future, FutureExt, SinkExt, TryFutureExt};

Check warning on line 136 in packages/apalis-core/src/lib.rs (GitHub Actions / Check and Clippy): unused import: `TryFutureExt`
use std::error::Error;

Check warning on line 137 in packages/apalis-core/src/lib.rs (GitHub Actions / Check and Clippy): unused import: `std::error::Error`
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};
use tower::layer::layer_fn;

Check warning on line 143 in packages/apalis-core/src/lib.rs (GitHub Actions / Check and Clippy): unused import: `tower::layer::layer_fn`
use tower::{Layer, Service, ServiceExt};

Check warning on line 144 in packages/apalis-core/src/lib.rs (GitHub Actions / Check and Clippy): unused import: `ServiceExt`

/// Define a dummy service
#[derive(Debug, Clone)]
@@ -200,21 +203,22 @@
/// }
///}
/// ````
impl<B, Req, Res, Ctx> TestWrapper<B, Request<Req, Ctx>, Res>
impl<B, Req: Sync, Res: Send + Debug + Sync + serde::Serialize + 'static, Ctx>
TestWrapper<B, Request<Req, Ctx>, Res>
where
B: Backend<Request<Req, Ctx>, Res> + Send + Sync + 'static + Clone,
Req: Send + 'static,
Ctx: Send,
Ctx: Send + Sync + 'static,
B::Stream: Send + 'static,
B::Stream: Stream<Item = Result<Option<Request<Req, Ctx>>, crate::error::Error>> + Unpin,
{
/// Build a new instance provided a custom service
pub fn new_with_service<S>(backend: B, service: S) -> (Self, BoxFuture<'static, ()>)
where
S: Service<Request<Req, Ctx>, Response = Res> + Send + 'static,
S: Service<Request<Req, Ctx>, Response = Res> + Send + 'static + Sync,
B::Layer: Layer<S>,
<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service:
Service<Request<Req, Ctx>> + Send + 'static,
Service<Request<Req, Ctx>, Response = Res> + Send + 'static,
<<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service as Service<
Request<Req, Ctx>,
>>::Response: Send + Debug,
@@ -224,47 +228,28 @@
<<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service as Service<
Request<Req, Ctx>,
>>::Future: Send + 'static,
<S as Service<Request<Req, Ctx>>>::Future: Send,
<S as Service<Request<Req, Ctx>>>::Error: Send,
<S as Service<Request<Req, Ctx>>>::Error: Sync,
<S as Service<Request<Req, Ctx>>>::Error: std::error::Error,
<B as Backend<Request<Req, Ctx>, Res>>::Layer: Layer<TestEmitService<S>>,
<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<TestEmitService<S>>>::Service:
Service<Request<Req, Ctx>>,
{
let worker_id = WorkerId::new("test-worker");
let worker = Worker::new(worker_id, crate::worker::Context::default());
worker.start();
let b = backend.clone();
let mut poller = b.poll::<S>(&worker);
let (stop_tx, mut stop_rx) = channel::<()>(1);

use crate::builder::WorkerFactory;
let (mut res_tx, res_rx) = channel(10);
let worker = WorkerBuilder::new("test-worker")
.layer(TestEmitLayer {
tx: res_tx,
service,
})
.backend(backend)
.build(service);

let mut service = poller.layer.layer(service);
let (stop_tx, mut stop_rx) = channel::<()>(1);

let poller = async move {
let heartbeat = poller.heartbeat.shared();
loop {
futures::select! {

item = poller.stream.next().fuse() => match item {
Some(Ok(Some(req))) => {
let task_id = req.parts.task_id.clone();
match service.call(req).await {
Ok(res) => {
res_tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap();
},
Err(err) => {
res_tx.send((task_id, Err(err.into().to_string()))).await.unwrap();
}
}
}
Some(Ok(None)) | None => break,
Some(Err(_e)) => {
// handle error
break;
}
},
_ = stop_rx.next().fuse() => break,
_ = heartbeat.clone().fuse() => {

},
}
}
worker.run().await;

Check failures on line 252 in packages/apalis-core/src/lib.rs (GitHub Actions / Check and Clippy):
- type mismatch resolving `<<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<TestEmitService<S>>>::Service as Service<Request<Req, ...>>>::Response == Res`
- `<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<...>>::Service` cannot be sent between threads safely
- `<<... as Layer<...>>::Service as Service<...>>::Future` cannot be sent between threads safely
- `<<... as Layer<...>>::Service as Service<...>>::Error` cannot be sent between threads safely
- `<<... as Layer<...>>::Service as Service<...>>::Error` cannot be shared between threads safely
- the trait bound `<<<B as Backend<request::Request<Req, Ctx>, Res>>::Layer as Layer<TestEmitService<S>>>::Service as Service<request::Request<Req, Ctx>>>::Error: Into<Box<(dyn StdError + std::marker::Send + Sync + 'static)>>` is not satisfied
};
(
TestWrapper {
@@ -289,6 +274,63 @@
}
}

/// Test service wrapper that reports each task's id and stringified outcome over a channel while delegating to the inner service.
pub struct TestEmitService<S> {
tx: Sender<(TaskId, Result<String, String>)>,
service: S,
}

/// Tower layer that wraps a service in [`TestEmitService`] so tests can observe per-task outcomes.
pub struct TestEmitLayer<S> {
tx: Sender<(TaskId, Result<String, String>)>,
service: S,
}

impl<S> Layer<S> for TestEmitLayer<S> {
type Service = TestEmitService<S>;

fn layer(&self, service: S) -> Self::Service {
TestEmitService {
tx: self.tx.clone(),
service,
}
}
}

impl<S, Req, Ctx, Res: Debug + Send + Sync, Err: std::error::Error + Send>
Service<Request<Req, Ctx>> for TestEmitService<S>
where
S: Service<Request<Req, Ctx>, Response = Res, Error = Err>,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Res, Err>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, request: Request<Req, Ctx>) -> Self::Future {
let task_id = request.parts.task_id.clone();
let mut res_tx = self.tx.clone();
let fut = self.service.call(request);
Box::pin(async move {
let res = fut.await;
match &res {
Ok(res) => {
res_tx
.send((task_id, Ok(format!("{res:?}"))))
.await
.unwrap();
}
Err(err) => {
res_tx.send((task_id, Err(err.to_string()))).await.unwrap();
}
}
res
})
}
}
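
For orientation (not part of this diff): the sketch below shows how this layer and service could be exercised directly. It assumes the code lives inside `test_utils` (the channel fields above are private) and that `Request::new` builds a request with a default context; both are assumptions, not code from this PR.

```rust
use futures::channel::mpsc::channel;
use futures::StreamExt;
use tower::{service_fn, Layer, Service};

// Hedged sketch: wire a dummy service through TestEmitLayer and observe the
// (task id, stringified result) pair that TestEmitService sends in `call`.
async fn emit_example() {
    let (tx, mut rx) = channel(10);
    let inner = service_fn(|_req: Request<u32, ()>| async move {
        Ok::<_, std::io::Error>("handled".to_owned())
    });
    // TestEmitLayer also stores a copy of the service, mirroring `new_with_service`.
    let layer = TestEmitLayer { tx, service: inner.clone() };
    let mut svc = layer.layer(inner);
    let _ = svc.call(Request::new(1u32)).await;
    // The wrapped call also sent `(task_id, Ok("\"handled\""))` over the channel.
    let (task_id, outcome) = rx.next().await.unwrap();
    println!("{task_id:?} -> {outcome:?}");
}
```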

impl<B, Req, Res, Ctx> Deref for TestWrapper<B, Request<Req, Ctx>, Res>
where
B: Backend<Request<Req, Ctx>, Res>,
@@ -373,6 +415,49 @@
let res = t.len().await.unwrap();
assert_eq!(res, 0); // After vacuuming, there should be nothing
}

#[tokio::test]
async fn integration_test_storage_retry_persists() {
use std::io::{Error, ErrorKind};
let mut backend = $setup().await;
let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
Err::<String, io::Error>(Error::new(ErrorKind::Other, "oh no!"))
});
let (mut t, poller) = TestWrapper::new_with_service(backend.clone(), service);
tokio::spawn(poller);
let res = t.len().await.unwrap();
assert_eq!(res, 0, "should have no jobs"); // No jobs
let parts = t.push(1).await.unwrap();
let res = t.len().await.unwrap();
assert_eq!(res, 1, "should have 1 job"); // A job exists
let res = t.execute_next().await;
assert_eq!(res.1, Err("FailedError: oh no!".to_owned()));

tokio::time::sleep(Duration::from_secs(1)).await;
let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
assert_eq!(task.parts.attempt.current(), 1, "should have 1 attempt");

let res = t.execute_next().await;
assert_eq!(res.1, Err("FailedError: oh no!".to_owned()));

let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
assert_eq!(task.parts.attempt.current(), 2);

let res = t.execute_next().await;
assert_eq!(res.1, Err("FailedError: oh no!".to_owned()));

let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
assert_eq!(task.parts.attempt.current(), 3);

let res = t.execute_next().await;
assert_eq!(res.1, Err("FailedError: oh no!".to_owned()));

let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
assert_eq!(task.parts.attempt.current(), 4);

let res = t.len().await.unwrap();
assert_eq!(res, 1); // The job still exists and there are no duplicates
}
};
}
}
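
For concreteness (not part of this diff): each backend supplies the `$setup()` hook when it instantiates this macro. For SQLite it could look roughly like the hedged sketch below, an in-memory pool with migrations applied; the actual test wiring in apalis-sql is not shown here.

```rust
use apalis_sql::sqlite::SqliteStorage;
use sqlx::SqlitePool;

// Hedged sketch of a per-backend setup function for the generic test suite.
async fn setup() -> SqliteStorage<u32> {
    let pool = SqlitePool::connect("sqlite::memory:")
        .await
        .expect("failed to create an in-memory SQLite pool");
    SqliteStorage::setup(&pool)
        .await
        .expect("failed to run migrations");
    SqliteStorage::new(pool)
}
```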
2 changes: 2 additions & 0 deletions packages/apalis-core/src/worker/mod.rs
@@ -583,6 +583,8 @@ where
}

fn call(&mut self, request: Request<Req, Ctx>) -> Self::Future {
dbg!(&request.parts.attempt);
request.parts.attempt.increment();
self.ctx.track(self.service.call(request))
}
}
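
The added lines make the worker bump the request's attempt counter before invoking the wrapped service, so the backend can persist the count when the job is acked. A minimal sketch of the assumed counter semantics (shared, interior-mutable, starting at zero):

```rust
use apalis_core::task::attempt::Attempt;

// Assumption: `Attempt` starts at zero and `increment` works through a shared
// handle, which is why the call above needs no `mut` binding.
fn attempt_counter_sketch() {
    let attempt = Attempt::default();
    assert_eq!(attempt.current(), 0);
    attempt.increment();
    assert_eq!(attempt.current(), 1);
}
```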
79 changes: 47 additions & 32 deletions packages/apalis-sql/src/sqlite.rs
@@ -22,6 +22,7 @@ use serde::{de::DeserializeOwned, Serialize};
use sqlx::{Pool, Row, Sqlite};
use std::any::type_name;
use std::convert::TryInto;
use std::fmt::Debug;
use std::sync::Arc;
use std::{fmt, io};
use std::{marker::PhantomData, time::Duration};
@@ -159,7 +160,7 @@ async fn fetch_next(
config: &Config,
) -> Result<Option<SqlRequest<String>>, sqlx::Error> {
let now: i64 = Utc::now().timestamp();
let update_query = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3, attempts = attempts + 1 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4";
let update_query = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4";
let job: Option<SqlRequest<String>> = sqlx::query_as(update_query)
.bind(id.to_string())
.bind(worker_id.to_string())
@@ -226,7 +227,7 @@ where

impl<T, C> Storage for SqliteStorage<T, C>
where
T: Serialize + DeserializeOwned + Send + 'static + Unpin + Sync,
T: Serialize + DeserializeOwned + Send + 'static + Unpin + Sync + Debug,
C: Codec<Compact = String> + Send,
{
type Job = T;
@@ -300,7 +301,7 @@ where
}

async fn len(&mut self) -> Result<i64, Self::Error> {
let query = "Select Count(*) as count from Jobs where status='Pending'";
let query = "Select Count(*) as count from Jobs WHERE (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts))";
let record = sqlx::query(query).fetch_one(&self.pool).await?;
record.try_get("count")
}
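
Not part of the diff: after this change `len()` counts everything still due to run, i.e. pending jobs plus failed jobs with attempts left, which is why the new generic test can keep asserting a length of 1 after each failed execution. An equivalent standalone query, assuming a `sqlx::SqlitePool` over the same schema:

```rust
// Hypothetical helper mirroring the updated `len()` query.
async fn remaining_jobs(pool: &sqlx::SqlitePool) -> Result<i64, sqlx::Error> {
    sqlx::query_scalar(
        "SELECT COUNT(*) FROM Jobs \
         WHERE status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts)",
    )
    .fetch_one(pool)
    .await
}
```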
@@ -405,29 +406,6 @@ impl<T> SqliteStorage<T> {
Ok(())
}

/// Add jobs that failed back to the queue if there are still remaining attemps
pub async fn reenqueue_failed(&mut self) -> Result<(), sqlx::Error> {
let job_type = self.config.namespace.clone();
let mut tx = self.pool.acquire().await?;
let query = r#"Update Jobs
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL
WHERE id in
(SELECT Jobs.id from Jobs
WHERE status= "Failed" AND Jobs.attempts < Jobs.max_attempts
ORDER BY lock_at ASC LIMIT ?2);"#;
sqlx::query(query)
.bind(job_type)
.bind::<u32>(
self.config
.buffer_size
.try_into()
.map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?,
)
.execute(&mut *tx)
.await?;
Ok(())
}

/// Add jobs whose workers have disappeared back to the queue
pub async fn reenqueue_orphaned(
&self,
Expand All @@ -437,7 +415,7 @@ impl<T> SqliteStorage<T> {
let job_type = self.config.namespace.clone();
let mut tx = self.pool.acquire().await?;
let query = r#"Update Jobs
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ="Job was abandoned"
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, attempts = attempts + 1, last_error ="Job was abandoned"
WHERE id in
(SELECT Jobs.id from Jobs INNER join Workers ON lock_by = Workers.id
WHERE status= "Running" AND workers.last_seen < ?1
@@ -527,9 +505,10 @@ impl<T: Sync + Send, Res: Serialize + Sync> Ack<T, Res> for SqliteStorage<T> {
type Context = SqlContext;
type AckError = sqlx::Error;
async fn ack(&mut self, ctx: &Self::Context, res: &Response<Res>) -> Result<(), sqlx::Error> {
dbg!(&res.attempt);
let pool = self.pool.clone();
let query =
"UPDATE Jobs SET status = ?4, done_at = strftime('%s','now'), last_error = ?3 WHERE id = ?1 AND lock_by = ?2";
"UPDATE Jobs SET status = ?4, attempts = ?5, done_at = strftime('%s','now'), last_error = ?3 WHERE id = ?1 AND lock_by = ?2";
let result = serde_json::to_string(&res.inner.as_ref().map_err(|r| r.to_string()))
.map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?;
sqlx::query(query)
Expand All @@ -542,6 +521,7 @@ impl<T: Sync + Send, Res: Serialize + Sync> Ack<T, Res> for SqliteStorage<T> {
)
.bind(result)
.bind(calculate_status(&res.inner).to_string())
.bind(res.attempt.current() as u32)
.execute(&pool)
.await?;
Ok(())
@@ -799,6 +779,27 @@ mod tests {
assert!(ctx.lock_at().is_none());
assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned()));
assert_eq!(job.parts.attempt.current(), 1);

let job = consume_one(&mut storage, &worker).await;
let ctx = &job.parts.context;
// Simulate worker
job.parts.attempt.increment();
storage
.ack(
ctx,
&Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt),
)
.await
.unwrap();
//end simulate worker

let job = get_job(&mut storage, &job_id).await;
let ctx = &job.parts.context;
assert_eq!(*ctx.status(), State::Done);
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
assert!(ctx.lock_at().is_some());
assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned()));
assert_eq!(job.parts.attempt.current(), 2);
}

#[tokio::test]
@@ -812,18 +813,32 @@
let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await;

let job = consume_one(&mut storage, &worker).await;
let job_id = &job.parts.task_id;
let job_id = job.parts.task_id;
storage
.reenqueue_orphaned(1, six_minutes_ago)
.await
.expect("failed to heartbeat");

let job = get_job(&mut storage, job_id).await;
let job = get_job(&mut storage, &job_id).await;
let ctx = &job.parts.context;
assert_eq!(*ctx.status(), State::Running);

// Simulate worker
job.parts.attempt.increment();
storage
.ack(
ctx,
&Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt),
)
.await
.unwrap();
//end simulate worker

let job = get_job(&mut storage, &job_id).await;
let ctx = &job.parts.context;
assert_eq!(*ctx.status(), State::Done);
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
assert!(ctx.lock_at().is_some());
assert_eq!(*ctx.last_error(), None);
assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned()));
assert_eq!(job.parts.attempt.current(), 1);
}
}