From 04f4987d54efa022ab6289bc94dc889db2110a8e Mon Sep 17 00:00:00 2001 From: Geoffrey Mureithi <95377562+geofmureithi@users.noreply.github.com> Date: Fri, 19 Jul 2024 09:26:08 +0300 Subject: [PATCH] add: test utils that allow backend polling during tests (#374) * add: test utils that allow backend polling during tests * fix: introduce testwrapper and add more tests * fix: add sample for testing * fix: more fixes and actions fixes * fix: more fixes on vacuuming * tests: improve cleanup and generic testing * fix: improve testing and fix some found bugs * fix: postgres query and remove incompatible tests * fix: remove redis incompatible check * fix: minor fixes * fix: postgres json elements --- .github/workflows/mysql.yaml | 7 - .github/workflows/postgres.yaml | 9 +- .github/workflows/redis.yaml | 4 - .github/workflows/sqlite.yaml | 9 +- examples/email-service/Cargo.toml | 3 +- examples/email-service/src/lib.rs | 43 ++++- examples/redis-with-msg-pack/src/main.rs | 6 +- examples/redis/src/main.rs | 2 +- packages/apalis-core/Cargo.toml | 3 +- packages/apalis-core/src/codec/json.rs | 15 +- packages/apalis-core/src/error.rs | 20 +- packages/apalis-core/src/lib.rs | 226 +++++++++++++++++++++++ packages/apalis-core/src/memory.rs | 15 +- packages/apalis-core/src/monitor/mod.rs | 9 +- packages/apalis-core/src/request.rs | 13 +- packages/apalis-core/src/response.rs | 18 +- packages/apalis-core/src/task/task_id.rs | 2 +- packages/apalis-core/src/worker/mod.rs | 2 +- packages/apalis-cron/src/lib.rs | 3 +- packages/apalis-redis/Cargo.toml | 1 + packages/apalis-redis/src/storage.rs | 25 +-- packages/apalis-sql/Cargo.toml | 4 +- packages/apalis-sql/src/context.rs | 6 +- packages/apalis-sql/src/from_row.rs | 20 +- packages/apalis-sql/src/lib.rs | 88 ++++++++- packages/apalis-sql/src/mysql.rs | 115 ++++-------- packages/apalis-sql/src/postgres.rs | 113 +++++------- packages/apalis-sql/src/sqlite.rs | 68 +++---- src/layers/catch_panic/mod.rs | 5 +- 29 files changed, 573 insertions(+), 281 deletions(-) diff --git a/.github/workflows/mysql.yaml b/.github/workflows/mysql.yaml index 6b67a82b..236c01c8 100644 --- a/.github/workflows/mysql.yaml +++ b/.github/workflows/mysql.yaml @@ -1,11 +1,4 @@ on: - push: - paths: - - "packages/apalis-sql/src/lib.rs" - - "packages/apalis-sql/mysql.rs" - - "packages/apalis-sql/src/migrations/mysql/**" - - "packages/apalis-sql/src/Cargo.toml" - - ".github/workflows/mysql.yaml" pull_request: paths: - "packages/apalis-sql/src/lib.rs" diff --git a/.github/workflows/postgres.yaml b/.github/workflows/postgres.yaml index 050753e4..5845f0c1 100644 --- a/.github/workflows/postgres.yaml +++ b/.github/workflows/postgres.yaml @@ -1,11 +1,4 @@ on: - push: - paths: - - "packages/apalis-sql/src/lib.rs" - - "packages/apalis-sql/postgres.rs" - - "packages/apalis-sql/src/migrations/postgres/**" - - "packages/apalis-sql/src/Cargo.toml" - - ".github/workflows/postgres.yaml" pull_request: paths: - "packages/apalis-sql/src/lib.rs" @@ -37,4 +30,4 @@ jobs: toolchain: stable override: true - run: cargo test --no-default-features --features postgres,migrate,tokio-comp -- --test-threads=1 - working-directory: packages/apalis-sql \ No newline at end of file + working-directory: packages/apalis-sql diff --git a/.github/workflows/redis.yaml b/.github/workflows/redis.yaml index 53990175..68cbc038 100644 --- a/.github/workflows/redis.yaml +++ b/.github/workflows/redis.yaml @@ -1,8 +1,4 @@ on: - push: - paths: - - "packages/apalis-redis/**" - - ".github/workflows/redis.yaml" pull_request: paths: - "packages/apalis-redis/**" diff --git a/.github/workflows/sqlite.yaml b/.github/workflows/sqlite.yaml index 1319f259..f7129bf2 100644 --- a/.github/workflows/sqlite.yaml +++ b/.github/workflows/sqlite.yaml @@ -1,11 +1,4 @@ on: - push: - paths: - - "packages/apalis-sql/src/lib.rs" - - "packages/apalis-sql/src/sqlite.rs" - - "packages/apalis-sql/src/migrations/sqlite/**" - - "packages/apalis-sql/src/Cargo.toml" - - ".github/workflows/sqlite.yaml" pull_request: paths: - "packages/apalis-sql/src/lib.rs" @@ -28,4 +21,4 @@ jobs: toolchain: stable override: true - run: cargo test --no-default-features --features sqlite,migrate,tokio-comp -- --test-threads=1 - working-directory: packages/apalis-sql \ No newline at end of file + working-directory: packages/apalis-sql diff --git a/examples/email-service/Cargo.toml b/examples/email-service/Cargo.toml index 8ede34ed..3aca96a1 100644 --- a/examples/email-service/Cargo.toml +++ b/examples/email-service/Cargo.toml @@ -8,4 +8,5 @@ apalis = { path = "../../", default-features = false } futures-util = "0.3.0" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -log = "0.4" \ No newline at end of file +log = "0.4" +email_address = "0.2.5" diff --git a/examples/email-service/src/lib.rs b/examples/email-service/src/lib.rs index f5f833dd..467de899 100644 --- a/examples/email-service/src/lib.rs +++ b/examples/email-service/src/lib.rs @@ -1,3 +1,7 @@ +use std::{str::FromStr, sync::Arc}; + +use apalis::prelude::*; +use email_address::EmailAddress; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Clone)] @@ -7,8 +11,43 @@ pub struct Email { pub text: String, } -pub async fn send_email(job: Email) { - log::info!("Attempting to send email to {}", job.to); +pub async fn send_email(job: Email) -> Result<(), Error> { + let validation = EmailAddress::from_str(&job.to); + match validation { + Ok(email) => { + log::info!("Attempting to send email to {}", email.as_str()); + Ok(()) + } + Err(email_address::Error::InvalidCharacter) => { + log::error!("Killed send email job. Invalid character {}", job.to); + Err(Error::Abort(String::from("Invalid character. Job killed"))) + } + Err(e) => Err(Error::Failed(Arc::new(Box::new(e)))), + } +} + +pub fn example_good_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@gmail.com".to_string(), + text: "Some Text".to_string(), + } +} + +pub fn example_killed_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@©.com".to_string(), // killed because it has © which is invalid + text: "Some Text".to_string(), + } +} + +pub fn example_retry_able_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example".to_string(), + text: "Some Text".to_string(), + } } pub const FORM_HTML: &str = r#" diff --git a/examples/redis-with-msg-pack/src/main.rs b/examples/redis-with-msg-pack/src/main.rs index 9a37165a..4afa89ad 100644 --- a/examples/redis-with-msg-pack/src/main.rs +++ b/examples/redis-with-msg-pack/src/main.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use anyhow::Result; use apalis::prelude::*; @@ -13,11 +13,11 @@ struct MessagePack; impl Codec> for MessagePack { type Error = Error; fn encode(&self, input: &T) -> Result, Self::Error> { - rmp_serde::to_vec(input).map_err(|e| Error::SourceError(Box::new(e))) + rmp_serde::to_vec(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } fn decode(&self, compact: &Vec) -> Result { - rmp_serde::from_slice(compact).map_err(|e| Error::SourceError(Box::new(e))) + rmp_serde::from_slice(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } } diff --git a/examples/redis/src/main.rs b/examples/redis/src/main.rs index 3a352783..b0c9dd9d 100644 --- a/examples/redis/src/main.rs +++ b/examples/redis/src/main.rs @@ -47,7 +47,7 @@ async fn main() -> Result<()> { produce_jobs(storage.clone()).await?; let worker = WorkerBuilder::new("rango-tango") - .chain(|svc| svc.map_err(Error::Failed)) + .chain(|svc| svc.map_err(|e| Error::Failed(Arc::new(e)))) .layer(RateLimitLayer::new(5, Duration::from_secs(1))) .layer(TimeoutLayer::new(Duration::from_millis(500))) .data(Count::default()) diff --git a/packages/apalis-core/Cargo.toml b/packages/apalis-core/Cargo.toml index 2a809bf9..b04ad9cf 100644 --- a/packages/apalis-core/Cargo.toml +++ b/packages/apalis-core/Cargo.toml @@ -29,10 +29,11 @@ optional = true [features] -default = [] +default = ["test-utils"] docsrs = ["document-features"] sleep = ["futures-timer"] json = ["serde_json"] +test-utils = [] [package.metadata.docs.rs] # defines the configuration attribute `docsrs` diff --git a/packages/apalis-core/src/codec/json.rs b/packages/apalis-core/src/codec/json.rs index 7ed7c7f8..ef85854c 100644 --- a/packages/apalis-core/src/codec/json.rs +++ b/packages/apalis-core/src/codec/json.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{error::Error, Codec}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; @@ -9,32 +11,33 @@ pub struct JsonCodec; impl Codec> for JsonCodec { type Error = Error; fn encode(&self, input: &T) -> Result, Self::Error> { - serde_json::to_vec(input).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::to_vec(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } fn decode(&self, compact: &Vec) -> Result { - serde_json::from_slice(compact).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::from_slice(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } } impl Codec for JsonCodec { type Error = Error; fn encode(&self, input: &T) -> Result { - serde_json::to_string(input).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::to_string(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } fn decode(&self, compact: &String) -> Result { - serde_json::from_str(compact).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::from_str(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } } impl Codec for JsonCodec { type Error = Error; fn encode(&self, input: &T) -> Result { - serde_json::to_value(input).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::to_value(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } fn decode(&self, compact: &Value) -> Result { - serde_json::from_value(compact.clone()).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::from_value(compact.clone()) + .map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } } diff --git a/packages/apalis-core/src/error.rs b/packages/apalis-core/src/error.rs index 0f526999..6c812863 100644 --- a/packages/apalis-core/src/error.rs +++ b/packages/apalis-core/src/error.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use std::{error::Error as StdError, sync::Arc}; use thiserror::Error; use crate::worker::WorkerError; @@ -7,28 +7,24 @@ use crate::worker::WorkerError; pub type BoxDynError = Box; /// Represents a general error returned by a task or by internals of the platform -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum Error { /// An error occurred during execution. #[error("FailedError: {0}")] - Failed(#[source] BoxDynError), + Failed(#[source] Arc), /// A generic IO error #[error("IoError: {0}")] - Io(#[from] std::io::Error), + Io(#[from] Arc), /// Missing some context and yet it was requested during execution. #[error("MissingContextError: {0}")] MissingContext(String), /// Execution was aborted - #[error("AbortError")] - Abort, - - /// Execution failed and job will be retried - #[error("RetryError: {0}")] - Retry(#[source] BoxDynError), + #[error("AbortError: {0}")] + Abort(String), /// Encountered an error during worker execution #[error("WorkerError: {0}")] @@ -38,11 +34,11 @@ pub enum Error { /// Encountered an error during service execution /// This should not be used inside a task function #[error("Encountered an error during service execution")] - ServiceError(#[source] BoxDynError), + ServiceError(#[source] Arc), #[doc(hidden)] /// Encountered an error during service execution /// This should not be used inside a task function #[error("Encountered an error during streaming")] - SourceError(#[source] BoxDynError), + SourceError(#[source] Arc), } diff --git a/packages/apalis-core/src/lib.rs b/packages/apalis-core/src/lib.rs index 1ab6244c..430014ae 100644 --- a/packages/apalis-core/src/lib.rs +++ b/packages/apalis-core/src/lib.rs @@ -168,3 +168,229 @@ impl crate::executor::Executor for TestExecutor { tokio::spawn(future); } } + +#[cfg(feature = "test-utils")] +/// Test utilities that allows you to test backends +pub mod test_utils { + use crate::error::BoxDynError; + use crate::request::Request; + use crate::storage::Storage; + use crate::task::task_id::TaskId; + use crate::worker::WorkerId; + use crate::Backend; + use futures::channel::mpsc::{channel, Receiver, Sender}; + use futures::future::BoxFuture; + use futures::stream::{Stream, StreamExt}; + use futures::{Future, FutureExt, SinkExt}; + use std::fmt::Debug; + use std::marker::PhantomData; + use std::ops::{Deref, DerefMut}; + use std::pin::Pin; + use std::task::{Context, Poll}; + use tower::{Layer, Service}; + + /// Define a dummy service + #[derive(Debug, Clone)] + pub struct DummyService; + + impl Service for DummyService { + type Response = Request; + type Error = std::convert::Infallible; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + let fut = async move { Ok(req) }; + Box::pin(fut) + } + } + + /// A generic backend wrapper that polls and executes jobs + #[derive(Debug)] + pub struct TestWrapper { + stop_tx: Sender<()>, + res_rx: Receiver<(TaskId, Result)>, + _p: PhantomData, + backend: B, + } + /// A test wrapper to allow you to test without requiring a worker. + /// Important for testing backends and jobs + /// # Example + /// ```no_run + /// #[cfg(tests)] + /// mod tests { + /// use crate::{ + /// error::Error, memory::MemoryStorage, mq::MessageQueue, service_fn::service_fn, + /// }; + /// + /// use super::*; + /// + /// async fn is_even(req: usize) -> Result<(), Error> { + /// if req % 2 == 0 { + /// Ok(()) + /// } else { + /// Err(Error::Abort("Not an even number".to_string())) + /// } + /// } + /// + /// #[tokio::test] + /// async fn test_accepts_even() { + /// let backend = MemoryStorage::new(); + /// let (mut tester, poller) = TestWrapper::new_with_service(backend, service_fn(is_even)); + /// tokio::spawn(poller); + /// tester.enqueue(42usize).await.unwrap(); + /// assert_eq!(tester.size().await.unwrap(), 1); + /// let (_, resp) = tester.execute_next().await; + /// assert_eq!(resp, Ok("()".to_string())); + /// } + ///} + /// ```` + impl TestWrapper + where + B: Backend> + Send + Sync + 'static + Clone, + Req: Send + 'static, + B::Stream: Send + 'static, + B::Stream: Stream>, crate::error::Error>> + Unpin, + { + /// Build a new instance provided a custom service + pub fn new_with_service(backend: B, service: S) -> (Self, BoxFuture<'static, ()>) + where + S: Service> + Send + 'static, + B::Layer: Layer, + <>>::Layer as Layer>::Service: Service> + Send + 'static, + <<>>::Layer as Layer>::Service as Service>>::Response: Send + Debug, + <<>>::Layer as Layer>::Service as Service>>::Error: Send + Into + Sync, + <<>>::Layer as Layer>::Service as Service>>::Future: Send + 'static, + { + let worker_id = WorkerId::new("test-worker"); + let b = backend.clone(); + let mut poller = b.poll(worker_id); + let (stop_tx, mut stop_rx) = channel::<()>(1); + + let (mut res_tx, res_rx) = channel(10); + + let mut service = poller.layer.layer(service); + + let poller = async move { + let heartbeat = poller.heartbeat.shared(); + loop { + futures::select! { + + item = poller.stream.next().fuse() => match item { + Some(Ok(Some(req))) => { + + let task_id = req.get::().cloned().unwrap_or_default(); + // .expect("Request does not contain Task_ID"); + // handle request + match service.call(req).await { + Ok(res) => { + res_tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap(); + }, + Err(err) => { + res_tx.send((task_id, Err(err.into().to_string()))).await.unwrap(); + } + } + } + Some(Ok(None)) | None => break, + Some(Err(_e)) => { + // handle error + break; + } + }, + _ = stop_rx.next().fuse() => break, + _ = heartbeat.clone().fuse() => { + + }, + } + } + }; + ( + Self { + stop_tx, + res_rx, + _p: PhantomData, + backend, + }, + poller.boxed(), + ) + } + + /// Stop polling + pub fn stop(mut self) { + let _ = self.stop_tx.send(()); + } + + /// Gets the current state of results + pub async fn execute_next(&mut self) -> (TaskId, Result) { + self.res_rx.next().await.unwrap() + } + } + + impl Deref for TestWrapper + where + B: Backend>, + { + type Target = B; + + fn deref(&self) -> &Self::Target { + &self.backend + } + } + + impl DerefMut for TestWrapper + where + B: Backend>, + { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.backend + } + } + + pub use tower::service_fn as apalis_test_service_fn; + + #[macro_export] + /// Tests a generic mq + macro_rules! test_message_queue { + ($backend_instance:expr) => { + #[tokio::test] + async fn it_works_as_an_mq_backend() { + let backend = $backend_instance; + let service = apalis_test_service_fn(|request: Request| async { + Ok::<_, io::Error>(request) + }); + let (mut t, poller) = TestWrapper::new_with_service(backend, service); + tokio::spawn(poller); + t.enqueue(1).await.unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + let _res = t.execute_next().await; + // assert_eq!(res.len(), 1); // One job is done + } + }; + } + #[macro_export] + /// Tests a generic storage + macro_rules! generic_storage_test { + ($setup:path ) => { + #[tokio::test] + async fn integration_test_storage_push_and_consume() { + let backend = $setup().await; + let service = apalis_test_service_fn(|request: Request| async move { + Ok::<_, io::Error>(request.take()) + }); + let (mut t, poller) = TestWrapper::new_with_service(backend, service); + tokio::spawn(poller); + let res = t.len().await.unwrap(); + assert_eq!(res, 0); // No jobs + t.push(1).await.unwrap(); + let res = t.len().await.unwrap(); + assert_eq!(res, 1); // A job exists + let res = t.execute_next().await; + assert_eq!(res.1, Ok("1".to_owned())); + t.vacuum().await.unwrap(); + } + }; + } +} diff --git a/packages/apalis-core/src/memory.rs b/packages/apalis-core/src/memory.rs index 558b0e30..250190c4 100644 --- a/packages/apalis-core/src/memory.rs +++ b/packages/apalis-core/src/memory.rs @@ -52,8 +52,8 @@ impl Clone for MemoryStorage { /// In-memory queue that implements [Stream] #[derive(Debug)] pub struct MemoryWrapper { - sender: Sender, - receiver: Arc>>, + sender: Sender>, + receiver: Arc>>>, } impl Clone for MemoryWrapper { @@ -84,7 +84,7 @@ impl Default for MemoryWrapper { } impl Stream for MemoryWrapper { - type Item = T; + type Item = Request; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some(mut receiver) = self.receiver.try_lock() { @@ -102,7 +102,7 @@ impl Backend> for MemoryStorage { type Layer = Identity; fn poll(self, _worker: WorkerId) -> Poller { - let stream = self.inner.map(|r| Ok(Some(Request::new(r)))).boxed(); + let stream = self.inner.map(|r| Ok(Some(r))).boxed(); Poller { stream: BackendStream::new(stream, self.controller), heartbeat: Box::pin(async {}), @@ -114,12 +114,15 @@ impl Backend> for MemoryStorage { impl MessageQueue for MemoryStorage { type Error = (); async fn enqueue(&mut self, message: Message) -> Result<(), Self::Error> { - self.inner.sender.try_send(message).unwrap(); + self.inner + .sender + .try_send(Request::new(message)) + .map_err(|_| ())?; Ok(()) } async fn dequeue(&mut self) -> Result, ()> { - Ok(self.inner.receiver.lock().await.next().await) + Ok(self.inner.receiver.lock().await.next().await.map(|r| r.req)) } async fn size(&mut self) -> Result { diff --git a/packages/apalis-core/src/monitor/mod.rs b/packages/apalis-core/src/monitor/mod.rs index aad4dafc..beee0f56 100644 --- a/packages/apalis-core/src/monitor/mod.rs +++ b/packages/apalis-core/src/monitor/mod.rs @@ -297,6 +297,7 @@ impl Monitor { #[cfg(test)] mod tests { + use crate::test_utils::apalis_test_service_fn; use std::{io, time::Duration}; use tokio::time::sleep; @@ -307,11 +308,15 @@ mod tests { monitor::Monitor, mq::MessageQueue, request::Request, + test_message_queue, + test_utils::TestWrapper, TestExecutor, }; + test_message_queue!(MemoryStorage::new()); + #[tokio::test] - async fn it_works() { + async fn it_works_with_workers() { let backend = MemoryStorage::new(); let mut handle = backend.clone(); @@ -342,7 +347,7 @@ mod tests { let mut handle = backend.clone(); tokio::spawn(async move { - for i in 0..1000 { + for i in 0..10 { handle.enqueue(i).await.unwrap(); } }); diff --git a/packages/apalis-core/src/request.rs b/packages/apalis-core/src/request.rs index d6478240..ac381472 100644 --- a/packages/apalis-core/src/request.rs +++ b/packages/apalis-core/src/request.rs @@ -4,7 +4,10 @@ use tower::layer::util::Identity; use std::{fmt::Debug, pin::Pin}; -use crate::{data::Extensions, error::Error, poller::Poller, worker::WorkerId, Backend}; +use crate::{ + data::Extensions, error::Error, poller::Poller, task::task_id::TaskId, worker::WorkerId, + Backend, +}; /// Represents a job which can be serialized and executed @@ -18,10 +21,10 @@ pub struct Request { impl Request { /// Creates a new [Request] pub fn new(req: T) -> Self { - Self { - req, - data: Extensions::new(), - } + let id = TaskId::new(); + let mut data = Extensions::new(); + data.insert(id); + Self::new_with_data(req, data) } /// Creates a request with context provided diff --git a/packages/apalis-core/src/response.rs b/packages/apalis-core/src/response.rs index 7c2a231d..efda8920 100644 --- a/packages/apalis-core/src/response.rs +++ b/packages/apalis-core/src/response.rs @@ -1,4 +1,4 @@ -use std::any::Any; +use std::{any::Any, sync::Arc}; use crate::error::Error; @@ -15,22 +15,30 @@ impl IntoResponse for bool { fn into_response(self) -> std::result::Result { match self { true => Ok(true), - false => Err(Error::Failed(Box::new(std::io::Error::new( + false => Err(Error::Failed(Arc::new(Box::new(std::io::Error::new( std::io::ErrorKind::Other, "Job returned false", - )))), + ))))), } } } -impl IntoResponse +impl IntoResponse for std::result::Result { type Result = Result; fn into_response(self) -> Result { match self { Ok(value) => Ok(value), - Err(e) => Err(Error::Failed(Box::new(e))), + Err(e) => { + // Try to downcast the error to see if it is already of type `Error` + if let Some(custom_error) = + (&e as &(dyn std::error::Error + 'static)).downcast_ref::() + { + return Err(custom_error.clone()); + } + Err(Error::Failed(Arc::new(Box::new(e)))) + } } } } diff --git a/packages/apalis-core/src/task/task_id.rs b/packages/apalis-core/src/task/task_id.rs index 6d6a2504..455e531f 100644 --- a/packages/apalis-core/src/task/task_id.rs +++ b/packages/apalis-core/src/task/task_id.rs @@ -7,7 +7,7 @@ use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; use ulid::Ulid; /// A wrapper type that defines a task id. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, Hash, PartialEq)] pub struct TaskId(Ulid); impl TaskId { diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index b9c071f0..6d911ecf 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -135,7 +135,7 @@ pub enum Event { } /// Possible errors that can occur when starting a worker. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum WorkerError { /// An error occurred while processing a job. #[error("Failed to process job: {0}")] diff --git a/packages/apalis-cron/src/lib.rs b/packages/apalis-cron/src/lib.rs index 024fa9f4..babcf245 100644 --- a/packages/apalis-cron/src/lib.rs +++ b/packages/apalis-cron/src/lib.rs @@ -68,6 +68,7 @@ use apalis_core::{error::Error, request::Request}; use chrono::{DateTime, TimeZone, Utc}; pub use cron::Schedule; use std::marker::PhantomData; +use std::sync::Arc; /// Represents a stream from a cron schedule with a timezone #[derive(Clone, Debug)] @@ -117,7 +118,7 @@ where match next { Some(next) => { let to_sleep = next - timezone.from_utc_datetime(&Utc::now().naive_utc()); - let to_sleep = to_sleep.to_std().map_err(|e| Error::Failed(e.into()))?; + let to_sleep = to_sleep.to_std().map_err(|e| Error::SourceError(Arc::new(e.into())))?; apalis_core::sleep(to_sleep).await; let mut data = Extensions::new(); data.insert(TaskId::new()); diff --git a/packages/apalis-redis/Cargo.toml b/packages/apalis-redis/Cargo.toml index 023ec645..bd0742ed 100644 --- a/packages/apalis-redis/Cargo.toml +++ b/packages/apalis-redis/Cargo.toml @@ -39,6 +39,7 @@ email-service = { path = "../../examples/email-service" } apalis = { path = "../../", default-features = false, features = [ "tokio-comp", ] } +apalis-core = { path = "../apalis-core", features = ["test-utils"] } [features] default = ["tokio-comp"] diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index 0dcacff9..faad9a48 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -559,6 +559,7 @@ impl Ack Ok(()) } + // TODO: Just automatically retry e if e.starts_with("RetryError") => { let retry_job = self.scripts.retry_job.clone(); let retry_jobs_set = &self.config.scheduled_jobs_set(); @@ -983,25 +984,32 @@ impl RedisStorage { #[cfg(test)] mod tests { + use apalis_core::generic_storage_test; use email_service::Email; + use apalis_core::test_utils::apalis_test_service_fn; + use apalis_core::test_utils::TestWrapper; + + generic_storage_test!(setup); + use super::*; /// migrate DB and return a storage instance. - async fn setup() -> RedisStorage { + async fn setup() -> RedisStorage { let redis_url = std::env::var("REDIS_URL").expect("No REDIS_URL is specified"); // Because connections cannot be shared across async runtime // (different runtimes are created for each test), // we don't share the storage and tests must be run sequentially. let conn = connect(redis_url).await.unwrap(); - let storage = RedisStorage::new(conn); + let mut storage = RedisStorage::new(conn); + cleanup(&mut storage, &WorkerId::new("test-worker")).await; storage } /// rollback DB changes made by tests. /// /// You should execute this function in the end of a test - async fn cleanup(mut storage: RedisStorage, _worker_id: &WorkerId) { + async fn cleanup(storage: &mut RedisStorage, _worker_id: &WorkerId) { let _resp: String = redis::cmd("FLUSHDB") .query_async(&mut storage.conn) .await @@ -1063,8 +1071,6 @@ mod tests { let worker_id = register_worker(&mut storage).await; let _job = consume_one(&mut storage, &worker_id).await; - - cleanup(storage, &worker_id).await; } #[tokio::test] @@ -1076,19 +1082,19 @@ mod tests { let job = consume_one(&mut storage, &worker_id).await; let job_id = &job.get::().unwrap().id; + let attempts = job.get::().unwrap().clone(); storage .ack(AckResponse { acknowledger: job_id.clone(), result: Ok("Success".to_string()), worker: worker_id.clone(), - attempts: Attempt::new_with_value(0) + attempts, }) .await .expect("failed to acknowledge the job"); let _job = get_job(&mut storage, &job_id).await; - cleanup(storage, &worker_id).await; } #[tokio::test] @@ -1108,8 +1114,6 @@ mod tests { .expect("failed to kill job"); let _job = get_job(&mut storage, &job_id).await; - - cleanup(storage, &worker_id).await; } #[tokio::test] @@ -1125,7 +1129,6 @@ mod tests { .reenqueue_orphaned(5, 300) .await .expect("failed to reenqueue_orphaned"); - cleanup(storage, &worker_id).await; } #[tokio::test] @@ -1141,7 +1144,5 @@ mod tests { .reenqueue_orphaned(5, 300) .await .expect("failed to reenqueue_orphaned"); - - cleanup(storage, &worker_id).await; } } diff --git a/packages/apalis-sql/Cargo.toml b/packages/apalis-sql/Cargo.toml index 8187dd16..b6c0431e 100644 --- a/packages/apalis-sql/Cargo.toml +++ b/packages/apalis-sql/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT" description = "SQL Storage for apalis. Use sqlite, postgres and mysql for background job processing" [features] -default = ["sqlite", "migrate", "postgres"] +default = ["migrate"] postgres = ["sqlx/postgres", "sqlx/json"] sqlite = ["sqlx/sqlite", "sqlx/json"] mysql = ["sqlx/mysql", "sqlx/json", "sqlx/bigdecimal"] @@ -45,6 +45,8 @@ apalis = { path = "../../", default-features = false, features = [ "tokio-comp", ] } once_cell = "1.19.0" +apalis-sql = { path = ".", features = ["tokio-comp"] } +apalis-core = { path = "../apalis-core", features = ["test-utils"] } [package.metadata.docs.rs] # defines the configuration attribute `docsrs` diff --git a/packages/apalis-sql/src/context.rs b/packages/apalis-sql/src/context.rs index 7d5f3856..5aed23ef 100644 --- a/packages/apalis-sql/src/context.rs +++ b/packages/apalis-sql/src/context.rs @@ -1,8 +1,8 @@ use apalis_core::error::Error; use apalis_core::task::{attempt::Attempt, task_id::TaskId}; use apalis_core::worker::WorkerId; -use serde::{Deserialize, Serialize}; use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; use std::{fmt, str::FromStr}; /// The context for a job is represented here @@ -137,8 +137,6 @@ pub enum State { Running, /// Job was done successfully Done, - /// Retry Job - Retry, /// Job has failed. Check `last_error` Failed, /// Job has been killed @@ -159,7 +157,6 @@ impl FromStr for State { "Pending" | "Latest" => Ok(State::Pending), "Running" => Ok(State::Running), "Done" => Ok(State::Done), - "Retry" => Ok(State::Retry), "Failed" => Ok(State::Failed), "Killed" => Ok(State::Killed), _ => Err(Error::MissingContext("Invalid Job state".to_string())), @@ -173,7 +170,6 @@ impl fmt::Display for State { State::Pending => write!(f, "Pending"), State::Running => write!(f, "Running"), State::Done => write!(f, "Done"), - State::Retry => write!(f, "Retry"), State::Failed => write!(f, "Failed"), State::Killed => write!(f, "Killed"), } diff --git a/packages/apalis-sql/src/from_row.rs b/packages/apalis-sql/src/from_row.rs index 7151e1ef..89d6628b 100644 --- a/packages/apalis-sql/src/from_row.rs +++ b/packages/apalis-sql/src/from_row.rs @@ -1,5 +1,6 @@ use apalis_core::task::task_id::TaskId; use apalis_core::{data::Extensions, request::Request, worker::WorkerId}; +use chrono::Utc; use serde::{Deserialize, Serialize}; use sqlx::{Decode, Type}; @@ -149,11 +150,11 @@ impl<'r, T: Decode<'r, sqlx::Postgres> + Type> let max_attempts = row.try_get("max_attempts").unwrap_or(25); context.set_max_attempts(max_attempts); - let done_at: Option = row.try_get("done_at").unwrap_or_default(); - context.set_done_at(done_at); + let done_at: Option> = row.try_get("done_at").unwrap_or_default(); + context.set_done_at(done_at.map(|d| d.timestamp())); - let lock_at: Option = row.try_get("lock_at").unwrap_or_default(); - context.set_lock_at(lock_at); + let lock_at: Option> = row.try_get("lock_at").unwrap_or_default(); + context.set_lock_at(lock_at.map(|d| d.timestamp())); let last_error = row.try_get("last_error").unwrap_or_default(); context.set_last_error(last_error); @@ -187,9 +188,6 @@ impl<'r, T: Decode<'r, sqlx::MySql> + Type> sqlx::FromRow<'r, sqlx: fn from_row(row: &'r sqlx::mysql::MySqlRow) -> Result { use sqlx::Row; use std::str::FromStr; - - type Timestamp = i64; - let job: T = row.try_get("job")?; let id: TaskId = TaskId::from_str(row.try_get("id")?).map_err(|e| sqlx::Error::ColumnDecode { @@ -207,11 +205,11 @@ impl<'r, T: Decode<'r, sqlx::MySql> + Type> sqlx::FromRow<'r, sqlx: let max_attempts = row.try_get("max_attempts").unwrap_or(25); context.set_max_attempts(max_attempts); - let done_at: Option = row.try_get("done_at").unwrap_or_default(); - context.set_done_at(done_at); + let done_at: Option = row.try_get("done_at").unwrap_or_default(); + context.set_done_at(done_at.map(|d| d.and_utc().timestamp())); - let lock_at: Option = row.try_get("lock_at").unwrap_or_default(); - context.set_lock_at(lock_at); + let lock_at: Option = row.try_get("lock_at").unwrap_or_default(); + context.set_lock_at(lock_at.map(|d| d.and_utc().timestamp())); let last_error = row.try_get("last_error").unwrap_or_default(); context.set_last_error(last_error); diff --git a/packages/apalis-sql/src/lib.rs b/packages/apalis-sql/src/lib.rs index b5d21133..82d97197 100644 --- a/packages/apalis-sql/src/lib.rs +++ b/packages/apalis-sql/src/lib.rs @@ -49,7 +49,7 @@ impl Default for Config { Self { keep_alive: Duration::from_secs(30), buffer_size: 10, - poll_interval: Duration::from_millis(50), + poll_interval: Duration::from_millis(100), namespace: String::from("apalis::sql"), } } @@ -63,7 +63,7 @@ impl Config { /// Interval between database poll queries /// - /// Defaults to 30ms + /// Defaults to 100ms pub fn set_poll_interval(mut self, interval: Duration) -> Self { self.poll_interval = interval; self @@ -134,9 +134,91 @@ pub(crate) fn calculate_status(res: &Result) -> State { match res { Ok(_) => State::Done, Err(e) => match &e { - _ if e.starts_with("RetryError") => State::Retry, _ if e.starts_with("AbortError") => State::Killed, _ => State::Failed, }, } } + +/// Standard checks for any sql backend +#[macro_export] +macro_rules! sql_storage_tests { + ($setup:path, $storage_type:ty, $job_type:ty) => { + async fn setup_test_wrapper() -> TestWrapper<$storage_type, $job_type> { + let (mut t, poller) = TestWrapper::new_with_service( + $setup().await, + apalis_core::service_fn::service_fn(email_service::send_email), + ); + tokio::spawn(poller); + t.vacuum().await.unwrap(); + t + } + + #[tokio::test] + async fn integration_test_kill_job() { + let mut storage = setup_test_wrapper().await; + + storage + .push(email_service::example_killed_email()) + .await + .unwrap(); + + let (job_id, res) = storage.execute_next().await; + assert_eq!( + res, + Err("AbortError: Invalid character. Job killed".to_owned()) + ); + apalis_core::sleep(Duration::from_secs(1)).await; + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Killed); + assert!(ctx.done_at().is_some()); + assert_eq!( + ctx.last_error().clone().unwrap(), + "{\"Err\":\"AbortError: Invalid character. Job killed\"}" + ); + } + + #[tokio::test] + async fn integration_test_acknowledge_good_job() { + let mut storage = setup_test_wrapper().await; + storage + .push(email_service::example_good_email()) + .await + .unwrap(); + + let (job_id, res) = storage.execute_next().await; + assert_eq!(res, Ok("()".to_owned())); + apalis_core::sleep(Duration::from_secs(1)).await; + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Done); + assert!(ctx.done_at().is_some()); + } + + #[tokio::test] + async fn integration_test_acknowledge_failed_job() { + let mut storage = setup_test_wrapper().await; + + storage + .push(email_service::example_retry_able_email()) + .await + .unwrap(); + + let (job_id, res) = storage.execute_next().await; + assert_eq!( + res, + Err("FailedError: Missing separator character '@'.".to_owned()) + ); + apalis_core::sleep(Duration::from_secs(1)).await; + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Failed); + assert!(ctx.attempts().current() >= 1); + assert_eq!( + ctx.last_error().clone().unwrap(), + "{\"Err\":\"FailedError: Missing separator character '@'.\"}" + ); + } + }; +} diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index afedb48c..0e39c1c5 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -12,12 +12,12 @@ use apalis_core::task::task_id::TaskId; use apalis_core::worker::WorkerId; use apalis_core::{Backend, BoxCodec}; use async_stream::try_stream; +use chrono::{DateTime, Utc}; use futures::{Stream, StreamExt, TryStreamExt}; use log::error; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use sqlx::mysql::MySqlRow; -use chrono::{DateTime, Utc}; use sqlx::{MySql, Pool, Row}; use std::any::type_name; use std::convert::TryInto; @@ -385,7 +385,7 @@ impl Backend Backend MysqlStorage { mod tests { use crate::context::State; + use crate::sql_storage_tests; use super::*; use apalis_core::task::attempt::Attempt; + + use apalis_core::test_utils::DummyService; use email_service::Email; use futures::StreamExt; + use apalis_core::generic_storage_test; + use apalis_core::test_utils::apalis_test_service_fn; + use apalis_core::test_utils::TestWrapper; + + generic_storage_test!(setup); + + sql_storage_tests!(setup::, MysqlStorage, Email); + /// migrate DB and return a storage instance. - async fn setup() -> MysqlStorage { + async fn setup() -> MysqlStorage { let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); // Because connections cannot be shared across async runtime // (different runtimes are created for each test), @@ -527,8 +538,8 @@ mod tests { MysqlStorage::setup(&pool) .await .expect("failed to migrate DB"); - let storage = MysqlStorage::new(pool); - + let mut storage = MysqlStorage::new(pool); + cleanup(&mut storage, &WorkerId::new("test-worker")).await; storage } @@ -538,9 +549,9 @@ mod tests { /// - worker identified by `worker_id` /// /// You should execute this function in the end of a test - async fn cleanup(storage: MysqlStorage, worker_id: &WorkerId) { - sqlx::query("DELETE FROM jobs WHERE lock_by = ? OR status = 'Pending'") - .bind(worker_id.to_string()) + async fn cleanup(storage: &mut MysqlStorage, worker_id: &WorkerId) { + sqlx::query("DELETE FROM jobs WHERE job_type = ?") + .bind(storage.config.namespace()) .execute(&storage.pool) .await .expect("failed to delete jobs"); @@ -551,9 +562,11 @@ mod tests { .expect("failed to delete worker"); } - async fn consume_one(storage: &MysqlStorage, worker_id: &WorkerId) -> Request { - let storage = storage.clone(); - let mut stream = storage.stream_jobs( + async fn consume_one( + storage: &mut MysqlStorage, + worker_id: &WorkerId, + ) -> Request { + let mut stream = storage.clone().stream_jobs( worker_id, std::time::Duration::from_secs(10), 1, @@ -575,8 +588,6 @@ mod tests { } } - struct DummyService {} - async fn register_worker_at( storage: &mut MysqlStorage, last_seen: DateTime, @@ -596,17 +607,13 @@ mod tests { register_worker_at(storage, now).await } - async fn push_email(storage: &mut S, email: Email) - where - S: Storage, - { + async fn push_email(storage: &mut MysqlStorage, email: Email) { storage.push(email).await.expect("failed to push a job"); } - async fn get_job(storage: &mut S, job_id: &TaskId) -> Request - where - S: Storage, - { + async fn get_job(storage: &mut MysqlStorage, job_id: &TaskId) -> Request { + // add a slight delay to allow background actions like ack to complete + apalis_core::sleep(Duration::from_secs(1)).await; storage .fetch_by_id(job_id) .await @@ -624,42 +631,9 @@ mod tests { let job = consume_one(&mut storage, &worker_id).await; let ctx = job.get::().unwrap(); // TODO: Fix assertions - // assert_eq!(*ctx.status(), State::Running); - // assert_eq!(*ctx.lock_by(), Some(worker_id.clone())); - // assert!(ctx.lock_at().is_some()); - - cleanup(storage, &worker_id).await; - } - - #[tokio::test] - async fn test_acknowledge_job() { - let mut storage = setup().await; - push_email(&mut storage, example_email()).await; - - let worker_id = register_worker(&mut storage).await; - - let job = consume_one(&mut storage, &worker_id).await; - let ctx = job.get::().unwrap(); - let job_id = ctx.id(); - - storage - .ack(AckResponse { - acknowledger: job_id.clone(), - result: Ok("Success".to_string()), - worker: worker_id.clone(), - attempts: Attempt::new_with_value(0) - }) - .await - .expect("failed to acknowledge the job"); - - let job = get_job(&mut storage, job_id).await; - let ctx = job.get::().unwrap(); - - // TODO: Fix assertions - // assert_eq!(*ctx.status(), State::Done); - // assert!(ctx.done_at().is_some()); - - cleanup(storage, &worker_id).await; + assert_eq!(*ctx.status(), State::Running); + assert_eq!(*ctx.lock_by(), Some(worker_id.clone())); + assert!(ctx.lock_at().is_some()); } #[tokio::test] @@ -683,10 +657,8 @@ mod tests { let job = get_job(&mut storage, job_id).await; let ctx = job.get::().unwrap(); // TODO: Fix assertions - // assert_eq!(*ctx.status(), State::Killed); - // assert!(ctx.done_at().is_some()); - - cleanup(storage, &worker_id).await; + assert_eq!(*ctx.status(), State::Killed); + assert!(ctx.done_at().is_some()); } #[tokio::test] @@ -720,14 +692,11 @@ mod tests { // then, the job status has changed to Pending let job = storage.fetch_by_id(ctx.id()).await.unwrap().unwrap(); let context = job.get::().unwrap(); - // TODO: Fix assertions - // assert_eq!(*context.status(), State::Pending); - // assert!(context.lock_by().is_none()); - // assert!(context.lock_at().is_none()); - // assert!(context.done_at().is_none()); - // assert_eq!(*context.last_error(), Some("Job was abandoned".to_string())); - - cleanup(storage, &worker_id).await; + assert_eq!(*context.status(), State::Pending); + assert!(context.lock_by().is_none()); + assert!(context.lock_at().is_none()); + assert!(context.done_at().is_none()); + assert_eq!(*context.last_error(), Some("Job was abandoned".to_string())); } #[tokio::test] @@ -762,9 +731,7 @@ mod tests { let job = storage.fetch_by_id(ctx.id()).await.unwrap().unwrap(); let context = job.get::().unwrap(); // TODO: Fix assertions - // assert_eq!(*context.status(), State::Running); - // assert_eq!(*context.lock_by(), Some(worker_id.clone())); - - cleanup(storage, &worker_id).await; + assert_eq!(*context.status(), State::Running); + assert_eq!(*context.lock_by(), Some(worker_id.clone())); } } diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index 356e87ce..e01941a4 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -53,13 +53,13 @@ use apalis_core::task::namespace::Namespace; use apalis_core::task::task_id::TaskId; use apalis_core::worker::WorkerId; use apalis_core::{Backend, BoxCodec}; +use chrono::{DateTime, Utc}; use futures::channel::mpsc; use futures::StreamExt; use futures::{select, stream, SinkExt}; use log::error; use serde::{de::DeserializeOwned, Serialize}; use sqlx::postgres::PgListener; -use chrono::{DateTime, Utc}; use sqlx::{Pool, Postgres, Row}; use std::any::type_name; use std::convert::TryInto; @@ -149,11 +149,11 @@ impl Backend Backend { if let Some(ids) = ids { let ack_ids: Vec<(String, String, String, String, u64)> = ids.iter().map(|c| { - (c.acknowledger.to_string(), c.worker.to_string(), serde_json::to_string(&c.result).unwrap(), calculate_status(&c.result).to_string(), c.attempts.current() as u64) + (c.acknowledger.to_string(), c.worker.to_string(), serde_json::to_string(&c.result).unwrap(), calculate_status(&c.result).to_string(), (c.attempts.current() + 1) as u64 ) }).collect(); let query = - "UPDATE apalis.jobs SET status = Q.status, done_at = now(), lock_by = Q.lock_by, last_error = Q.result, attempts = Q.attempts FROM ( - SELECT(value-->0)::text as id, (value->>1)::text as worker_id, (value->>2)::text as result, (value->>3)::text as status, (value->>4)::int as attempts FROM json_array_elements($1) - ) Q - WHERE id = Q.id"; + "UPDATE apalis.jobs + SET status = Q.status, + done_at = now(), + lock_by = Q.worker_id, + last_error = Q.result, + attempts = Q.attempts + FROM ( + SELECT (value->>0)::text as id, + (value->>1)::text as worker_id, + (value->>2)::text as result, + (value->>3)::text as status, + (value->>4)::int as attempts + FROM json_array_elements($1::json) + ) Q + WHERE apalis.jobs.id = Q.id; + "; if let Err(e) = sqlx::query(query) - .bind(serde_json::to_string(&ack_ids).unwrap()) + .bind(serde_json::to_value(&ack_ids).unwrap()) .execute(&pool) .await { - error!("AckError: {e}"); + panic!("AckError: {e}"); } } } @@ -207,13 +219,7 @@ impl Backend PostgresStorage { #[cfg(test)] mod tests { use crate::context::State; + use crate::sql_storage_tests; use super::*; use apalis_core::task::attempt::Attempt; - use email_service::Email; + use apalis_core::test_utils::DummyService; use chrono::Utc; + use email_service::Email; + + use apalis_core::generic_storage_test; + use apalis_core::test_utils::apalis_test_service_fn; + use apalis_core::test_utils::TestWrapper; + + generic_storage_test!(setup); + + sql_storage_tests!(setup::, PostgresStorage, Email); /// migrate DB and return a storage instance. - async fn setup() -> PostgresStorage { + async fn setup() -> PostgresStorage { let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); let pool = PgPool::connect(&db_url).await.unwrap(); // Because connections cannot be shared across async runtime // (different runtimes are created for each test), // we don't share the storage and tests must be run sequentially. PostgresStorage::setup(&pool).await.unwrap(); - let storage = PostgresStorage::new(pool); + let mut storage = PostgresStorage::new(pool); + cleanup(&mut storage, &WorkerId::new("test-worker")).await; storage } /// rollback DB changes made by tests. /// Delete the following rows: - /// - jobs whose state is `Pending` or locked by `worker_id` + /// - jobs of the current type /// - worker identified by `worker_id` /// /// You should execute this function in the end of a test - async fn cleanup(storage: PostgresStorage, worker_id: &WorkerId) { + async fn cleanup(storage: &mut PostgresStorage, worker_id: &WorkerId) { let mut tx = storage .pool .acquire() .await .expect("failed to get connection"); - sqlx::query("Delete from apalis.jobs where lock_by = $1 or status = 'Pending'") + sqlx::query("Delete from apalis.jobs where job_type = $1 OR lock_by = $2") + .bind(storage.config.namespace()) .bind(worker_id.to_string()) .execute(&mut *tx) .await @@ -651,8 +669,6 @@ mod tests { .expect("failed to delete worker"); } - struct DummyService {} - fn example_email() -> Email { Email { subject: "Test Subject".to_string(), @@ -691,6 +707,8 @@ mod tests { } async fn get_job(storage: &mut PostgresStorage, job_id: &TaskId) -> Request { + // add a slight delay to allow background actions like ack to complete + apalis_core::sleep(Duration::from_secs(2)).await; storage .fetch_by_id(job_id) .await @@ -705,44 +723,15 @@ mod tests { let worker_id = register_worker(&mut storage).await; - let job = consume_one(&mut storage, &worker_id).await; - let ctx = job.get::().unwrap(); - assert_eq!(*ctx.status(), State::Running); - assert_eq!(*ctx.lock_by(), Some(worker_id.clone())); - // TODO: assert!(ctx.lock_at().is_some()); - - cleanup(storage, &worker_id).await; - } - - #[tokio::test] - async fn test_acknowledge_job() { - let mut storage = setup().await; - push_email(&mut storage, example_email()).await; - - let worker_id = register_worker(&mut storage).await; - let job = consume_one(&mut storage, &worker_id).await; let ctx = job.get::().unwrap(); let job_id = ctx.id(); - - storage - .ack(AckResponse { - acknowledger: job_id.clone(), - result: Ok("Success".to_string()), - worker: worker_id.clone(), - attempts: Attempt::new_with_value(0) - - }) - .await - .expect("failed to acknowledge the job"); - + // Refresh our job let job = get_job(&mut storage, job_id).await; let ctx = job.get::().unwrap(); - // TODO: Currently ack is done in the background - // assert_eq!(*ctx.status(), State::Done); - // assert!(ctx.done_at().is_some()); - - cleanup(storage, &worker_id).await; + assert_eq!(*ctx.status(), State::Running); + assert_eq!(*ctx.lock_by(), Some(worker_id.clone())); + assert!(ctx.lock_at().is_some()); } #[tokio::test] @@ -765,9 +754,7 @@ mod tests { let job = get_job(&mut storage, job_id).await; let ctx = job.get::().unwrap(); assert_eq!(*ctx.status(), State::Killed); - // TODO: assert!(ctx.done_at().is_some()); - - cleanup(storage, &worker_id).await; + assert!(ctx.done_at().is_some()); } #[tokio::test] @@ -793,8 +780,6 @@ mod tests { assert!(ctx.lock_by().is_none()); assert!(ctx.lock_at().is_none()); assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_string())); - - cleanup(storage, &worker_id).await; } #[tokio::test] @@ -822,7 +807,5 @@ mod tests { assert_eq!(*ctx.status(), State::Running); assert_eq!(*ctx.lock_by(), Some(worker_id.clone())); - - cleanup(storage, &worker_id).await; } } diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index fba94bf0..5991e61c 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -14,9 +14,9 @@ use apalis_core::task::task_id::TaskId; use apalis_core::worker::WorkerId; use apalis_core::{Backend, BoxCodec}; use async_stream::try_stream; +use chrono::Utc; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use serde::{de::DeserializeOwned, Serialize}; -use chrono::Utc; use sqlx::{Pool, Row, Sqlite}; use std::any::type_name; use std::convert::TryInto; @@ -188,7 +188,6 @@ impl SqliteStorage { let config = self.config.clone(); try_stream! { loop { - apalis_core::sleep(interval).await; let tx = pool.clone(); let mut tx = tx.acquire().await?; let job_type = &config.namespace; @@ -217,6 +216,7 @@ impl SqliteStorage { } } }; + apalis_core::sleep(interval).await; } } } @@ -462,7 +462,7 @@ impl Backend Ack for SqliteStorage { .bind(res.worker.to_string()) .bind(result) .bind(calculate_status(&res.result).to_string()) - .bind(res.attempts.current() as i64) + .bind(res.attempts.current() as i64 + 1) .execute(&pool) .await?; Ok(()) @@ -503,15 +503,25 @@ impl Ack for SqliteStorage { mod tests { use crate::context::State; + use crate::sql_storage_tests; use super::*; use apalis_core::task::attempt::Attempt; + use apalis_core::test_utils::DummyService; + use chrono::Utc; + use email_service::example_good_email; use email_service::Email; use futures::StreamExt; - use chrono::Utc; + + use apalis_core::generic_storage_test; + use apalis_core::test_utils::apalis_test_service_fn; + use apalis_core::test_utils::TestWrapper; + + generic_storage_test!(setup); + sql_storage_tests!(setup::, SqliteStorage, Email); /// migrate DB and return a storage instance. - async fn setup() -> SqliteStorage { + async fn setup() -> SqliteStorage { // Because connections cannot be shared across async runtime // (different runtimes are created for each test), // we don't share the storage and tests must be run sequentially. @@ -519,7 +529,7 @@ mod tests { SqliteStorage::setup(&pool) .await .expect("failed to migrate DB"); - let storage = SqliteStorage::::new(pool); + let storage = SqliteStorage::::new(pool); storage } @@ -539,16 +549,6 @@ mod tests { assert_eq!(len, 1); } - struct DummyService {} - - fn example_email() -> Email { - Email { - subject: "Test Subject".to_string(), - to: "example@postgres".to_string(), - text: "Some Text".to_string(), - } - } - async fn consume_one( storage: &mut SqliteStorage, worker_id: &WorkerId, @@ -593,10 +593,12 @@ mod tests { #[tokio::test] async fn test_consume_last_pushed_job() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + push_email(&mut storage, example_good_email()).await; + let len = storage.len().await.expect("Could not fetch the jobs count"); + assert_eq!(len, 1); + let job = consume_one(&mut storage, &worker_id).await; let ctx = job.get::().unwrap(); assert_eq!(*ctx.status(), State::Running); @@ -607,21 +609,20 @@ mod tests { #[tokio::test] async fn test_acknowledge_job() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + push_email(&mut storage, example_good_email()).await; let job = consume_one(&mut storage, &worker_id).await; - let ctx = job.get::().unwrap(); - let job_id = ctx.id(); + let ctx = job.get::(); + assert!(ctx.is_some()); + let job_id = ctx.unwrap().id(); storage .ack(AckResponse { acknowledger: job_id.clone(), result: Ok("Success".to_string()), worker: worker_id.clone(), - attempts: Attempt::new_with_value(0) - + attempts: Attempt::new_with_value(1), }) .await .expect("failed to acknowledge the job"); @@ -636,7 +637,7 @@ mod tests { async fn test_kill_job() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; + push_email(&mut storage, example_good_email()).await; let worker_id = register_worker(&mut storage).await; @@ -659,7 +660,7 @@ mod tests { async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; + push_email(&mut storage, example_good_email()).await; let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); @@ -675,19 +676,18 @@ mod tests { let job_id = ctx.id(); let job = get_job(&mut storage, job_id).await; let ctx = job.get::().unwrap(); - // TODO: rework these assertions - // assert_eq!(*ctx.status(), State::Pending); - // assert!(ctx.done_at().is_none()); - // assert!(ctx.lock_by().is_none()); - // assert!(ctx.lock_at().is_none()); - // assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_string())); + assert_eq!(*ctx.status(), State::Running); + assert!(ctx.done_at().is_none()); + assert!(ctx.lock_by().is_some()); + assert!(ctx.lock_at().is_some()); + assert_eq!(*ctx.last_error(), Some("".to_string())); //TODO: Fix this } #[tokio::test] async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; + push_email(&mut storage, example_good_email()).await; let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60); let worker_id = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; diff --git a/src/layers/catch_panic/mod.rs b/src/layers/catch_panic/mod.rs index c9c17917..6fe95879 100644 --- a/src/layers/catch_panic/mod.rs +++ b/src/layers/catch_panic/mod.rs @@ -2,6 +2,7 @@ use std::fmt; use std::future::Future; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use apalis_core::error::Error; @@ -100,10 +101,10 @@ where } else { "Unknown panic".to_string() }; - Poll::Ready(Err(Error::Failed(Box::new(PanicError( + Poll::Ready(Err(Error::Failed(Arc::new(Box::new(PanicError( panic_info, Backtrace::new(), - ))))) + )))))) } } }