When returning an error from a job, 'last_error' is not set and 'status' is in Done #182

Closed
ShockleyJE opened this issue Oct 11, 2023 · 8 comments

@ShockleyJE

ShockleyJE commented Oct 11, 2023

Hello, I have been evaluating apalis with Postgres as an executor and have a reproducible case where a failure in a job is not recorded as failed in the jobs table. As a result, I'm not sure how to proceed with having the job retried on failure.

Consider the following code snippet:

In this snippet I create a message based on the job (TokenDetails) and attempt to publish it to a destination queue. I return Ok or an Err depending on whether the message was published.

impl Job for TokenDetails {
    const NAME: &'static str = "apalis::TokenDetails";
}

/// Create the SQS client, and send TokenDetails as the message content
pub async fn send_token_event_example(job: TokenDetails, _ctx: JobContext) -> Result<(), JobError>  {
    ....

    match queue_client.send_one(&message).await {
        Ok(_) => {
            log::info!("Message sent successfully from job");
            Ok(())
        },
        Err(e) => {
            //apalis reference: https://github.com/geofmureithi/apalis/blob/master/examples/sentry/src/main.rs#L93
            log::error!("Message send failure from job. Is your AWS default environment region correct? Error: {:#?}", e);
            Err(JobError::Failed(Box::new(e)))
        },
    }
}

I can trigger an error in this process by changing my AWS default region. The logs confirm that this change causes the error:

{"v":0,"name":"authn_service","msg":"Error when sending message via SQS lib: <....>}

{"v":0,"name":"authn_service","msg":"Message send failure from job. Is your AWS default environment region correct?"}

But there is no error recorded in jobs.last_error

| job | id | job_type | status | attempts | max_attempts | run_at | last_error | lock_at | lock_by | done_at |
|---|---|---|---|---|---|---|---|---|---|---|
| ... | JID-01HCFZGTDGH674EX2SSAGWCKHZ | apalis::TokenDetails | Done | 0 | 25 | 2023-10-11 18:20:55.604719+00 | (empty) | 2023-10-11 18:20:55.716267+00 | tasty-orange-0 | 2023-10-11 18:20:56.270764+00 |

For reference, this is how I am creating the apalis worker. The futures are polled later, alongside the actix-web API server.


let worker = Monitor::new()
    // Start with an initial worker count of one
    .register_with_count(1, move |c| {
        WorkerBuilder::new(format!("tasty-orange-{c}"))
            // .layer(TraceLayer::new())
            .layer(BufferLayer::<JobRequest<TokenDetails>>::new(250))
            .with_storage_config(worker_pg.clone(), |cfg| {
                cfg
                    // Set the buffer size to 10 (pick 10 jobs per query)
                    .buffer_size(10)
                    // Lower the fetch interval because postgres is waiting for notifications
                    .fetch_interval(Duration::from_millis(200))
            })
            .build_fn(send_token_event_example)
    })
    .run();
futures.push(Box::pin(worker));

@geofmureithi geofmureithi self-assigned this Oct 12, 2023
@geofmureithi geofmureithi added the enhancement New feature or request label Oct 12, 2023
@geofmureithi
Owner

Currently this functionality is not implemented implicitly (I can add that in 0.5).
To do what you need, consider using a custom Layer and calling update_with_id.

@ShockleyJE
Author

OK. I am not committed to marking a job Failed and having it re-executed implicitly (though the implicit approach has a nice feel to it). If this has to be done explicitly, that's fine too! I just need to be able to mark a job as Failed and have it re-executed at all.

Since I failed to find an example for how to do this explicitly in /examples, I assumed re-executing on failure "just worked" implicitly based on how the result is returned in the sentry example.

I would happily submit a PR back with an example in either case if you wouldn't mind giving me a starting point for re-executing a job on failure, either explicitly or implicitly.

@ShockleyJE
Author

I was able to work around this by adding a cron service, and updating to 0.4.5, to rerun the individual job logic for each item that hasn't been marked as received by the downstream system.

I haven't had the opportunity to figure out the custom layers and the explicit method yet, but will update when I can.

@geofmureithi
Owner

I encourage you to look at https://github.com/geofmureithi/apalis/blob/master/packages/apalis-core/src/layers/ack/mod.rs
Ideally you would want to:

  1. Create a Retry trait
  2. Create a RetryLayer which would take in a type that implements Retry
  3. Implement Retry for the different backends

You can start with the first two for the PR and then we can discuss.
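
The three steps above could be sketched roughly as follows. This is a minimal, synchronous, standard-library-only illustration and not apalis code: `Retry`, `RetryLayer`, `InMemoryBackend`, and the `call` method are all hypothetical names, the error type is simplified to `String`, and a real implementation would presumably be an async tower-style layer backed by the storage.

```rust
use std::collections::VecDeque;

/// Hypothetical trait (step 1): a backend that can schedule a failed job to run again.
pub trait Retry {
    fn retry(&mut self, job_id: &str, error: &str);
}

/// Hypothetical in-memory backend (step 3), standing in for Postgres or another store.
#[derive(Default)]
pub struct InMemoryBackend {
    /// Jobs waiting to run again, stored as (job id, last error) pairs.
    pub retry_queue: VecDeque<(String, String)>,
}

impl Retry for InMemoryBackend {
    fn retry(&mut self, job_id: &str, error: &str) {
        self.retry_queue
            .push_back((job_id.to_string(), error.to_string()));
    }
}

/// Hypothetical layer (step 2): wraps a handler and reports failures to the backend.
pub struct RetryLayer<R: Retry> {
    pub backend: R,
}

impl<R: Retry> RetryLayer<R> {
    pub fn new(backend: R) -> Self {
        Self { backend }
    }

    /// Runs `handler` for `job_id`. On `Err`, hands the job to the backend
    /// for a retry, then passes the original result through unchanged.
    pub fn call<F>(&mut self, job_id: &str, handler: F) -> Result<(), String>
    where
        F: FnOnce() -> Result<(), String>,
    {
        let result = handler();
        if let Err(e) = &result {
            self.backend.retry(job_id, e);
        }
        result
    }
}

fn main() {
    let mut layer = RetryLayer::new(InMemoryBackend::default());
    let _ = layer.call("JID-1", || Ok(()));
    let _ = layer.call("JID-2", || Err("send failed".to_string()));
    println!("queued for retry: {:?}", layer.backend.retry_queue);
}
```

Keeping the layer generic over `Retry` means each backend decides what "retry" means (re-insert a row, push to a queue, and so on) without the layer knowing about it.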

@geofmureithi
Owner

To be honest, we might just need a combination of Ack and Retry in the same layer.

@geofmureithi
Owner

#215 (comment) Also applies here.

@geofmureithi
Owner

Currently AckLayer only handles success. Failure handling can easily be added.
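
To illustrate what an acknowledgement step that also handles failures might do to a row like the one in the report, here is a hedged sketch. It is not the actual AckLayer: `JobRow`, `Status`, and `ack` are hypothetical, the struct only mirrors the `status`, `attempts`, `max_attempts`, and `last_error` columns, and the policy of counting every run as an attempt and returning the job to `Pending` until `max_attempts` is reached is an assumption.

```rust
/// Hypothetical, simplified version of the `status` column.
#[derive(Debug, Clone, PartialEq)]
pub enum Status {
    Pending,
    Done,
    Failed,
}

/// Hypothetical in-memory view of a few columns from the jobs table.
#[derive(Debug)]
pub struct JobRow {
    pub status: Status,
    pub attempts: u32,
    pub max_attempts: u32,
    pub last_error: Option<String>,
}

/// Acknowledges a finished run using the handler's full result,
/// not only the success case.
pub fn ack(row: &mut JobRow, result: &Result<(), String>) {
    // Assumption: every completed run counts as one attempt.
    row.attempts += 1;
    match result {
        Ok(()) => {
            row.status = Status::Done;
        }
        Err(e) => {
            // Record the failure so it shows up in `last_error`.
            row.last_error = Some(e.clone());
            row.status = if row.attempts < row.max_attempts {
                Status::Pending // eligible to be picked up again
            } else {
                Status::Failed // retries exhausted
            };
        }
    }
}

fn main() {
    let mut row = JobRow {
        status: Status::Pending,
        attempts: 0,
        max_attempts: 25,
        last_error: None,
    };
    ack(&mut row, &Err("message send failure".to_string()));
    println!("{:?}", row);
}
```

With this shape, the job from the report would end its first run with `last_error` populated and a status that allows another attempt, instead of `Done` with an empty `last_error`.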

@geofmureithi
Owner

This has been resolved and tested in #374
