Apalis max retries strategy #503

Open
zeljkoX opened this issue Jan 21, 2025 · 14 comments

@zeljkoX commented Jan 21, 2025

Thank you for your work on this great package!

I have been trying to use it and would like to clarify how the retry mechanism works in the background.

I am sharing my setup and handler example below.

The issue I am experiencing with the latest apalis and apalis-redis packages is that I am unable to enforce a maximum number of retries when the handler fails. When a job fails with Error::Failed, it keeps retrying in a loop, even when set_max_retries(2) is set on the Config.

I would like to clarify whether this is a bug or a misunderstanding of the library's internals.

I have also used a retry policy, and that seems to work; I can see that the Attempts field gets incremented. However, once it reaches the maximum number of retries defined in the retry policy, it starts over from the beginning and continues in a loop.

One thing I noticed is that when logging RedisContext from the handler, the max_attempts value is set to 0:

Context: RedisContext { max_attempts: 0, lock_by: Some(WorkerId { name: "request_handler" }), run_at: None }

I would like to know how to track the number of retries and potentially return an Abort error from the handler, or have this handled automatically (a sketch of what I mean is included after the handler code below).

Additionally, I am interested in learning about the best practices for handling "dead" jobs.

pub async fn setup_workers(app_state: ThinData<AppState>) -> Result<()> {
    let conn = apalis_redis::connect(ServerConfig::from_env().redis_url.clone())
        .await
        .expect("Could not connect to Redis Jobs DB");

    let config = Config::default().set_namespace("test").set_max_retries(2);

    info!("config :{}?", config.get_max_retries());

    let mut storage = RedisStorage::new_with_config(conn, config);

    storage
        .push(
            Request::new("", ""),
        )
        .await?;

    let request_queue_worker = WorkerBuilder::new("request_handler")
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .data(app_state.clone())
        .backend(storage)
        .build_fn(example_request_handler);

    let monitor_future = Monitor::new()
        .register(request_queue_worker)
        .on_event(|e| {
            let worker_id = e.id();
            match e.inner() {
                Event::Engage(task_id) => {
                    info!("Worker [{worker_id}] got a job with id: {task_id}");
                }
                Event::Error(e) => {
                    error!("Worker [{worker_id}] encountered an error: {e}");
                }

                Event::Exit => {
                    info!("Worker [{worker_id}] exited");
                }
                Event::Idle => {
                    info!("Worker [{worker_id}] is idle");
                }
                Event::Start => {
                    info!("Worker [{worker_id}] started");
                }
                Event::Stop => {
                    info!("Worker [{worker_id}] stopped");
                }
                _ => {}
            }
        })
        .shutdown_timeout(Duration::from_millis(5000))
        .run_with_signal(async {
            let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
                .expect("Failed to create SIGINT signal");
            let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
                .expect("Failed to create SIGTERM signal");

            info!("Monitor started");

            tokio::select! {
                _ = sigint.recv() => info!("Received SIGINT."),
                _ = sigterm.recv() => info!("Received SIGTERM."),
            };

            info!("Monitor shutting down");

            Ok(())
        });
    tokio::spawn(async move {
        if let Err(e) = monitor_future.await {
            error!("Monitor error: {}", e);
        }
    });
    info!("Monitor shutdown complete");
    Ok(())
}

Handler Fn

pub async fn example_request_handler(
    job: Job<TransactionRequest>,
    state: Data<ThinData<AppState>>,
    attempt: Attempt,
    worker: Worker<Context>,
    task_id: TaskId,
    ctx: RedisContext,
) -> Result<(), Error> {
    info!("Handling transaction request: {:?}", job.data);
    info!("Attempt: {:?}", attempt);
    info!("Worker: {:?}", worker);
    info!("Task ID: {:?}", task_id);
    info!("Context: {:?}", ctx);

    let result = handle_request(job.data, state).await;

    match result {
        Ok(_) => {
            info!("request handled successfully");
            Ok(())
        }
        Err(e) => {
            info!("request failed: {:?}", e);
            Err(Error::Failed(Arc::new(
                "Failed to handle transaction request".into(),
            )))
        }
    }
}

pub async fn handle_request(
    request: Request,
    state: Data<ThinData<AppState>>,
) -> Result<()> {
    // logic that throws

    Ok(())
}
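To make the question concrete, here is a minimal sketch of what I mean by capping retries from inside the handler: it reads the current attempt count from the Attempt extractor and returns a terminal error once a limit is reached. This is illustrative only: the MAX_ATTEMPTS constant is hypothetical, I am assuming Attempt exposes a current() accessor, and I am assuming Error::Abort takes the same payload shape as Error::Failed in this apalis version (the exact terminal variant may differ).

pub async fn capped_request_handler(
    job: Job<TransactionRequest>,
    state: Data<ThinData<AppState>>,
    attempt: Attempt,
) -> Result<(), Error> {
    // Hypothetical cap; ideally this would mirror Config::set_max_retries.
    const MAX_ATTEMPTS: usize = 2;

    // Assumed accessor: Attempt::current() reports how many times this task has run.
    if attempt.current() >= MAX_ATTEMPTS {
        // Give up permanently instead of letting the backend re-enqueue the job.
        // Assumption: Error::Abort is the terminal variant and takes the same
        // Arc payload as Error::Failed; adjust to whatever your version exposes.
        return Err(Error::Abort(Arc::new("max attempts reached".into())));
    }

    match handle_request(job.data, state).await {
        Ok(_) => Ok(()),
        Err(_) => Err(Error::Failed(Arc::new(
            "Failed to handle transaction request".into(),
        ))),
    }
}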

@geofmureithi (Owner)

This seems like a bug.

Your code is not complete though, there is no retry mechanism in your code.

@zeljkoX (Author) commented Jan 22, 2025

> This seems like a bug.
>
> Your code is not complete though, there is no retry mechanism in your code.

Thanks for your answer!

If you mean RetryPolicy: the infinite processing loop happens both with and without it.
When a retry policy is added, the Attempt field is incremented. I have tried both RetryPolicy and the backoff implementation from the issue comments.

Worker:

.retry(RetryPolicy::default())

However, when the number of retries reaches the limit, it just starts from the beginning; it seems the job gets added again.
From the logs I noticed that it is retried every 30s.
I was able to change the retry frequency by changing the Config default value enqueue_scheduled:

Config:

let config = Config::default()
      .set_namespace(namespace)
      .set_max_retries(5)
      .set_enqueue_scheduled(Duration::from_secs(10));

Just to confirm: should a job be retried up to the Config max_retries value when Error::Failed is returned, even when a worker retry policy is not used?

Regarding RetryPolicy, I am wondering: while a job is waiting for its retry timeout, is the worker blocked, or does it serve other jobs in the meantime?

@geofmureithi (Owner)

Hmm, this is an interesting case. It is possible that the job is killed but that this is not handled correctly. I do have a PR open for retry tests: #498

@geofmureithi (Owner)

Just to answer your question: it should run the number of retries in the policy, in this case 5. Ideally you would have 5 retries in memory, and then the storage can decide whether to put the job back into the queue; this can repeat until max retries is reached. This behaviour is not standardized yet, but I am hoping to standardize it in the PR shared above.
You might be able to permanently kill a job by returning Err(Error::Kill); this should stop any further retries.
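For reference, here is a minimal sketch of wiring both retry levels together, built only from the calls already shown in this thread: RetryPolicy::default() on the worker for the in-memory retries, and set_max_retries on the Redis Config for the storage-level cap. It is a fragment meant to slot into a setup like the one above (conn, app_state and the handler come from there), and whether the two levels stay in sync is exactly what this issue is about, so treat it as illustrative rather than confirmed behaviour.

// Storage-level cap: how many times the backend should re-enqueue a failed job.
let config = Config::default()
    .set_namespace("test")
    .set_max_retries(5);
let storage = RedisStorage::new_with_config(conn, config);

// Worker-level policy: in-memory retries before the job goes back to storage.
let worker = WorkerBuilder::new("request_handler")
    .retry(RetryPolicy::default())
    .data(app_state.clone())
    .backend(storage)
    .build_fn(example_request_handler);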

@reneklacan

I just noticed that we are experiencing the same behaviour:

> I have also used a retry policy, and that seems to work; I can see that the Attempts field gets incremented. However, once it reaches the maximum number of retries defined in the retry policy, it starts over from the beginning and continues in a loop.

Jobs that should only be retried once are being retried forever.

@geofmureithi (Owner)

I am working on a test for retries to ensure all storages conform to a specific retry behavior in this PR.
@reneklacan If you can provide a code sample of your config and the layer part of your code, that would be nice.

@reneklacan

@geofmureithi I think the issue is related to

// TODO: Increase the attempts

which you mentioned in another issue, i.e. that retries are not synced with the backend.

I was able to replicate it with just the default retry layer + policy and a job that returns Err.
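For anyone else trying to reproduce this, here is a minimal sketch of that repro, reusing the types and builder calls already shown above (and assuming build_fn accepts a single-argument handler here). The handler always returns Error::Failed, and the expectation is that retries stop after the policy limit instead of looping.

// Always-failing handler used to reproduce the endless-retry behaviour.
pub async fn always_fail(_job: Job<TransactionRequest>) -> Result<(), Error> {
    Err(Error::Failed(Arc::new("always fails".into())))
}

// Worker wired with the default retry policy on top of the Redis storage.
let worker = WorkerBuilder::new("repro_worker")
    .retry(RetryPolicy::default())
    .backend(storage)
    .build_fn(always_fail);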

@geofmureithi (Owner)

I will work to fix this ASAP

@reneklacan

@geofmureithi After encountering so many issues of various kinds, I wonder if it wouldn't be beneficial to say explicitly at the top of the README that Apalis is not production-ready yet (the sub-v1 version might imply it, but it wouldn't hurt to be more explicit about it).

@geofmureithi (Owner)

I know it can be frustrating when you experience bugs, but the core parts of the project, such as Worker and Request, are production-ready. That has been the biggest focus, as it affects everyone. The issues you are facing are related to apalis-redis. I intend to split the packages into different repos in the next minor release. apalis is production-ready, but apalis-redis is not. I can add the message to the apalis-redis repo.

@geofmureithi (Owner)

@reneklacan Also remember that I had warned that using the default retry layer does not update the backend, here and again here.
All the issues you have mentioned are known and documented.

@Tameflame

Getting this exact error using "retry" and "apalis-redis".

I see that tower has its own retry layer available; any guidance on how I can use it in the meantime?

Much appreciated for all the work you do, btw! Loving this crate!

@geofmureithi (Owner)

@Tameflame I am almost done with #498, and then I will fix the layer to work as expected. There is an example in #399, but I don't recommend using it as is since it does not sync with the backend. I will do a small PR and we should be good with this ASAP :)

@geofmureithi (Owner)

@zeljkoX Can you verify whether this is resolved in #498?
