diff --git a/CHANGELOG.md b/CHANGELOG.md
index 862ab583..a43b567f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 
 - Support Faktory's `MUTATE` API ([#87])
+- Make `Failure` struct public ([#89])
 
 ### Changed
 
@@ -22,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Security
 
 [#87]: https://github.com/jonhoo/faktory-rs/pull/87
+[#89]: https://github.com/jonhoo/faktory-rs/pull/89
 
 ## [0.13.0] - 2024-10-27
 
diff --git a/src/lib.rs b/src/lib.rs
index 02a88aec..5fc30fd5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -106,8 +106,8 @@ mod worker;
 
 pub use crate::error::Error;
 pub use crate::proto::{
-    Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter,
-    MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId,
+    Client, Connection, DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId,
+    MutationFilter, MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId,
 };
 pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder};
 
diff --git a/src/proto/mod.rs b/src/proto/mod.rs
index 24fe7533..7f16a061 100644
--- a/src/proto/mod.rs
+++ b/src/proto/mod.rs
@@ -12,8 +12,8 @@ pub use client::{Client, Connection};
 mod single;
 
 pub use single::{
-    DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, MutationFilterBuilder,
-    MutationTarget, ServerSnapshot, WorkerId,
+    DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, MutationFilter,
+    MutationFilterBuilder, MutationTarget, ServerSnapshot, WorkerId,
 };
 
 pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};
diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs
index 2cbe4087..8cd13ebf 100644
--- a/src/proto/single/mod.rs
+++ b/src/proto/single/mod.rs
@@ -124,6 +124,8 @@ pub struct Job {
     /// Defaults to 25.
     #[serde(skip_serializing_if = "Option::is_none")]
     #[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")]
+    // TODO: this should probably be a usize, see Failure::retry_count
+    // TODO: and Failure::retry_remaining. This can go into the 0.14 release
     pub retry: Option<isize>,
 
     /// The priority of this job from 1-9 (9 is highest).
@@ -206,19 +208,45 @@ impl JobBuilder {
     }
 }
 
+/// Details on a job's failure.
 #[derive(Serialize, Deserialize, Debug, Clone)]
+#[non_exhaustive]
 pub struct Failure {
-    retry_count: usize,
-    failed_at: String,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    next_at: Option<String>,
+    /// Number of times this job has been retried.
+    pub retry_count: usize,
+
+    /// Number of remaining retry attempts.
+    ///
+    /// This is the difference between how many times this job
+    /// _can_ be retried (see [`Job::retry`]) and the number of retry
+    /// attempts that have already been made (see [`Failure::retry_count`]).
+    #[serde(rename = "remaining")]
+    pub retry_remaining: usize,
+
+    /// Last time this job failed.
+    pub failed_at: DateTime<Utc>,
+
+    /// When this job will be retried.
+    ///
+    /// This will be `None` if there are no retry
+    /// attempts (see [`Failure::retry_remaining`]) left.
     #[serde(skip_serializing_if = "Option::is_none")]
-    message: Option<String>,
+    pub next_at: Option<DateTime<Utc>>,
+
+    /// Error message, if any.
     #[serde(skip_serializing_if = "Option::is_none")]
+    pub message: Option<String>,
+
+    // This is Some("unknown") most of the time, and we are not making
+    // it public for now, see discussion:
+    // https://github.com/jonhoo/faktory-rs/pull/89#discussion_r1899423130
+    /// Error kind, if known.
     #[serde(rename = "errtype")]
-    kind: Option<String>,
+    pub(crate) kind: Option<String>,
+
+    /// Stack trace from last failure, if any.
     #[serde(skip_serializing_if = "Option::is_none")]
-    backtrace: Option<Vec<String>>,
+    pub backtrace: Option<Vec<String>>,
 }
 
 impl Job {
@@ -263,8 +291,8 @@ impl Job {
     }
 
     /// Data about this job's most recent failure.
-    pub fn failure(&self) -> &Option<Failure> {
-        &self.failure
+    pub fn failure(&self) -> Option<&Failure> {
+        self.failure.as_ref()
     }
 }
 
diff --git a/src/worker/mod.rs b/src/worker/mod.rs
index 2b6aab89..3547d438 100644
--- a/src/worker/mod.rs
+++ b/src/worker/mod.rs
@@ -360,7 +360,22 @@ impl Worker {
         let fail = match e {
             Failed::BadJobType(jt) => Fail::generic(jid, format!("No handler for {}", jt)),
             Failed::Application(e) => Fail::generic_with_backtrace(jid, e),
-            Failed::HandlerPanic(e) => Fail::generic_with_backtrace(jid, e),
+            Failed::HandlerPanic(e) => {
+                if e.is_cancelled() {
+                    Fail::generic(jid, "job processing was cancelled")
+                } else if e.is_panic() {
+                    let panic_obj = e.into_panic();
+                    if panic_obj.is::<String>() {
+                        Fail::generic(jid, *panic_obj.downcast::<String>().unwrap())
+                    } else if panic_obj.is::<&'static str>() {
+                        Fail::generic(jid, *panic_obj.downcast::<&'static str>().unwrap())
+                    } else {
+                        Fail::generic(jid, "job processing panicked")
+                    }
+                } else {
+                    Fail::generic_with_backtrace(jid, e)
+                }
+            }
         };
         self.worker_states.register_failure(worker, fail.clone());
         self.c.issue(&fail).await?.read_ok().await?;
diff --git a/tests/real/community.rs b/tests/real/community.rs
index b5646db0..9035749a 100644
--- a/tests/real/community.rs
+++ b/tests/real/community.rs
@@ -1,12 +1,17 @@
-use crate::{assert_gte, skip_check};
+use crate::{assert_gt, assert_gte, assert_lt, skip_check};
 use chrono::Utc;
 use faktory::{
-    Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, Worker,
+    Client, Job, JobBuilder, JobId, JobRunner, MutationFilter, MutationTarget, StopReason, Worker,
     WorkerBuilder, WorkerId,
 };
+use rand::Rng;
 use serde_json::Value;
+use std::collections::HashMap;
+use std::panic::panic_any;
+use std::sync::Arc;
 use std::time::Duration;
 use std::{io, sync};
+use tokio::sync::mpsc::error::SendError;
 use tokio::time::{self as tokio_time};
 use tokio_util::sync::CancellationToken;
 
@@ -678,7 +683,7 @@ async fn test_jobs_created_with_builder() {
 
 use std::future::Future;
 use std::pin::Pin;
-use tokio::sync::mpsc;
+use tokio::sync::mpsc::{self, Sender};
 fn process_hard_task(
     sender: sync::Arc>,
 ) -> Box<
@@ -795,46 +800,195 @@ async fn test_jobs_with_blocking_handlers() {
 }
 
 #[tokio::test(flavor = "multi_thread")]
-async fn test_panic_in_handler() {
+async fn test_panic_and_errors_in_handler() {
     skip_check!();
 
-    let local = "test_panic_in_handler";
+    let job_kind_vs_error_msg = HashMap::from([
+        ("panic_SYNC_handler_str", "panic_SYNC_handler_str"),
+        ("panic_SYNC_handler_String", "panic_SYNC_handler_String"),
+        ("panic_SYNC_handler_int", "job processing panicked"),
+        ("error_from_SYNC_handler", "error_from_SYNC_handler"),
+        ("panic_ASYNC_handler_str", "panic_ASYNC_handler_str"),
+        ("panic_ASYNC_handler_String", "panic_ASYNC_handler_String"),
+        ("panic_ASYNC_handler_int", "job processing panicked"),
+        ("error_from_ASYNC_handler", "error_from_ASYNC_handler"),
+        (
"no_handler_registered_for_this_jobtype_initially", + "No handler for no_handler_registered_for_this_jobtype_initially", + ), + ]); + let njobs = job_kind_vs_error_msg.keys().len(); + + // clean up is needed when re-using the same Faktory container, since the + // Faktory server could have re-scheduled (or might be doing it right now) + // the failed jobs from the previous test run; to keep things clean, we are + // force-rescheduling and immediatey dropping any remainders + let local = "test_panic_and_errors_in_handler"; + let mut c = Client::connect().await.unwrap(); + let pattern = format!(r#"*\"args\":\[\"{}\"\]*"#, local); + c.requeue( + MutationTarget::Retries, + MutationFilter::builder().pattern(pattern.as_str()).build(), + ) + .await + .unwrap(); + c.queue_remove(&[local]).await.unwrap(); let mut w = Worker::builder::() - .register_blocking_fn("panic_SYNC_handler", |_j| { - panic!("Panic inside sync the handler..."); + // sync handlers + .register_blocking_fn("panic_SYNC_handler_str", |_j| { + panic!("panic_SYNC_handler_str"); + }) + .register_blocking_fn("panic_SYNC_handler_String", |_j| { + panic_any("panic_SYNC_handler_String".to_string()); + }) + .register_blocking_fn("panic_SYNC_handler_int", |_j| { + panic_any(0); + }) + .register_blocking_fn("error_from_SYNC_handler", |_j| { + Err::<(), io::Error>(io::Error::new( + io::ErrorKind::Other, + "error_from_SYNC_handler", + )) + }) + // async handlers + .register_fn("panic_ASYNC_handler_str", |_j| async move { + panic!("panic_ASYNC_handler_str"); + }) + .register_fn("panic_ASYNC_handler_String", |_j| async move { + panic_any("panic_ASYNC_handler_String".to_string()); + }) + .register_fn("panic_ASYNC_handler_int", |_j| async move { + panic_any(0); }) - .register_fn("panic_ASYNC_handler", |_j| async move { - panic!("Panic inside async handler..."); + .register_fn("error_from_ASYNC_handler", |_j| async move { + Err::<(), io::Error>(io::Error::new( + io::ErrorKind::Other, + "error_from_ASYNC_handler", + )) }) .connect() .await .unwrap(); - let mut c = Client::connect().await.unwrap(); - - c.enqueue(Job::builder("panic_SYNC_handler").queue(local).build()) - .await - .unwrap(); - + // let's enqueue jobs first time and ... + c.enqueue_many( + job_kind_vs_error_msg + .keys() + .map(|&jkind| Job::builder(jkind).queue(local).args([local]).build()) + .collect::>(), + ) + .await + .unwrap(); + + // ... 
+    // ... consume all the jobs from the queue and _fail_ them
+    // "in different ways" (see our worker setup above);
+    //
     // we _did_ consume and process the job, the processing result itself though
     // was a failure; however, a panic in the handler was "intercepted" and communicated
-    // to the Faktory server via the FAIL command;
-    // note how the test run is not interrupted with a panic
-    assert!(w.run_one(0, &[local]).await.unwrap());
+    // to the Faktory server via the FAIL command, the error message and the backtrace, if any,
+    // will then be available to a Web UI user or to the worker that consumes the retried job
+    // (if `Job::retry` is Some and > 0 and there are `Failure::retry_remaining` attempts left)
+    // in this job's `failure` field; see how we are consuming the retried jobs later
+    // in this test and examine the failure details;
+    //
+    // also note how the test run is not interrupted here with a panic
+    for _ in 0..njobs {
+        assert!(w.run_one(0, &[local]).await.unwrap());
+    }
+
+    // let's now make sure all the jobs are re-enqueued
+    c.requeue(
+        MutationTarget::Retries,
+        MutationFilter::builder().pattern(pattern.as_str()).build(),
+    )
+    .await
+    .unwrap();
+
+    // now, let's create a worker that will only send jobs to the
+    // test's main thread to make some assertions;
+    struct JobHandler {
+        chan: Arc<Sender<Job>>,
+    }
+    impl JobHandler {
+        pub fn new(chan: Arc<Sender<Job>>) -> Self {
+            Self { chan }
+        }
+    }
+    #[async_trait::async_trait]
+    impl JobRunner for JobHandler {
+        type Error = SendError<Job>;
+        async fn run(&self, job: Job) -> Result<(), Self::Error> {
+            self.chan.send(job).await
+        }
+    }
+    let (tx, mut rx) = tokio::sync::mpsc::channel(njobs);
+    let tx = sync::Arc::new(tx);
 
-    c.enqueue(Job::builder("panic_ASYNC_handler").queue(local).build())
+    // unlike the previous worker, this one is not failing the jobs,
+    // rather, it is helping us to inspect them
+    let mut w = Worker::builder()
+        .register("panic_SYNC_handler_str", JobHandler::new(tx.clone()))
+        .register("panic_SYNC_handler_String", JobHandler::new(tx.clone()))
+        .register("panic_SYNC_handler_int", JobHandler::new(tx.clone()))
+        .register("error_from_SYNC_handler", JobHandler::new(tx.clone()))
+        .register("panic_ASYNC_handler_str", JobHandler::new(tx.clone()))
+        .register("panic_ASYNC_handler_String", JobHandler::new(tx.clone()))
+        .register("panic_ASYNC_handler_int", JobHandler::new(tx.clone()))
+        .register("error_from_ASYNC_handler", JobHandler::new(tx.clone()))
+        .register(
+            "no_handler_registered_for_this_jobtype_initially",
+            JobHandler::new(tx.clone()),
+        )
+        .connect()
         .await
         .unwrap();
 
-    // same for async handler, note how the test run is not interrupted with a panic
-    assert!(!w.is_terminated());
-    assert!(w.run_one(0, &[local]).await.unwrap());
+    for _ in 0..njobs {
+        assert!(w.run_one(0, &[local]).await.unwrap()); // reminder: we've requeued the failed jobs
+    }
+
+    // Let's await until the worker sends the jobs to us.
+    //
+    // Note that if a tokio task inside `Worker::run_job` is cancelled (1), we may not receive a job
+    // via the channel and so `rx.recv_many` will just hang (and so will the entire test run),
+    // hence the timeout we are adding here.
+    //
+    // (1) If you are curious, this can be easily reproduced inside the `Callback::Async(_)` arm
+    // of `Worker::run_job`, by swapping this line:
+    // ```rust
+    // spawn(processing_task).await
+    // ```
+    // for something like:
+    // ```rust
+    // let handle = spawn(processing_task);
+    // handle.abort();
+    // handle.await
+    // ```
+    // and then running this test.
+    let mut jobs: Vec<Job> = Vec::with_capacity(njobs);
+    let nreceived =
+        tokio::time::timeout(Duration::from_secs(5), rx.recv_many(jobs.as_mut(), njobs))
+            .await
+            .expect("all jobs to be received within the timeout period");
+
+    // all the jobs are now in the test's main thread
+    assert_eq!(nreceived, njobs);
+
+    // let's verify that the error messages in each job's `Failure` are as expected
+    for job in jobs {
+        let error_message_got = job.failure().as_ref().unwrap().message.as_ref().unwrap();
+        let error_message_expected = *job_kind_vs_error_msg.get(job.kind()).unwrap();
+        assert_eq!(error_message_got, error_message_expected);
+    }
 }
 
 #[tokio::test(flavor = "multi_thread")]
 async fn mutation_requeue_jobs() {
     skip_check!();
+    let test_started_at = Utc::now();
+    let max_retries = rand::thread_rng().gen_range(2..25);
+    let panic_message = "Failure should be recorded";
 
     // prepare a client and clean up the queue
     // to ensure there are no left-overs
@@ -852,14 +1006,17 @@ async fn mutation_requeue_jobs() {
     // prepare a worker that will fail the job unconditionally
     let mut worker = Worker::builder::<io::Error>()
         .register_fn(local, move |_job| async move {
-            panic!("Failure should be recorded");
+            panic_any(panic_message);
         })
         .connect()
         .await
         .unwrap();
 
     // enqueue a job
-    let job = JobBuilder::new(local).queue(local).build();
+    let job = JobBuilder::new(local)
+        .queue(local)
+        .retry(max_retries)
+        .build();
     let job_id = job.id().clone();
     client.enqueue(job).await.unwrap();
 
@@ -872,7 +1029,7 @@ async fn mutation_requeue_jobs() {
     let had_one = worker.run_one(0, &[local]).await.unwrap();
     assert!(!had_one);
 
-    // ... we can force it
+    // ... we can force it, so let's requeue the job and ...
     client
         .requeue(
             MutationTarget::Retries,
@@ -881,11 +1038,41 @@ async fn mutation_requeue_jobs() {
         .await
         .unwrap();
 
-    // the job has been re-enqueued and we consumed it again
-    let had_one = worker.run_one(0, &[local]).await.unwrap();
-    assert!(had_one);
+    // ... this time, instead of failing the job, let's
+    // create a new worker that will just send the job
+    // to the test thread so that we can inspect and
+    // assert on the failure from the first run
+    let (tx, rx) = sync::mpsc::channel();
+    let tx = sync::Arc::new(sync::Mutex::new(tx));
+    let mut w = WorkerBuilder::default()
+        .hostname("tester".to_string())
+        .wid(WorkerId::new(local))
+        .register_fn(local, move |j| {
+            let tx = sync::Arc::clone(&tx);
+            Box::pin(async move {
+                tx.lock().unwrap().send(j).unwrap();
+                Ok::<(), io::Error>(())
+            })
+        })
+        .connect()
+        .await
+        .unwrap();
+    assert!(w.run_one(0, &[local]).await.unwrap());
+    let job = rx.recv().unwrap();
 
-    // TODO: Examine the job's failure (will need a dedicated PR)
+    assert_eq!(job.id(), &job_id); // sanity check
+
+    let failure_info = job.failure().unwrap();
+    assert_eq!(failure_info.retry_count, 0);
+    assert_eq!(
+        failure_info.retry_remaining,
+        max_retries as usize - failure_info.retry_count
+    );
+    assert_lt!(failure_info.failed_at, Utc::now());
+    assert_gt!(failure_info.failed_at, test_started_at);
+    assert!(failure_info.next_at.is_some());
+    assert_eq!(failure_info.message.as_ref().unwrap(), panic_message);
+    assert!(failure_info.backtrace.is_none());
 }
 
 #[tokio::test(flavor = "multi_thread")]
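For illustration only (not part of the patch): with `Failure` exported and `Job::failure()` now returning `Option<&Failure>`, a handler can read the details of a retried job's previous failed attempt directly. A minimal sketch; the `my_job` kind and the `default` queue are hypothetical names chosen for the example:

```rust
use faktory::{Failure, Job, Worker};
use std::io;

async fn inspect_retried_jobs() {
    let mut w = Worker::builder::<io::Error>()
        // "my_job" is a hypothetical job kind used purely for illustration
        .register_fn("my_job", |job: Job| async move {
            // for a job the Faktory server has re-scheduled after a failed
            // attempt, the now-public `Failure` fields describe that attempt
            let last_failure: Option<&Failure> = job.failure();
            if let Some(f) = last_failure {
                println!(
                    "retries made: {}, retries left: {}, failed at: {}, message: {:?}",
                    f.retry_count, f.retry_remaining, f.failed_at, f.message
                );
            }
            // ... actual processing would go here ...
            Ok::<(), io::Error>(())
        })
        .connect()
        .await
        .expect("connected to Faktory");

    // consume a single job from the hypothetical "default" queue
    w.run_one(0, &["default"]).await.expect("job processed");
}
```

Since `Failure` is marked `#[non_exhaustive]`, downstream crates can read these fields but cannot construct the struct themselves, which leaves room for adding fields later without a breaking change.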