
Regression: RedisStorage in v0.6.0 #466

Closed
AzHicham opened this issue Nov 28, 2024 · 25 comments · Fixed by #472
@AzHicham (Contributor) commented Nov 28, 2024

Hello,

I started to test the latest release 0.6.0 with Redis and found a regression.

I noticed that all tasks present in the redis queue NAMESPACE:active are fetched right away by a worker even though I'm using the following config:

let config = Config::default()
        .set_namespace(T::BROKER_NAMESPACE_JOB)
        // Buffer size of 1: fetch at most one task, leaving the rest for other workers
        .set_buffer_size(1)
        .set_poll_interval(Duration::from_millis(1000))
        .set_max_retries(1)
        .set_keep_alive(Duration::from_secs(120));

let product_worker_name = format!("Worker-{}-{}", T::PRODUCT_ID, random::<u32>());

let product_worker = WorkerBuilder::new(product_worker_name)
        .layer(ConcurrencyLimitLayer::new(1)) // Allow running 1 job per worker at a time
        .data(backend.clone())
        .chain(|svc| svc.check_service_clone::<Identity>())
        .backend(RedisStorage::new_with_config(
            redis_connection.clone(),
            config,
        ))
        .build_fn(handle_task::<T, HttpDispatcherClient, HttpSlideClient>);

I think the regression comes from this line:

let res = self.fetch_next(worker.id()).await;

IMO, we should have a safety check here so we don't fetch from Redis when the buffer is full. WDYT?

EDIT: To do that, I think we need a Sender/Receiver that provides methods like is_full, or at least len and capacity. async_channel could be a good candidate?
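The proposed guard can be modeled with a plain bounded channel from the standard library (the names here are hypothetical, not apalis or async_channel API): hand a fetched task to the buffer, and skip the next Redis fetch when the buffer reports full.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

// Hypothetical model of the proposed guard: the worker's buffer is a bounded
// channel of capacity `buffer_size`; a Redis fetch is only attempted when the
// buffer still has room.
fn try_buffer_task<T>(buffer: &SyncSender<T>, task: T) -> bool {
    match buffer.try_send(task) {
        Ok(()) => true,                              // room available: fetch was allowed
        Err(TrySendError::Full(_)) => false,         // buffer full: skip fetching from Redis
        Err(TrySendError::Disconnected(_)) => false, // receiver gone: stop fetching
    }
}

fn main() {
    let (tx, _rx) = sync_channel::<u32>(1); // buffer_size = 1
    assert!(try_buffer_task(&tx, 1));  // first task fits in the buffer
    assert!(!try_buffer_task(&tx, 2)); // second would overflow: don't fetch it
}
```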

@geofmureithi (Owner):

Hmm, we might not need async_channel. We could just compare the worker's task count with the buffer size; that would be a quick fix too. If you can open a quick PR, that would be nice.
Another fix might be adding an atomic bool for is_ready in the Worker context. Both would be straightforward. Let me know your thoughts.

@AzHicham (Contributor, Author):

I'll try that ;)

@AzHicham (Contributor, Author):

TBH I don't know how to connect the task_count to the buffer_size.

I can set a condition like worker.task_count() == 0, then fetch from Redis. That works in my case because I have a buffer_size of 1 and I use ConcurrencyLimitLayer::new(1). But with ConcurrencyLimitLayer::new(10), for instance, I wouldn't want the task_count to reach 0 before fetching new tasks from Redis.
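The condition being discussed can be sketched as a small predicate (hypothetical names, not apalis API): fetch whenever the in-flight count is below the concurrency limit, so it degenerates to `task_count == 0` only when the limit is 1.

```rust
// Hypothetical gating predicate: fetch new tasks only while the number of
// in-flight tasks is below the concurrency limit, rather than waiting for
// the count to hit zero.
fn should_fetch(task_count: usize, concurrency_limit: usize) -> bool {
    task_count < concurrency_limit
}

fn main() {
    // concurrency = 1: fetch only when the worker is idle
    assert!(should_fetch(0, 1));
    assert!(!should_fetch(1, 1));
    // concurrency = 10: keep fetching until 10 tasks are in flight
    assert!(should_fetch(9, 10));
    assert!(!should_fetch(10, 10));
}
```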

@geofmureithi (Owner):

ConcurrencyLimitLayer::new(10) just means the worker cannot execute more than 10 tasks at a time.
Our goal here is not to fetch when the service is not ready. This was the previous behavior. But currently we are using tower::CallAll, which is probably why everything gets consumed: the stream is consumed to the end.
In essence, we should use the concurrency layer rather than buffer size. Buffer size is specific to Redis and controls how many jobs to fetch at a time. I have some ideas on how to get this behavior back; let me write something tonight and we can discuss it tomorrow.
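The eager-vs-gated distinction described above can be modeled with a plain iterator standing in for the job stream (a toy sketch, not the tower API):

```rust
// An iterator stands in for the job stream. `drain_eagerly` mirrors what
// tower::CallAll effectively did (consume the stream to the end), while
// `pull_while_ready` only takes as many jobs as there are ready slots.
fn drain_eagerly<I: Iterator<Item = u32>>(jobs: I) -> Vec<u32> {
    jobs.collect()
}

fn pull_while_ready<I: Iterator<Item = u32>>(jobs: I, ready_slots: usize) -> Vec<u32> {
    jobs.take(ready_slots).collect()
}

fn main() {
    let active_queue = vec![1, 2, 3];
    // Eager consumption empties the whole queue at once (the regression).
    assert_eq!(drain_eagerly(active_queue.clone().into_iter()), vec![1, 2, 3]);
    // Gated consumption fetches one task per ready slot (the expected behavior).
    assert_eq!(pull_while_ready(active_queue.into_iter(), 1), vec![1]);
}
```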

@AzHicham (Contributor, Author):

OK thanks for your help !!!

@AzHicham (Contributor, Author) commented Nov 28, 2024

The thing I don't clearly understand is the definition of is_ready. For me it's: is the worker able to start a new task? This may depend on ConcurrencyLimitLayer.

Indeed, if instead of tower::CallAll we can use another mechanism that does not start a task until the worker is_ready, i.e. able to start a new task, then it should solve this issue.

EDIT: IMO buffer_size should not be specific to Redis. It's a parameter that can influence latency and performance. To achieve something like 1M tasks/sec, for instance, one must fetch tasks in batches of 1K rather than one by one. This should be true for Redis, RMQ & SQL DBs alike.
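The throughput argument above is just round-trip arithmetic; a quick sketch (the helper name is made up for illustration):

```rust
// Round trips to the backend needed to drain `total` tasks, fetching
// `batch_size` tasks per trip.
fn round_trips(total: u64, batch_size: u64) -> u64 {
    (total + batch_size - 1) / batch_size // ceiling division
}

fn main() {
    // One by one: a million fetches for a million tasks...
    assert_eq!(round_trips(1_000_000, 1), 1_000_000);
    // ...while batches of 1K cut that to a thousand fetches.
    assert_eq!(round_trips(1_000_000, 1_000), 1_000);
}
```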

@geofmureithi (Owner):

Meanwhile, could you check something for me?
Modify the tower::CallAllUnordered to tower::CallAll in Worker::poll_jobs in apalis-core and see if it works.

@geofmureithi (Owner):

> EDIT: IMO buffer_size should not be specific to Redis.

I should have said it's backend-specific, since some backends, like message queues, already handle this themselves.
Workers just take a Stream and consume it. To control this from the worker's perspective, you need to use the concurrency layer.

@AzHicham (Contributor, Author):

> Meanwhile, could you check something for me?
> Modify the tower::CallAllUnordered to tower::CallAll in Worker::poll_jobs in apalis-core and see if it works

Same behavior with tower::CallAll. All tasks are fetched from active and put into inflight :/

@geofmureithi (Owner):

Ok, I already have something in mind. Hopefully it's a single change :). Give me a few hrs.

@geofmureithi (Owner):

Hey, I added a small layer to the worker that checks if the worker is ready, in the branch fix/poll-when-ready.
Could you apply the check on Redis before fetch_next, combine it with concurrency, and see if it works?

@AzHicham (Contributor, Author) commented Nov 29, 2024

Hello,

Still not working, but I think you forgot to push some changes? On this branch I only see the layer, but not how the AtomicBool is updated.

Edit: my bad, I see it!!

@geofmureithi (Owner):

I need you to do something like:

if worker.is_ready() {
  fetch_next().await
}

in the Redis storage. Are you sure it's not working?

@AzHicham (Contributor, Author) commented Nov 29, 2024

It's getting better, with:

is_ready: Arc::new(AtomicBool::new(true)),

and

if worker.is_ready() {
  fetch_next().await
}

Fetch occurs only when the worker is not busy.
But still, with a buffer_size of 1, a task is fetched while the worker is running.
E.g. I push 3 tasks:
1 task is launched (concurrency still limited to 1).
1 task is, I think, in the buffer.
1 is in the Redis active queue.

@geofmureithi (Owner):

Good! I might need to use load(Acquire); this may fix it. But we are getting there.
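For reference, the Release/Acquire pairing being suggested looks like this (a minimal sketch, not the apalis code):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// The worker side publishes readiness with a Release store, so any writes
// it made before becoming ready are visible to the fetch side.
fn publish_ready(flag: &AtomicBool) {
    flag.store(true, Ordering::Release);
}

// The fetch side observes the flag with an Acquire load before fetching.
fn is_worker_ready(flag: &AtomicBool) -> bool {
    flag.load(Ordering::Acquire)
}

fn main() {
    let is_ready = Arc::new(AtomicBool::new(false));
    assert!(!is_worker_ready(&is_ready));

    let flag = Arc::clone(&is_ready);
    let worker = thread::spawn(move || publish_ready(&flag));
    worker.join().unwrap();

    assert!(is_worker_ready(&is_ready)); // fetch_next may now proceed
}
```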

@AzHicham (Contributor, Author) commented Nov 29, 2024

Another weird behaviour (maybe I'll open a dedicated issue for it) is when I send a SIGTERM/SIGINT while using this code:

...
monitor.run_with_signal(shutdown_signal()).await?;

// select! and info! are presumably tokio's and tracing's macros;
// sigterm()/sigint() look like helpers around tokio::signal::unix::signal.
pub(crate) async fn shutdown_signal() -> Result<(), io::Error> {
    let mut sigterm = sigterm();
    let mut sigint = sigint();
    select! {
        biased;
        _ = sigterm.recv() => info!("SIGTERM signal. Exit now !!"),
        _ = sigint.recv() => info!("SIGINT signal. Exit now !!"),
    }
    Ok(())
}

Then the worker finishes handling all tasks before exiting, even the tasks still sitting in the Redis active queue. (Following my previous comment, all 3 tasks end up done.)

Note: this code works as expected with apalis v0.5.5.
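The behavior being asked for can be modeled roughly like this (a toy model, not the apalis shutdown path): after the signal lands, the worker completes only what is already in flight and leaves the rest queued.

```rust
use std::collections::VecDeque;

// Toy model: the worker keeps fetching until the shutdown signal lands.
// Whatever is already in flight runs to completion; everything else stays
// in the (simulated) Redis active queue.
fn run_until_shutdown(queue: &mut VecDeque<u32>, tasks_before_shutdown: usize) -> Vec<u32> {
    let mut done = Vec::new();
    while done.len() < tasks_before_shutdown {
        match queue.pop_front() {
            Some(task) => done.push(task), // in-flight task finishes
            None => break,                 // queue drained before shutdown
        }
    }
    done
}

fn main() {
    let mut queue: VecDeque<u32> = (1..=3).collect();
    // SIGTERM arrives while the first task is running.
    let done = run_until_shutdown(&mut queue, 1);
    assert_eq!(done, vec![1]);  // only the in-flight task completed
    assert_eq!(queue.len(), 2); // the other two remain in Redis
}
```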

@geofmureithi (Owner) commented Nov 29, 2024 via email

@geofmureithi (Owner):

Hey, I think this has been resolved with the current push.

@geofmureithi (Owner):

Also, I have created tower-rs/tower#801, which also relates to this.

@geofmureithi geofmureithi linked a pull request Dec 1, 2024 that will close this issue
@AzHicham (Contributor, Author) commented Dec 1, 2024

Hello,
Thanks a lot for your work. Unfortunately I have two issues (I checked out commit 408fc4f):

  • Only one of my two workers gets registered in the Redis consumers queue; no such issue on master.
  • I removed the worker I don't use for my test, but tasks are not getting handled by the remaining one. I flushed the entire Redis db just to be sure.
    I also put a dbg!() into the loop { select! }, but nothing shows up.

@geofmureithi (Owner) commented Dec 2, 2024

Lol, look at what I did:

    /// Start running the worker
    pub fn start(&self) {
        self.state.running.store(false, Ordering::Relaxed);
        self.state.is_ready.store(false, Ordering::Relaxed);
        self.emit(Event::Start);
    }

start() sets them to false 😢
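Assuming the fix is simply flipping those stores to `true`, a corrected sketch looks like this (the `State` struct here is hypothetical, standing in for the worker's internal state):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct State {
    running: AtomicBool,
    is_ready: AtomicBool,
}

impl State {
    /// Start running the worker: both flags must flip to `true`,
    /// not `false` as in the buggy version quoted above.
    fn start(&self) {
        self.running.store(true, Ordering::Relaxed);
        self.is_ready.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let state = State {
        running: AtomicBool::new(false),
        is_ready: AtomicBool::new(false),
    };
    state.start();
    assert!(state.running.load(Ordering::Relaxed));
    assert!(state.is_ready.load(Ordering::Relaxed));
}
```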

@AzHicham (Contributor, Author) commented Dec 2, 2024

haha thx I'll test against the fix ;)

@AzHicham (Contributor, Author) commented Dec 2, 2024

Everything works as expected!!!

My test:

  • Push 3 tasks onto the Redis queue.
  • Start a worker -> 1 job is launched against the 1st task.
  • Before it finishes, I send a SIGTERM.
  • The job finishes, then the worker shuts down.
  • In Redis I still have 2 tasks remaining.
  • I re-launch a worker and another job is launched against the 2nd task.

Nothing to say except good job!!!!

Thanks for your help!!

@AzHicham (Contributor, Author) commented Dec 9, 2024

Hello @geofmureithi

Sorry to bother you, I just wanted to know when you're going to release a patch version?

Thank you

@geofmureithi (Owner):

@AzHicham a new version has been released
