diff --git a/packages/apalis-core/src/lib.rs b/packages/apalis-core/src/lib.rs index 79320375..77b91a9a 100644 --- a/packages/apalis-core/src/lib.rs +++ b/packages/apalis-core/src/lib.rs @@ -125,6 +125,7 @@ pub mod interval { /// Test utilities that allows you to test backends pub mod test_utils { use crate::backend::Backend; + use crate::builder::WorkerBuilder; use crate::error::BoxDynError; use crate::request::Request; use crate::task::task_id::TaskId; @@ -132,13 +133,15 @@ pub mod test_utils { use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::future::BoxFuture; use futures::stream::{Stream, StreamExt}; - use futures::{Future, FutureExt, SinkExt}; + use futures::{Future, FutureExt, SinkExt, TryFutureExt}; + use std::error::Error; use std::fmt::Debug; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::task::{Context, Poll}; - use tower::{Layer, Service}; + use tower::layer::layer_fn; + use tower::{Layer, Service, ServiceExt}; /// Define a dummy service #[derive(Debug, Clone)] @@ -200,21 +203,22 @@ pub mod test_utils { /// } ///} /// ```` - impl TestWrapper, Res> + impl + TestWrapper, Res> where B: Backend, Res> + Send + Sync + 'static + Clone, Req: Send + 'static, - Ctx: Send, + Ctx: Send + Sync + 'static, B::Stream: Send + 'static, B::Stream: Stream>, crate::error::Error>> + Unpin, { /// Build a new instance provided a custom service pub fn new_with_service(backend: B, service: S) -> (Self, BoxFuture<'static, ()>) where - S: Service, Response = Res> + Send + 'static, + S: Service, Response = Res> + Send + 'static + Sync, B::Layer: Layer, <, Res>>::Layer as Layer>::Service: - Service> + Send + 'static, + Service, Response = Res> + Send + 'static, <<, Res>>::Layer as Layer>::Service as Service< Request, >>::Response: Send + Debug, @@ -224,47 +228,28 @@ pub mod test_utils { <<, Res>>::Layer as Layer>::Service as Service< Request, >>::Future: Send + 'static, + >>::Future: Send, + >>::Error: Send, + >>::Error: Sync, + >>::Error: std::error::Error, + , Res>>::Layer: Layer>, + <, Res>>::Layer as Layer>>::Service: + Service>, { - let worker_id = WorkerId::new("test-worker"); - let worker = Worker::new(worker_id, crate::worker::Context::default()); - worker.start(); - let b = backend.clone(); - let mut poller = b.poll::(&worker); - let (stop_tx, mut stop_rx) = channel::<()>(1); - + use crate::builder::WorkerFactory; let (mut res_tx, res_rx) = channel(10); + let worker = WorkerBuilder::new("test-worker") + .layer(TestEmitLayer { + tx: res_tx, + service, + }) + .backend(backend) + .build(service); - let mut service = poller.layer.layer(service); + let (stop_tx, mut stop_rx) = channel::<()>(1); let poller = async move { - let heartbeat = poller.heartbeat.shared(); - loop { - futures::select! { - - item = poller.stream.next().fuse() => match item { - Some(Ok(Some(req))) => { - let task_id = req.parts.task_id.clone(); - match service.call(req).await { - Ok(res) => { - res_tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap(); - }, - Err(err) => { - res_tx.send((task_id, Err(err.into().to_string()))).await.unwrap(); - } - } - } - Some(Ok(None)) | None => break, - Some(Err(_e)) => { - // handle error - break; - } - }, - _ = stop_rx.next().fuse() => break, - _ = heartbeat.clone().fuse() => { - - }, - } - } + worker.run().await; }; ( TestWrapper { @@ -289,6 +274,63 @@ pub mod test_utils { } } + pub struct TestEmitService { + tx: Sender<(TaskId, Result)>, + service: S, + } + + pub struct TestEmitLayer { + tx: Sender<(TaskId, Result)>, + service: S, + } + + impl Layer for TestEmitLayer { + type Service = TestEmitService; + + fn layer(&self, service: S) -> Self::Service { + TestEmitService { + tx: self.tx.clone(), + service, + } + } + } + + impl + Service> for TestEmitService + where + S: Service, Response = Res, Error = Err>, + S::Future: Send + 'static, + { + type Response = S::Response; + type Error = S::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let task_id = request.parts.task_id.clone(); + let mut res_tx = self.tx.clone(); + let fut = self.service.call(request); + Box::pin(async move { + let res = fut.await; + match &res { + Ok(res) => { + res_tx + .send((task_id, Ok(format!("{res:?}")))) + .await + .unwrap(); + } + Err(err) => { + res_tx.send((task_id, Err(err.to_string()))).await.unwrap(); + } + } + res + }) + } + } + impl Deref for TestWrapper, Res> where B: Backend, Res>, @@ -373,6 +415,49 @@ pub mod test_utils { let res = t.len().await.unwrap(); assert_eq!(res, 0); // After vacuuming, there should be nothing } + + #[tokio::test] + async fn integration_test_storage_retry_persists() { + use std::io::{Error, ErrorKind}; + let mut backend = $setup().await; + let service = apalis_test_service_fn(|request: Request| async move { + Err::(Error::new(ErrorKind::Other, "oh no!")) + }); + let (mut t, poller) = TestWrapper::new_with_service(backend.clone(), service); + tokio::spawn(poller); + let res = t.len().await.unwrap(); + assert_eq!(res, 0, "should have no jobs"); // No jobs + let parts = t.push(1).await.unwrap(); + let res = t.len().await.unwrap(); + assert_eq!(res, 1, "should have 1 job"); // A job exists + let res = t.execute_next().await; + assert_eq!(res.1, Err("FailedError: oh no!".to_owned())); + + tokio::time::sleep(Duration::from_secs(1)).await; + let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap(); + assert_eq!(task.parts.attempt.current(), 1, "should have 1 attempt"); + + let res = t.execute_next().await; + assert_eq!(res.1, Err("FailedError: oh no!".to_owned())); + + let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap(); + assert_eq!(task.parts.attempt.current(), 2); + + let res = t.execute_next().await; + assert_eq!(res.1, Err("FailedError: oh no!".to_owned())); + + let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap(); + assert_eq!(task.parts.attempt.current(), 3); + + let res = t.execute_next().await; + assert_eq!(res.1, Err("FailedError: oh no!".to_owned())); + + let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap(); + assert_eq!(task.parts.attempt.current(), 4); + + let res = t.len().await.unwrap(); + assert_eq!(res, 1); // The job still exists and there is no duplicates + } }; } } diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index 0e025dcb..75548054 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -583,6 +583,8 @@ where } fn call(&mut self, request: Request) -> Self::Future { + dbg!(&request.parts.attempt); + request.parts.attempt.increment(); self.ctx.track(self.service.call(request)) } } diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index e794c71e..7b33cf26 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -22,6 +22,7 @@ use serde::{de::DeserializeOwned, Serialize}; use sqlx::{Pool, Row, Sqlite}; use std::any::type_name; use std::convert::TryInto; +use std::fmt::Debug; use std::sync::Arc; use std::{fmt, io}; use std::{marker::PhantomData, time::Duration}; @@ -159,7 +160,7 @@ async fn fetch_next( config: &Config, ) -> Result>, sqlx::Error> { let now: i64 = Utc::now().timestamp(); - let update_query = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3, attempts = attempts + 1 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4"; + let update_query = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4"; let job: Option> = sqlx::query_as(update_query) .bind(id.to_string()) .bind(worker_id.to_string()) @@ -226,7 +227,7 @@ where impl Storage for SqliteStorage where - T: Serialize + DeserializeOwned + Send + 'static + Unpin + Sync, + T: Serialize + DeserializeOwned + Send + 'static + Unpin + Sync + Debug, C: Codec + Send, { type Job = T; @@ -300,7 +301,7 @@ where } async fn len(&mut self) -> Result { - let query = "Select Count(*) as count from Jobs where status='Pending'"; + let query = "Select Count(*) as count from Jobs WHERE (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts))"; let record = sqlx::query(query).fetch_one(&self.pool).await?; record.try_get("count") } @@ -405,29 +406,6 @@ impl SqliteStorage { Ok(()) } - /// Add jobs that failed back to the queue if there are still remaining attemps - pub async fn reenqueue_failed(&mut self) -> Result<(), sqlx::Error> { - let job_type = self.config.namespace.clone(); - let mut tx = self.pool.acquire().await?; - let query = r#"Update Jobs - SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL - WHERE id in - (SELECT Jobs.id from Jobs - WHERE status= "Failed" AND Jobs.attempts < Jobs.max_attempts - ORDER BY lock_at ASC LIMIT ?2);"#; - sqlx::query(query) - .bind(job_type) - .bind::( - self.config - .buffer_size - .try_into() - .map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?, - ) - .execute(&mut *tx) - .await?; - Ok(()) - } - /// Add jobs that workers have disappeared to the queue pub async fn reenqueue_orphaned( &self, @@ -437,7 +415,7 @@ impl SqliteStorage { let job_type = self.config.namespace.clone(); let mut tx = self.pool.acquire().await?; let query = r#"Update Jobs - SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ="Job was abandoned" + SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, attempts = attempts + 1, last_error ="Job was abandoned" WHERE id in (SELECT Jobs.id from Jobs INNER join Workers ON lock_by = Workers.id WHERE status= "Running" AND workers.last_seen < ?1 @@ -527,9 +505,10 @@ impl Ack for SqliteStorage { type Context = SqlContext; type AckError = sqlx::Error; async fn ack(&mut self, ctx: &Self::Context, res: &Response) -> Result<(), sqlx::Error> { + dbg!(&res.attempt); let pool = self.pool.clone(); let query = - "UPDATE Jobs SET status = ?4, done_at = strftime('%s','now'), last_error = ?3 WHERE id = ?1 AND lock_by = ?2"; + "UPDATE Jobs SET status = ?4, attempts = ?5, done_at = strftime('%s','now'), last_error = ?3 WHERE id = ?1 AND lock_by = ?2"; let result = serde_json::to_string(&res.inner.as_ref().map_err(|r| r.to_string())) .map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?; sqlx::query(query) @@ -542,6 +521,7 @@ impl Ack for SqliteStorage { ) .bind(result) .bind(calculate_status(&res.inner).to_string()) + .bind(res.attempt.current() as u32) .execute(&pool) .await?; Ok(()) @@ -799,6 +779,27 @@ mod tests { assert!(ctx.lock_at().is_none()); assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned())); assert_eq!(job.parts.attempt.current(), 1); + + let job = consume_one(&mut storage, &worker).await; + let ctx = &job.parts.context; + // Simulate worker + job.parts.attempt.increment(); + storage + .ack( + ctx, + &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt), + ) + .await + .unwrap(); + //end simulate worker + + let job = get_job(&mut storage, &job_id).await; + let ctx = &job.parts.context; + assert_eq!(*ctx.status(), State::Done); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); + assert!(ctx.lock_at().is_some()); + assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned())); + assert_eq!(job.parts.attempt.current(), 2); } #[tokio::test] @@ -812,18 +813,32 @@ mod tests { let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; let job = consume_one(&mut storage, &worker).await; - let job_id = &job.parts.task_id; + let job_id = job.parts.task_id; storage .reenqueue_orphaned(1, six_minutes_ago) .await .expect("failed to heartbeat"); - let job = get_job(&mut storage, job_id).await; + let job = get_job(&mut storage, &job_id).await; let ctx = &job.parts.context; - assert_eq!(*ctx.status(), State::Running); + + // Simulate worker + job.parts.attempt.increment(); + storage + .ack( + ctx, + &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt), + ) + .await + .unwrap(); + //end simulate worker + + let job = get_job(&mut storage, &job_id).await; + let ctx = &job.parts.context; + assert_eq!(*ctx.status(), State::Done); assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); assert!(ctx.lock_at().is_some()); - assert_eq!(*ctx.last_error(), None); + assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned())); assert_eq!(job.parts.attempt.current(), 1); } }