From 682f45ddf347da2a0f523b440c3cd7b07308347b Mon Sep 17 00:00:00 2001 From: --show-origin Date: Fri, 29 Nov 2024 13:59:35 +0400 Subject: [PATCH 01/18] Make Failure public --- src/lib.rs | 4 ++-- src/proto/mod.rs | 4 ++-- src/proto/single/mod.rs | 34 +++++++++++++++++++++++----- tests/real/community.rs | 49 +++++++++++++++++++++++++++++++++++------ 4 files changed, 74 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 02a88aec..5fc30fd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,8 +106,8 @@ mod worker; pub use crate::error::Error; pub use crate::proto::{ - Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, - MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId, + Client, Connection, DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, + MutationFilter, MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId, }; pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder}; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 24fe7533..7f16a061 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -12,8 +12,8 @@ pub use client::{Client, Connection}; mod single; pub use single::{ - DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, MutationFilterBuilder, - MutationTarget, ServerSnapshot, WorkerId, + DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, MutationFilter, + MutationFilterBuilder, MutationTarget, ServerSnapshot, WorkerId, }; pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl}; diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 2cbe4087..5bf7503a 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -124,6 +124,8 @@ pub struct Job { /// Defaults to 25. 
#[serde(skip_serializing_if = "Option::is_none")] #[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")] + // TODO: this should probably be a usize, see Failure::retry_count + // TODO: and Failure::retry_remaining. This can go to 0.14 release pub retry: Option, /// The priority of this job from 1-9 (9 is highest). @@ -206,19 +208,39 @@ impl JobBuilder { } } +/// Details on a job's failure. #[derive(Serialize, Deserialize, Debug, Clone)] +#[non_exhaustive] pub struct Failure { - retry_count: usize, - failed_at: String, + /// [`Number`](Job::retry) of times this job can be retried. + pub retry_count: usize, + + /// Number of remaining retry attempts. + #[serde(rename = "remaining")] + pub retry_remaining: usize, + + /// Last time this job failed. + pub failed_at: DateTime, #[serde(skip_serializing_if = "Option::is_none")] - next_at: Option, + + /// When this job will be retried. + /// + /// This will be `None` if there are no retry + /// attempts (see [`Failure::retry_remaining`]) left. + pub next_at: Option>, #[serde(skip_serializing_if = "Option::is_none")] - message: Option, + + /// Error message, if any. + pub message: Option, #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "errtype")] - kind: Option, + + /// Error kind, if known. + pub kind: Option, #[serde(skip_serializing_if = "Option::is_none")] - backtrace: Option>, + + /// Stack trace from last failure, if any. 
+ pub backtrace: Option>, } impl Job { diff --git a/tests/real/community.rs b/tests/real/community.rs index b5646db0..dcd7367e 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,9 +1,10 @@ -use crate::{assert_gte, skip_check}; +use crate::{assert_gt, assert_gte, assert_lt, skip_check}; use chrono::Utc; use faktory::{ Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, Worker, WorkerBuilder, WorkerId, }; +use rand::Rng; use serde_json::Value; use std::time::Duration; use std::{io, sync}; @@ -835,6 +836,8 @@ async fn test_panic_in_handler() { #[tokio::test(flavor = "multi_thread")] async fn mutation_requeue_jobs() { skip_check!(); + let test_started_at = Utc::now(); + let max_retries = rand::thread_rng().gen_range(2..25); // prepare a client and clean up the queue // to ensure there are no left-overs @@ -859,7 +862,10 @@ async fn mutation_requeue_jobs() { .unwrap(); // enqueue a job - let job = JobBuilder::new(local).queue(local).build(); + let job = JobBuilder::new(local) + .queue(local) + .retry(max_retries) + .build(); let job_id = job.id().clone(); client.enqueue(job).await.unwrap(); @@ -872,7 +878,7 @@ async fn mutation_requeue_jobs() { let had_one = worker.run_one(0, &[local]).await.unwrap(); assert!(!had_one); - // ... we can force it + // ... we can force it, so let's requeue the job and ... client .requeue( MutationTarget::Retries, @@ -881,11 +887,40 @@ async fn mutation_requeue_jobs() { .await .unwrap(); - // the job has been re-enqueued and we consumed it again - let had_one = worker.run_one(0, &[local]).await.unwrap(); - assert!(had_one); + // ... 
this time, instead of failing the job this time, let's + // create a new woker that will just send the job + // to the test thread so that we can inspect and + // assert on the failure + let (tx, rx) = sync::mpsc::channel(); + let tx = sync::Arc::new(sync::Mutex::new(tx)); + let mut w = WorkerBuilder::default() + .hostname("tester".to_string()) + .wid(WorkerId::new(local)) + .register_fn(local, move |j| { + let tx = sync::Arc::clone(&tx); + Box::pin(async move { + tx.lock().unwrap().send(j).unwrap(); + Ok::<(), io::Error>(()) + }) + }) + .connect() + .await + .unwrap(); + assert!(w.run_one(0, &[local]).await.unwrap()); + let job = rx.recv().unwrap(); - // TODO: Examine the job's failure (will need a dedicated PR) + let failure_info = job.failure().as_ref().unwrap(); + assert_eq!(failure_info.retry_count, 0); + assert_eq!( + failure_info.retry_remaining, + max_retries as usize - failure_info.retry_count + ); + assert_lt!(failure_info.failed_at, Utc::now()); + assert_gt!(failure_info.failed_at, test_started_at); + assert!(failure_info.next_at.is_some()); + assert_eq!(failure_info.kind.as_ref().unwrap(), "unknown"); + assert!(failure_info.message.is_some()); // "task panicked" + assert!(failure_info.backtrace.is_none()); } #[tokio::test(flavor = "multi_thread")] From e5611ba5d8ee0296f2de522e012fa7ac6c8fcfe7 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Fri, 29 Nov 2024 14:04:31 +0400 Subject: [PATCH 02/18] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 862ab583..a43b567f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Support Faktory's `MUTATE` API ([#87]) +- Make `Failure` struct public ([#89]) ### Changed @@ -22,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security [#87]: https://github.com/jonhoo/faktory-rs/pull/87 +[#89]: 
https://github.com/jonhoo/faktory-rs/pull/89 ## [0.13.0] - 2024-10-27 From 6bca4d6112e3adfa6839f9cd436eb9e9d60a2877 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Fri, 29 Nov 2024 15:44:24 +0400 Subject: [PATCH 03/18] Get more info from panicking --- src/worker/mod.rs | 17 ++++++++++++++++- tests/real/community.rs | 9 ++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 2b6aab89..3547d438 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -360,7 +360,22 @@ impl Worker { let fail = match e { Failed::BadJobType(jt) => Fail::generic(jid, format!("No handler for {}", jt)), Failed::Application(e) => Fail::generic_with_backtrace(jid, e), - Failed::HandlerPanic(e) => Fail::generic_with_backtrace(jid, e), + Failed::HandlerPanic(e) => { + if e.is_cancelled() { + Fail::generic(jid, "job processing was cancelled") + } else if e.is_panic() { + let panic_obj = e.into_panic(); + if panic_obj.is::() { + Fail::generic(jid, *panic_obj.downcast::().unwrap()) + } else if panic_obj.is::<&'static str>() { + Fail::generic(jid, *panic_obj.downcast::<&'static str>().unwrap()) + } else { + Fail::generic(jid, "job processing panicked") + } + } else { + Fail::generic_with_backtrace(jid, e) + } + } }; self.worker_states.register_failure(worker, fail.clone()); self.c.issue(&fail).await?.read_ok().await?; diff --git a/tests/real/community.rs b/tests/real/community.rs index dcd7367e..eab38877 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -6,6 +6,7 @@ use faktory::{ }; use rand::Rng; use serde_json::Value; +use std::panic::panic_any; use std::time::Duration; use std::{io, sync}; use tokio::time::{self as tokio_time}; @@ -838,6 +839,7 @@ async fn mutation_requeue_jobs() { skip_check!(); let test_started_at = Utc::now(); let max_retries = rand::thread_rng().gen_range(2..25); + let panic_message = "Failure should be recorded"; // prepare a client and clean up the queue // to ensure there are no 
left-overs @@ -855,7 +857,7 @@ async fn mutation_requeue_jobs() { // prepare a worker that will fail the job unconditionally let mut worker = Worker::builder::() .register_fn(local, move |_job| async move { - panic!("Failure should be recorded"); + panic_any(panic_message); }) .connect() .await @@ -918,8 +920,9 @@ async fn mutation_requeue_jobs() { assert_lt!(failure_info.failed_at, Utc::now()); assert_gt!(failure_info.failed_at, test_started_at); assert!(failure_info.next_at.is_some()); - assert_eq!(failure_info.kind.as_ref().unwrap(), "unknown"); - assert!(failure_info.message.is_some()); // "task panicked" + assert_eq!(failure_info.kind.as_ref().unwrap(), "unknown"); // see Fail::generic + + assert_eq!(failure_info.message.as_ref().unwrap(), panic_message); assert!(failure_info.backtrace.is_none()); } From d31972ee4215640597292b9d4244046f0877df02 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Sat, 30 Nov 2024 14:17:20 +0400 Subject: [PATCH 04/18] Check different ways a job can fail --- tests/real/community.rs | 79 ++++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 21 deletions(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index eab38877..7e8288b4 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -797,41 +797,78 @@ async fn test_jobs_with_blocking_handlers() { } #[tokio::test(flavor = "multi_thread")] -async fn test_panic_in_handler() { +async fn test_panic_and_errors_in_handler() { skip_check!(); - let local = "test_panic_in_handler"; + let local = "test_panic_and_errors_in_handler"; let mut w = Worker::builder::() - .register_blocking_fn("panic_SYNC_handler", |_j| { - panic!("Panic inside sync the handler..."); + // sync handlers + .register_blocking_fn("panic_SYNC_handler_str", |_j| { + panic!("Panic inside sync handler..."); }) - .register_fn("panic_ASYNC_handler", |_j| async move { + .register_blocking_fn("panic_SYNC_handler_String", |_j| { + panic_any("Panic inside sync 
handler...".to_string()); + }) + .register_blocking_fn("panic_SYNC_handler_int", |_j| { + panic_any(0); + }) + .register_blocking_fn("error_from_SYNC_handler", |_j| { + Err::<(), io::Error>(io::Error::new( + io::ErrorKind::Other, + "error returned from SYNC handler", + )) + }) + // async handlers + .register_fn("panic_ASYNC_handler_str", |_j| async move { panic!("Panic inside async handler..."); }) + .register_fn("panic_ASYNC_handler_String", |_j| async move { + panic_any("Panic inside async handler...".to_string()); + }) + .register_fn("panic_ASYNC_handler_int", |_j| async move { + panic_any(1); + }) + .register_blocking_fn("error_from_ASYNC_handler", |_j| { + Err::<(), io::Error>(io::Error::new( + io::ErrorKind::Other, + "error returned from ASYNC handler", + )) + }) .connect() .await .unwrap(); let mut c = Client::connect().await.unwrap(); - c.enqueue(Job::builder("panic_SYNC_handler").queue(local).build()) - .await - .unwrap(); - - // we _did_ consume and process the job, the processing result itself though - // was a failure; however, a panic in the handler was "intercepted" and communicated - // to the Faktory server via the FAIL command; - // note how the test run is not interrupted with a panic - assert!(w.run_one(0, &[local]).await.unwrap()); - - c.enqueue(Job::builder("panic_ASYNC_handler").queue(local).build()) - .await - .unwrap(); + c.enqueue_many([ + Job::builder("panic_SYNC_handler_str").queue(local).build(), + Job::builder("panic_SYNC_handler_String") + .queue(local) + .build(), + Job::builder("panic_SYNC_handler_int").queue(local).build(), + Job::builder("error_from_SYNC_handler").queue(local).build(), + Job::builder("panic_ASYNC_handler_str").queue(local).build(), + Job::builder("panic_ASYNC_handler_String") + .queue(local) + .build(), + Job::builder("panic_ASYNC_handler_int").queue(local).build(), + Job::builder("error_from_ASYNC_handler") + .queue(local) + .build(), + ]) + .await + .unwrap(); + + // let's consume all the jobs from the queue and 
fail them "in different ways" + for _ in 0..8 { + assert!(w.run_one(0, &[local]).await.unwrap()); + } + assert!(!w.run_one(0, &[local]).await.unwrap()); // drained - // same for async handler, note how the test run is not interrupted with a panic - assert!(!w.is_terminated()); - assert!(w.run_one(0, &[local]).await.unwrap()); + // TODO: 1)requeue all the jobs of this kind + // TODO: 2)create a new worker that will be sending a job via channel + // TODO: 3)inspect the Job::failure to see various error messages } #[tokio::test(flavor = "multi_thread")] From 4b38e81343ee9e6ee8284cb8f99c4f51d3f6dc62 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Sat, 30 Nov 2024 14:26:27 +0400 Subject: [PATCH 05/18] Add test clean up --- tests/real/community.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index 7e8288b4..f78c39ed 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -800,7 +800,16 @@ async fn test_jobs_with_blocking_handlers() { async fn test_panic_and_errors_in_handler() { skip_check!(); + // clean up let local = "test_panic_and_errors_in_handler"; + let mut c = Client::connect().await.unwrap(); + c.requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + c.queue_remove(&[local]).await.unwrap(); let mut w = Worker::builder::() // sync handlers @@ -839,8 +848,6 @@ async fn test_panic_and_errors_in_handler() { .await .unwrap(); - let mut c = Client::connect().await.unwrap(); - c.enqueue_many([ Job::builder("panic_SYNC_handler_str").queue(local).build(), Job::builder("panic_SYNC_handler_String") From 9842937ed3c886778fd3b0cf1ed0255508a23f04 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Sat, 30 Nov 2024 15:09:14 +0400 Subject: [PATCH 06/18] Account for re-queued jobs --- tests/real/community.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index 
f78c39ed..dba720ae 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -871,7 +871,6 @@ async fn test_panic_and_errors_in_handler() { for _ in 0..8 { assert!(w.run_one(0, &[local]).await.unwrap()); } - assert!(!w.run_one(0, &[local]).await.unwrap()); // drained // TODO: 1)requeue all the jobs of this kind // TODO: 2)create a new worker that will be sending a job via channel From 5e507666de5d265c387d9b30d34f71ee97d38a4c Mon Sep 17 00:00:00 2001 From: --show-origin Date: Sat, 30 Nov 2024 15:20:10 +0400 Subject: [PATCH 07/18] Check job id --- tests/real/community.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/real/community.rs b/tests/real/community.rs index dba720ae..fc0c9661 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -954,7 +954,10 @@ async fn mutation_requeue_jobs() { assert!(w.run_one(0, &[local]).await.unwrap()); let job = rx.recv().unwrap(); + assert_eq!(job.id(), &job_id); // sanity check + let failure_info = job.failure().as_ref().unwrap(); + eprintln!("{:?}", &failure_info); assert_eq!(failure_info.retry_count, 0); assert_eq!( failure_info.retry_remaining, From f3f40199e2d38f61e6df42bb31164fc6ee34c4cb Mon Sep 17 00:00:00 2001 From: --show-origin Date: Sat, 30 Nov 2024 16:22:12 +0400 Subject: [PATCH 08/18] Collect all kinds of handler failure --- tests/real/community.rs | 99 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 87 insertions(+), 12 deletions(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index fc0c9661..30df94d7 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,14 +1,16 @@ use crate::{assert_gt, assert_gte, assert_lt, skip_check}; use chrono::Utc; use faktory::{ - Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, Worker, + Client, Job, JobBuilder, JobId, JobRunner, MutationFilter, MutationTarget, StopReason, Worker, WorkerBuilder, WorkerId, }; use rand::Rng; use serde_json::Value; use std::panic::panic_any; +use 
std::sync::Arc; use std::time::Duration; use std::{io, sync}; +use tokio::sync::mpsc::error::SendError; use tokio::time::{self as tokio_time}; use tokio_util::sync::CancellationToken; @@ -680,7 +682,7 @@ async fn test_jobs_created_with_builder() { use std::future::Future; use std::pin::Pin; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Sender}; fn process_hard_task( sender: sync::Arc>, ) -> Box< @@ -803,9 +805,10 @@ async fn test_panic_and_errors_in_handler() { // clean up let local = "test_panic_and_errors_in_handler"; let mut c = Client::connect().await.unwrap(); + let pattern = format!(r#"*\"args\":\[\"{}\"\]*"#, local); c.requeue( MutationTarget::Retries, - MutationFilter::builder().kind(local).build(), + MutationFilter::builder().pattern(pattern.as_str()).build(), ) .await .unwrap(); @@ -848,33 +851,105 @@ async fn test_panic_and_errors_in_handler() { .await .unwrap(); + // let's prepare 9 jobs: c.enqueue_many([ - Job::builder("panic_SYNC_handler_str").queue(local).build(), + Job::builder("panic_SYNC_handler_str") + .queue(local) + .args([local]) + .build(), Job::builder("panic_SYNC_handler_String") .queue(local) + .args([local]) + .build(), + Job::builder("panic_SYNC_handler_int") + .queue(local) + .args([local]) + .build(), + Job::builder("error_from_SYNC_handler") + .queue(local) + .args([local]) + .build(), + Job::builder("panic_ASYNC_handler_str") + .queue(local) + .args([local]) .build(), - Job::builder("panic_SYNC_handler_int").queue(local).build(), - Job::builder("error_from_SYNC_handler").queue(local).build(), - Job::builder("panic_ASYNC_handler_str").queue(local).build(), Job::builder("panic_ASYNC_handler_String") .queue(local) + .args([local]) + .build(), + Job::builder("panic_ASYNC_handler_int") + .queue(local) + .args([local]) .build(), - Job::builder("panic_ASYNC_handler_int").queue(local).build(), Job::builder("error_from_ASYNC_handler") .queue(local) + .args([local]) + .build(), + 
Job::builder("no_handler_registered_for_this_jobtype_initially") + .queue(local) + .args([local]) .build(), ]) .await .unwrap(); // let's consume all the jobs from the queue and fail them "in different ways" - for _ in 0..8 { + for _ in 0..9 { assert!(w.run_one(0, &[local]).await.unwrap()); } - // TODO: 1)requeue all the jobs of this kind - // TODO: 2)create a new worker that will be sending a job via channel - // TODO: 3)inspect the Job::failure to see various error messages + // let's make sure all the jobs are re-enqueued + c.requeue( + MutationTarget::Retries, + MutationFilter::builder().pattern(pattern.as_str()).build(), + ) + .await + .unwrap(); + + // now, let's create a worker who will only send jobs to the + // test's main thread to make some assertions + struct JobHandler { + chan: Arc>, + } + impl JobHandler { + pub fn new(chan: Arc>) -> Self { + Self { chan } + } + } + #[async_trait::async_trait] + impl JobRunner for JobHandler { + type Error = SendError; + async fn run(&self, job: Job) -> Result<(), Self::Error> { + self.chan.send(job).await + } + } + let (tx, mut rx) = tokio::sync::mpsc::channel(9); + let tx = sync::Arc::new(tx); + let mut w = Worker::builder() + .register("panic_SYNC_handler_str", JobHandler::new(tx.clone())) + .register("panic_SYNC_handler_String", JobHandler::new(tx.clone())) + .register("panic_SYNC_handler_int", JobHandler::new(tx.clone())) + .register("error_from_SYNC_handler", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_str", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_String", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_int", JobHandler::new(tx.clone())) + .register("error_from_ASYNC_handler", JobHandler::new(tx.clone())) + .register( + "no_handler_registered_for_this_jobtype_initially", + JobHandler::new(tx.clone()), + ) + .connect() + .await + .unwrap(); + + for _ in 0..9 { + assert!(w.run_one(0, &[local]).await.unwrap()); // reminder: we've requeued the failed jobs + } + 
let mut jobs = Vec::with_capacity(9); + rx.recv_many(jobs.as_mut(), 9).await; + + dbg!(jobs); + // TODO: Inspect the Job::failure for each case to see various error messages } #[tokio::test(flavor = "multi_thread")] From 092e6982145b2784d16845a760646bebbf004445 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 27 Dec 2024 17:14:46 +0400 Subject: [PATCH 09/18] Add Job::failure_message. Check all error messages --- src/proto/single/mod.rs | 13 ++++ tests/real/community.rs | 132 +++++++++++++++++++++++----------------- 2 files changed, 88 insertions(+), 57 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 5bf7503a..7cdd54b4 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -288,6 +288,19 @@ impl Job { pub fn failure(&self) -> &Option { &self.failure } + + /// Error message for this job, if any. + /// + /// A convenience method that, internally, will check if this + /// job has got a [`Failure`] and - if it does - will access + /// the failure's message (see [`Failure::message`]). + /// + /// To access the entire failure, if any, use [`Job::failure`]. 
+ pub fn failure_message(&self) -> Option<&str> { + self.failure() + .as_ref() + .and_then(|f| f.message.as_ref().map(|m| m.as_str())) + } } pub async fn write_command( diff --git a/tests/real/community.rs b/tests/real/community.rs index 30df94d7..47e0fa47 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -6,6 +6,7 @@ use faktory::{ }; use rand::Rng; use serde_json::Value; +use std::collections::HashMap; use std::panic::panic_any; use std::sync::Arc; use std::time::Duration; @@ -802,6 +803,22 @@ async fn test_jobs_with_blocking_handlers() { async fn test_panic_and_errors_in_handler() { skip_check!(); + let job_kind_vs_error_msg = HashMap::from([ + ("panic_SYNC_handler_str", "panic_SYNC_handler_str"), + ("panic_SYNC_handler_String", "panic_SYNC_handler_String"), + ("panic_SYNC_handler_int", "job processing panicked"), + ("error_from_SYNC_handler", "error_from_SYNC_handler"), + ("panic_ASYNC_handler_str", "panic_ASYNC_handler_str"), + ("panic_ASYNC_handler_String", "panic_ASYNC_handler_String"), + ("panic_ASYNC_handler_int", "job processing panicked"), + ("error_from_ASYNC_handler", "error_from_ASYNC_handler"), + ( + "no_handler_registered_for_this_jobtype_initially", + "No handler for no_handler_registered_for_this_jobtype_initially", + ), + ]); + let njobs = job_kind_vs_error_msg.keys().len(); + // clean up let local = "test_panic_and_errors_in_handler"; let mut c = Client::connect().await.unwrap(); @@ -817,10 +834,10 @@ async fn test_panic_and_errors_in_handler() { let mut w = Worker::builder::() // sync handlers .register_blocking_fn("panic_SYNC_handler_str", |_j| { - panic!("Panic inside sync handler..."); + panic!("panic_SYNC_handler_str"); }) .register_blocking_fn("panic_SYNC_handler_String", |_j| { - panic_any("Panic inside sync handler...".to_string()); + panic_any("panic_SYNC_handler_String".to_string()); }) .register_blocking_fn("panic_SYNC_handler_int", |_j| { panic_any(0); @@ -828,77 +845,46 @@ async fn 
test_panic_and_errors_in_handler() { .register_blocking_fn("error_from_SYNC_handler", |_j| { Err::<(), io::Error>(io::Error::new( io::ErrorKind::Other, - "error returned from SYNC handler", + "error_from_SYNC_handler", )) }) // async handlers .register_fn("panic_ASYNC_handler_str", |_j| async move { - panic!("Panic inside async handler..."); + panic!("panic_ASYNC_handler_str"); }) .register_fn("panic_ASYNC_handler_String", |_j| async move { - panic_any("Panic inside async handler...".to_string()); + panic_any("panic_ASYNC_handler_String".to_string()); }) .register_fn("panic_ASYNC_handler_int", |_j| async move { - panic_any(1); + panic_any(0); }) - .register_blocking_fn("error_from_ASYNC_handler", |_j| { + .register_fn("error_from_ASYNC_handler", |_j| async move { Err::<(), io::Error>(io::Error::new( io::ErrorKind::Other, - "error returned from ASYNC handler", + "error_from_ASYNC_handler", )) }) .connect() .await .unwrap(); - // let's prepare 9 jobs: - c.enqueue_many([ - Job::builder("panic_SYNC_handler_str") - .queue(local) - .args([local]) - .build(), - Job::builder("panic_SYNC_handler_String") - .queue(local) - .args([local]) - .build(), - Job::builder("panic_SYNC_handler_int") - .queue(local) - .args([local]) - .build(), - Job::builder("error_from_SYNC_handler") - .queue(local) - .args([local]) - .build(), - Job::builder("panic_ASYNC_handler_str") - .queue(local) - .args([local]) - .build(), - Job::builder("panic_ASYNC_handler_String") - .queue(local) - .args([local]) - .build(), - Job::builder("panic_ASYNC_handler_int") - .queue(local) - .args([local]) - .build(), - Job::builder("error_from_ASYNC_handler") - .queue(local) - .args([local]) - .build(), - Job::builder("no_handler_registered_for_this_jobtype_initially") - .queue(local) - .args([local]) - .build(), - ]) + // let's enqueue jobs first time and ... 
+ c.enqueue_many( + job_kind_vs_error_msg + .keys() + .map(|&jkind| Job::builder(jkind).queue(local).args([local]).build()) + .collect::>(), + ) .await .unwrap(); - // let's consume all the jobs from the queue and fail them "in different ways" - for _ in 0..9 { + // ... consume all the jobs from the queue and _fail_ them + // "in different ways" (see our worker setup above) + for _ in 0..njobs { assert!(w.run_one(0, &[local]).await.unwrap()); } - // let's make sure all the jobs are re-enqueued + // let's now make sure all the jobs are re-enqueued c.requeue( MutationTarget::Retries, MutationFilter::builder().pattern(pattern.as_str()).build(), @@ -923,8 +909,11 @@ async fn test_panic_and_errors_in_handler() { self.chan.send(job).await } } - let (tx, mut rx) = tokio::sync::mpsc::channel(9); + let (tx, mut rx) = tokio::sync::mpsc::channel(njobs); let tx = sync::Arc::new(tx); + + // unlike the previus worker, this one is not failing the jobs, + // it is rather helping us to inspect them let mut w = Worker::builder() .register("panic_SYNC_handler_str", JobHandler::new(tx.clone())) .register("panic_SYNC_handler_String", JobHandler::new(tx.clone())) @@ -942,14 +931,44 @@ async fn test_panic_and_errors_in_handler() { .await .unwrap(); - for _ in 0..9 { + for _ in 0..njobs { assert!(w.run_one(0, &[local]).await.unwrap()); // reminder: we've requeued the failed jobs } - let mut jobs = Vec::with_capacity(9); - rx.recv_many(jobs.as_mut(), 9).await; - dbg!(jobs); - // TODO: Inspect the Job::failure for each case to see various error messages + // Let's await till the worker sends the jobs to us. + // + // Mote that if a tokio task inside `Worker::run_job` in cancelled(1), we may not receive a job + // via the channel and so `rx.recv_many` will just hang (and so the entire test run), + // hence a timeout we are adding here. 
+ // + // (1)If you are curious, this can be easily reproduced inside the `Callback::Async(_)` arm + // of `Worker::run_job`, by swapping this line: + // ```rust + // spawn(processing_task).await + // ``` + // for something like: + // ```rust + // let handle = spawn(processing_task); + // handle.abort(); + // handle.await + // ``` + // and then running this test. + let mut jobs: Vec = Vec::with_capacity(njobs); + let nreceived = + tokio::time::timeout(Duration::from_secs(5), rx.recv_many(jobs.as_mut(), njobs)) + .await + .expect("all jobs to be recieved within the timeout period"); + + // all the jobs are now in the test's main thread + assert_eq!(nreceived, njobs); + + // let's verify that errors messages in each job's `Failure` are as expected + for job in jobs { + assert_eq!( + job.failure_message().as_ref(), + job_kind_vs_error_msg.get(job.kind()) + ); + } } #[tokio::test(flavor = "multi_thread")] @@ -1032,7 +1051,6 @@ async fn mutation_requeue_jobs() { assert_eq!(job.id(), &job_id); // sanity check let failure_info = job.failure().as_ref().unwrap(); - eprintln!("{:?}", &failure_info); assert_eq!(failure_info.retry_count, 0); assert_eq!( failure_info.retry_remaining, From e5bc76d0d05402fd1b331d3c7a07fe97fccfe93b Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 27 Dec 2024 17:18:57 +0400 Subject: [PATCH 10/18] Add to CHANGELOG.md. 
Fix clippy warning --- CHANGELOG.md | 1 + src/proto/single/mod.rs | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a43b567f..1ac76533 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support Faktory's `MUTATE` API ([#87]) - Make `Failure` struct public ([#89]) +- `Job::failure_message` convenience method ([#89]) ### Changed diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 7cdd54b4..8ed3042e 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -297,9 +297,7 @@ impl Job { /// /// To access the entire failure, if any, use [`Job::failure`]. pub fn failure_message(&self) -> Option<&str> { - self.failure() - .as_ref() - .and_then(|f| f.message.as_ref().map(|m| m.as_str())) + self.failure().as_ref().and_then(|f| f.message.as_deref()) } } From 1e1914c44e521a6d756fc7886ff7b94946fb8037 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavie=C5=82=20Michalkievi=C4=8D?= Date: Mon, 30 Dec 2024 16:58:55 +0400 Subject: [PATCH 11/18] Update tests/real/community.rs Co-authored-by: Jon Gjengset --- tests/real/community.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index 47e0fa47..c5e8c870 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1029,7 +1029,7 @@ async fn mutation_requeue_jobs() { // ... 
this time, instead of failing the job this time, let's // create a new woker that will just send the job // to the test thread so that we can inspect and - // assert on the failure + // assert on the failure from the first run let (tx, rx) = sync::mpsc::channel(); let tx = sync::Arc::new(sync::Mutex::new(tx)); let mut w = WorkerBuilder::default() From 7425ac6c2d164abb834c7616f5c439872b60de20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavie=C5=82=20Michalkievi=C4=8D?= Date: Mon, 30 Dec 2024 16:59:12 +0400 Subject: [PATCH 12/18] Update src/proto/single/mod.rs Co-authored-by: Jon Gjengset --- src/proto/single/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 8ed3042e..35210321 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -212,7 +212,7 @@ impl JobBuilder { #[derive(Serialize, Deserialize, Debug, Clone)] #[non_exhaustive] pub struct Failure { - /// [`Number`](Job::retry) of times this job can be retried. + /// [Number](Job::retry) of times this job can be retried. pub retry_count: usize, /// Number of remaining retry attempts. From 5cb4bd3055048b0fe3e5f0eb07a202cbbd5703c5 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 30 Dec 2024 18:39:40 +0400 Subject: [PATCH 13/18] Fix docs to Failure --- src/proto/single/mod.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 35210321..5320df5f 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -212,34 +212,37 @@ impl JobBuilder { #[derive(Serialize, Deserialize, Debug, Clone)] #[non_exhaustive] pub struct Failure { - /// [Number](Job::retry) of times this job can be retried. + /// Number of times this job has been retried. pub retry_count: usize, /// Number of remaining retry attempts. 
+ /// + /// This is the difference between how many times this job + /// _can_ be retried (see [`Job::retry`]) and the number of retry + /// attempts that have already been made (see [`Failure::retry_count`]). #[serde(rename = "remaining")] pub retry_remaining: usize, /// Last time this job failed. pub failed_at: DateTime, - #[serde(skip_serializing_if = "Option::is_none")] /// When this job will be retried. /// /// This will be `None` if there are no retry /// attempts (see [`Failure::retry_remaining`]) left. - pub next_at: Option>, #[serde(skip_serializing_if = "Option::is_none")] + pub next_at: Option>, /// Error message, if any. - pub message: Option, #[serde(skip_serializing_if = "Option::is_none")] - #[serde(rename = "errtype")] + pub message: Option, /// Error kind, if known. + #[serde(rename = "errtype")] pub kind: Option, - #[serde(skip_serializing_if = "Option::is_none")] /// Stack trace from last failure, if any. + #[serde(skip_serializing_if = "Option::is_none")] pub backtrace: Option>, } @@ -285,8 +288,8 @@ impl Job { } /// Data about this job's most recent failure. - pub fn failure(&self) -> &Option { - &self.failure + pub fn failure(&self) -> Option<&Failure> { + self.failure.as_ref() } /// Error message for this job, if any. 
From f1d4ea31f99db23d25838443b46e3b3e8f935771 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 30 Dec 2024 18:52:16 +0400 Subject: [PATCH 14/18] Nuke Job::failure_message --- CHANGELOG.md | 1 - src/proto/single/mod.rs | 11 ----------- tests/real/community.rs | 9 ++++----- tests/real/utils.rs | 2 ++ 4 files changed, 6 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ac76533..a43b567f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support Faktory's `MUTATE` API ([#87]) - Make `Failure` struct public ([#89]) -- `Job::failure_message` convenience method ([#89]) ### Changed diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 5320df5f..94ff8d17 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -291,17 +291,6 @@ impl Job { pub fn failure(&self) -> Option<&Failure> { self.failure.as_ref() } - - /// Error message for this job, if any. - /// - /// A convenience method that, internally, will check if this - /// job has got a [`Failure`] and - if it does - will access - /// the failure's message (see [`Failure::message`]). - /// - /// To access the entire failure, if any, use [`Job::failure`]. 
- pub fn failure_message(&self) -> Option<&str> { - self.failure().as_ref().and_then(|f| f.message.as_deref()) - } } pub async fn write_command( diff --git a/tests/real/community.rs b/tests/real/community.rs index c5e8c870..358fcc1a 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -964,10 +964,9 @@ async fn test_panic_and_errors_in_handler() { // let's verify that errors messages in each job's `Failure` are as expected for job in jobs { - assert_eq!( - job.failure_message().as_ref(), - job_kind_vs_error_msg.get(job.kind()) - ); + let error_message_got = job.failure().as_ref().unwrap().message.as_ref().unwrap(); + let error_message_expected = *job_kind_vs_error_msg.get(job.kind()).unwrap(); + assert_eq!(error_message_got, error_message_expected); } } @@ -1050,7 +1049,7 @@ async fn mutation_requeue_jobs() { assert_eq!(job.id(), &job_id); // sanity check - let failure_info = job.failure().as_ref().unwrap(); + let failure_info = job.failure().unwrap(); assert_eq!(failure_info.retry_count, 0); assert_eq!( failure_info.retry_remaining, diff --git a/tests/real/utils.rs b/tests/real/utils.rs index 1ae08534..b14d24ac 100644 --- a/tests/real/utils.rs +++ b/tests/real/utils.rs @@ -1,3 +1,5 @@ +use faktory::Job; + #[macro_export] macro_rules! skip_check { () => { From 3ad1ea0f40666728bc6ca6643754eefd24396359 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 30 Dec 2024 18:52:41 +0400 Subject: [PATCH 15/18] Nuke Job::failure_message --- tests/real/utils.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/real/utils.rs b/tests/real/utils.rs index b14d24ac..1ae08534 100644 --- a/tests/real/utils.rs +++ b/tests/real/utils.rs @@ -1,5 +1,3 @@ -use faktory::Job; - #[macro_export] macro_rules! 
skip_check { () => { From 409e386c3b04540706b744fa04897b562ea34923 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 30 Dec 2024 19:02:28 +0400 Subject: [PATCH 16/18] Restore comment in test_panic_and_errors_in_handler test --- tests/real/community.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index 358fcc1a..48095260 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -879,7 +879,11 @@ async fn test_panic_and_errors_in_handler() { .unwrap(); // ... consume all the jobs from the queue and _fail_ them - // "in different ways" (see our worker setup above) + // "in different ways" (see our worker setup above); + // we _did_ consume and process the job, the processing result itself though + // was a failure; however, a panic in the handler was "intercepted" and communicated + // to the Faktory server via the FAIL command; + // note how the test run is not interrupted with a panic for _ in 0..njobs { assert!(w.run_one(0, &[local]).await.unwrap()); } From 80fe6940cc1b6bc8c2ca3ebf251dacb22da39245 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 30 Dec 2024 19:17:23 +0400 Subject: [PATCH 17/18] Add to the tests docs --- tests/real/community.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index 48095260..794ab7eb 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -880,10 +880,16 @@ async fn test_panic_and_errors_in_handler() { // ... 
consume all the jobs from the queue and _fail_ them // "in different ways" (see our worker setup above); + // // we _did_ consume and process the job, the processing result itself though // was a failure; however, a panic in the handler was "intercepted" and communicated - // to the Faktory server via the FAIL command; - // note how the test run is not interrupted with a panic + // to the Faktory server via the FAIL command, the error message and the backtrace, if any, + // will then be available to a Web UI user or to the worker that consumes the retried job + // (if `Job::retry` is Some and > 0 and there are `Failure::retry_remaining` attempts left) + // in this job's `failure` field; see how we are consuming the retried jobs later + // in this test and examine the failure details; + // + // also note how the test run is not interrupted here with a panic for _ in 0..njobs { assert!(w.run_one(0, &[local]).await.unwrap()); } @@ -897,7 +903,7 @@ async fn test_panic_and_errors_in_handler() { .unwrap(); // now, let's create a worker who will only send jobs to the - // test's main thread to make some assertions + // test's main thread to make some assertions; struct JobHandler { chan: Arc>, } @@ -941,7 +947,7 @@ async fn test_panic_and_errors_in_handler() { // Let's await till the worker sends the jobs to us. // - // Mote that if a tokio task inside `Worker::run_job` in cancelled(1), we may not receive a job + // Note that if a tokio task inside `Worker::run_job` is cancelled(1), we may not receive a job // via the channel and so `rx.recv_many` will just hang (and so the entire test run), // hence a timeout we are adding here. 
// From 0576abad3d7356529dbb4bc7592ed083afb1cd1b Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 31 Dec 2024 17:47:56 +0400 Subject: [PATCH 18/18] Do not make Failure::kind pub --- src/proto/single/mod.rs | 5 ++++- tests/real/community.rs | 7 ++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 94ff8d17..8cd13ebf 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -237,9 +237,12 @@ pub struct Failure { #[serde(skip_serializing_if = "Option::is_none")] pub message: Option, + // This is Some("unknown") most of the time, and we are not making + // it public for now, see discussion: + // https://github.com/jonhoo/faktory-rs/pull/89#discussion_r1899423130 /// Error kind, if known. #[serde(rename = "errtype")] - pub kind: Option, + pub(crate) kind: Option, /// Stack trace from last failure, if any. #[serde(skip_serializing_if = "Option::is_none")] diff --git a/tests/real/community.rs b/tests/real/community.rs index 794ab7eb..9035749a 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -819,7 +819,10 @@ async fn test_panic_and_errors_in_handler() { ]); let njobs = job_kind_vs_error_msg.keys().len(); - // clean up + // clean up is needed when re-using the same Faktory container, since the + // Faktory server could have re-scheduled (or might be doing it right now) + // the failed jobs from the previous test run; to keep things clean, we are + // force-rescheduling and immediately dropping any remainders let local = "test_panic_and_errors_in_handler"; let mut c = Client::connect().await.unwrap(); let pattern = format!(r#"*\"args\":\[\"{}\"\]*"#, local); @@ -1068,8 +1071,6 @@ async fn mutation_requeue_jobs() { assert_lt!(failure_info.failed_at, Utc::now()); assert_gt!(failure_info.failed_at, test_started_at); assert!(failure_info.next_at.is_some()); - assert_eq!(failure_info.kind.as_ref().unwrap(), "unknown"); // see Fail::generic - 
assert_eq!(failure_info.message.as_ref().unwrap(), panic_message); assert!(failure_info.backtrace.is_none()); }