From d51829d35e7ed27fe39ae35ad75ad5bcf22cc592 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Tue, 16 Jul 2024 10:11:13 +0300 Subject: [PATCH 1/3] fix: handle attempts in storages --- packages/apalis-core/src/layers.rs | 19 ++++++++----------- packages/apalis-sql/src/mysql.rs | 3 ++- packages/apalis-sql/src/postgres.rs | 8 ++++---- packages/apalis-sql/src/sqlite.rs | 3 ++- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/packages/apalis-core/src/layers.rs b/packages/apalis-core/src/layers.rs index 29984a2d..82f9ca04 100644 --- a/packages/apalis-core/src/layers.rs +++ b/packages/apalis-core/src/layers.rs @@ -1,7 +1,9 @@ +use crate::task::attempt::Attempt; use crate::{request::Request, worker::WorkerId}; use futures::channel::mpsc::{SendError, Sender}; use futures::SinkExt; use futures::{future::BoxFuture, Future, FutureExt}; +use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use std::{fmt, sync::Arc}; pub use tower::{ @@ -168,7 +170,7 @@ pub trait Ack { } /// ACK response -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct AckResponse { /// The worker id pub worker: WorkerId, @@ -176,16 +178,8 @@ pub struct AckResponse { pub acknowledger: A, /// The stringified result pub result: Result, -} - -impl AckResponse { - /// Output a json for the response - pub fn to_json(&self) -> String { - format!( - r#"{{"worker": "{}", "acknowledger": "{}", "result": "{:?}"}}"#, - self.worker, self.acknowledger, self.result - ) - } + /// The number of attempts made by the request + pub attempts: Attempt, } /// A generic stream that emits (worker_id, task_id) @@ -286,6 +280,8 @@ where let mut ack = self.ack.clone(); let worker_id = self.worker_id.clone(); let data = request.get::<>::Acknowledger>().cloned(); + let attempts = request.get::().cloned().unwrap_or_default(); + let fut = self.service.call(request); let fut_with_ack = async move { let res = fut.await; @@ -299,6 +295,7 @@ where worker: worker_id, acknowledger: task_id, result, + attempts, }) .await { diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index 1dcbdb28..f103d185 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -396,11 +396,12 @@ impl Backend Backend { if let Some(ids) = ids { - let ack_ids: Vec<(String, String, String, String)> = ids.iter().map(|c| { - (c.acknowledger.to_string(), c.worker.to_string(), serde_json::to_string(&c.result).unwrap(), calculate_status(&c.result).to_string()) + let ack_ids: Vec<(String, String, String, String, u64)> = ids.iter().map(|c| { + (c.acknowledger.to_string(), c.worker.to_string(), serde_json::to_string(&c.result).unwrap(), calculate_status(&c.result).to_string(), c.attempts.current() as u64) }).collect(); let query = - "UPDATE apalis.jobs SET status = Q.status, done_at = now(), lock_by = Q.lock_by, last_error = Q.result FROM ( - SELECT(value-->0)::text as id, (value->>1)::text as worker_id, (value->>2)::text as result, (value->>3)::text as status FROM json_array_elements($1) + "UPDATE apalis.jobs SET status = Q.status, done_at = now(), lock_by = Q.lock_by, last_error = Q.result, attempts = Q.attempts FROM ( + SELECT(value-->0)::text as id, (value->>1)::text as worker_id, (value->>2)::text as result, (value->>3)::text as status, (value->>4)::int as attempts FROM json_array_elements($1) ) Q WHERE id = Q.id"; if let Err(e) = sqlx::query(query) diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index c6b7d4cd..4d640b75 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -484,7 +484,7 @@ impl Ack for SqliteStorage { async fn ack(&mut self, res: AckResponse) -> Result<(), sqlx::Error> { let pool = self.pool.clone(); let query = - "UPDATE Jobs SET status = ?4, done_at = strftime('%s','now'), last_error = ?3 WHERE id = ?1 AND lock_by = ?2"; + "UPDATE Jobs SET status = ?4, done_at = strftime('%s','now'), last_error = ?3, attempts =?5 WHERE id = ?1 AND lock_by = ?2"; let result = serde_json::to_string(&res.result) .map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?; sqlx::query(query) @@ -492,6 +492,7 @@ impl Ack for SqliteStorage { .bind(res.worker.to_string()) .bind(result) .bind(calculate_status(&res.result).to_string()) + .bind(res.attempts.current() as i64) .execute(&pool) .await?; Ok(()) From e0c6e2549d70ff2094dba1976b6b8a210267b3ac Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Tue, 16 Jul 2024 10:50:16 +0300 Subject: [PATCH 2/3] fix: chrono serialization --- packages/apalis-sql/Cargo.toml | 2 ++ packages/apalis-sql/src/context.rs | 2 +- packages/apalis-sql/src/from_row.rs | 2 +- packages/apalis-sql/src/mysql.rs | 2 +- packages/apalis-sql/src/postgres.rs | 4 ++-- packages/apalis-sql/src/sqlite.rs | 4 ++-- 6 files changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/apalis-sql/Cargo.toml b/packages/apalis-sql/Cargo.toml index 8e984f62..8187dd16 100644 --- a/packages/apalis-sql/Cargo.toml +++ b/packages/apalis-sql/Cargo.toml @@ -35,6 +35,8 @@ async-stream = "0.3.5" tokio = { version = "1", features = ["rt", "net"], optional = true } futures-lite = "2.3.0" async-std = { version = "1.12.0", optional = true } +chrono = { version = "0.4", features = ["serde"] } + [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/packages/apalis-sql/src/context.rs b/packages/apalis-sql/src/context.rs index 4b295419..7d5f3856 100644 --- a/packages/apalis-sql/src/context.rs +++ b/packages/apalis-sql/src/context.rs @@ -2,7 +2,7 @@ use apalis_core::error::Error; use apalis_core::task::{attempt::Attempt, task_id::TaskId}; use apalis_core::worker::WorkerId; use serde::{Deserialize, Serialize}; -use sqlx::types::chrono::{DateTime, Utc}; +use chrono::{DateTime, Utc}; use std::{fmt, str::FromStr}; /// The context for a job is represented here diff --git a/packages/apalis-sql/src/from_row.rs b/packages/apalis-sql/src/from_row.rs index b47f75c9..7151e1ef 100644 --- a/packages/apalis-sql/src/from_row.rs +++ b/packages/apalis-sql/src/from_row.rs @@ -70,7 +70,7 @@ impl<'r, T: Decode<'r, sqlx::Sqlite> + Type> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for SqlRequest { fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> Result { - use sqlx::types::chrono::DateTime; + use chrono::DateTime; use sqlx::Row; use std::str::FromStr; diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index f103d185..811d7988 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -17,7 +17,7 @@ use log::error; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use sqlx::mysql::MySqlRow; -use sqlx::types::chrono::{DateTime, Utc}; +use chrono::{DateTime, Utc}; use sqlx::{MySql, Pool, Row}; use std::any::type_name; use std::convert::TryInto; diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index 32e467fa..84595179 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -59,7 +59,7 @@ use futures::{select, stream, SinkExt}; use log::error; use serde::{de::DeserializeOwned, Serialize}; use sqlx::postgres::PgListener; -use sqlx::types::chrono::{DateTime, Utc}; +use chrono::{DateTime, Utc}; use sqlx::{Pool, Postgres, Row}; use std::any::type_name; use std::convert::TryInto; @@ -612,7 +612,7 @@ mod tests { use super::*; use email_service::Email; - use sqlx::types::chrono::Utc; + use chrono::Utc; /// migrate DB and return a storage instance. async fn setup() -> PostgresStorage { diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index 4d640b75..ca0aebc6 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -16,7 +16,7 @@ use apalis_core::{Backend, BoxCodec}; use async_stream::try_stream; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use serde::{de::DeserializeOwned, Serialize}; -use sqlx::types::chrono::Utc; +use chrono::Utc; use sqlx::{Pool, Row, Sqlite}; use std::any::type_name; use std::convert::TryInto; @@ -507,7 +507,7 @@ mod tests { use super::*; use email_service::Email; use futures::StreamExt; - use sqlx::types::chrono::Utc; + use chrono::Utc; /// migrate DB and return a storage instance. async fn setup() -> SqliteStorage { From 3634805bdd626773558e4f58a11a058158b96930 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Tue, 16 Jul 2024 20:22:52 +0300 Subject: [PATCH 3/3] fix: tests failing because of tests --- packages/apalis-redis/src/storage.rs | 1 + packages/apalis-sql/src/mysql.rs | 2 ++ packages/apalis-sql/src/postgres.rs | 3 +++ packages/apalis-sql/src/sqlite.rs | 3 +++ 4 files changed, 9 insertions(+) diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index f7510baf..0dcacff9 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -1082,6 +1082,7 @@ mod tests { acknowledger: job_id.clone(), result: Ok("Success".to_string()), worker: worker_id.clone(), + attempts: Attempt::new_with_value(0) }) .await .expect("failed to acknowledge the job"); diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index 811d7988..afedb48c 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -513,6 +513,7 @@ mod tests { use crate::context::State; use super::*; + use apalis_core::task::attempt::Attempt; use email_service::Email; use futures::StreamExt; @@ -646,6 +647,7 @@ mod tests { acknowledger: job_id.clone(), result: Ok("Success".to_string()), worker: worker_id.clone(), + attempts: Attempt::new_with_value(0) }) .await .expect("failed to acknowledge the job"); diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index 84595179..356e87ce 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -611,6 +611,7 @@ mod tests { use crate::context::State; use super::*; + use apalis_core::task::attempt::Attempt; use email_service::Email; use chrono::Utc; @@ -729,6 +730,8 @@ mod tests { acknowledger: job_id.clone(), result: Ok("Success".to_string()), worker: worker_id.clone(), + attempts: Attempt::new_with_value(0) + }) .await .expect("failed to acknowledge the job"); diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index ca0aebc6..fba94bf0 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -505,6 +505,7 @@ mod tests { use crate::context::State; use super::*; + use apalis_core::task::attempt::Attempt; use email_service::Email; use futures::StreamExt; use chrono::Utc; @@ -619,6 +620,8 @@ mod tests { acknowledger: job_id.clone(), result: Ok("Success".to_string()), worker: worker_id.clone(), + attempts: Attempt::new_with_value(0) + }) .await .expect("failed to acknowledge the job");