Freeze when using Arc<RateLimiter> from multiple await points on same task #35

Open · idobenamram opened this issue on Sep 26, 2024 · 13 comments

@idobenamram

Hey, this might just be me misusing the feature, but I came across a weird problem.

I create a RateLimiter:

    let limiter = Arc::new(
        leaky_bucket::RateLimiter::builder()
            .refill(some_num)
            .max(some_num * 50)
            .interval(std::time::Duration::from_secs(1))
            .build(),
    );

I give a cloned Arc to each table:

first_table::build().with_limiter(limiter.clone());
second_table::build().with_limiter(limiter.clone());
third_table::build().with_limiter(limiter.clone());

I have a single task that tries to insert some number of rows into all of the tables, like so:

tokio::try_join!(
    first_table.insert(rows),
    second_table.insert(rows),
    third_table.insert(rows),
);

In the insert function, we first try to acquire from the limiter for the number of bytes we want to write:

...
limiter.acquire(size).await;
...
// insert rows

At some point it looks like all the acquires freeze and the task is no longer making progress.

I read the part about the implementation details here, and I am wondering if the freeze has something to do with the core-switching functionality.

Any help would be appreciated!
Thanks

@udoprog (Owner) commented Sep 27, 2024

Hi!

If you can post a minimal isolated reproduction, that would be great. If you can add tracing to your project, you can enable the tracing feature in leaky-bucket; it might be possible to suss out what's going on.

You should be able to use leaky_bucket=trace to only see traces for leaky-bucket.
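
Roughly, the wiring could look something like this (a minimal sketch assuming tracing-subscriber with its env-filter feature, plus the tracing feature enabled on leaky-bucket in Cargo.toml; the directive can also be supplied through RUST_LOG):

use tracing_subscriber::EnvFilter;

fn main() {
    // Only show trace-level events coming from leaky-bucket.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("leaky_bucket=trace"))
        .init();

    // ... run the rest of the application ...
}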

udoprog added the bug label on Sep 27, 2024
@idobenamram (Author)

Hi! Currently it has only happened in production, so it's kind of hard to know why (though it is in an isolated tokio task), and it was working for quite some time (~a week) before getting stuck. I will try to reproduce it this week, but it might be a little hard.
I'll also try to add tracing this week, which might give us a clue. If you have any more suggestions as to why or what could trigger this, that would be great!

@idobenamram (Author) commented Sep 28, 2024

Also, as a hunch, I will say that this is something pretty new, because we have been working with your project for a while and I have only seen this now. I am wondering if it might be caused by new versions of something (Rust, tokio, other deps).

@idobenamram (Author)

I have been trying to reproduce it with a simple piece of code, but have not been able to...

use std::sync::Arc;

use rand::{rngs::StdRng, Rng, SeedableRng};

async fn inner_task(task_num: usize, limiter: Arc<leaky_bucket::RateLimiter>) -> Result<(), ()> {
    let mut rng = StdRng::from_entropy();

    // Acquire a random number of tokens, then simulate some work.
    let random = rng.gen_range(10_000..100_000);
    tracing::info!("acquiring {task_num} - {random}");
    limiter.acquire(random).await;
    tracing::info!("acquired {task_num}");

    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    Ok(())
}

async fn my_task() {
    let some_num = 100_000_000;
    let limiter = Arc::new(
        leaky_bucket::RateLimiter::builder()
            .refill(some_num)
            .max(some_num * 50)
            .interval(std::time::Duration::from_secs(1))
            .build(),
    );

    loop {
        // The inner tasks never fail, so the result can be ignored.
        let _ = tokio::try_join!(
            inner_task(1, limiter.clone()),
            inner_task(2, limiter.clone()),
            inner_task(3, limiter.clone()),
            inner_task(4, limiter.clone()),
            inner_task(5, limiter.clone()),
        );
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    tracing::info!("starting");
    let handle = tokio::spawn(my_task());
    let _ = handle.await;
    tracing::info!("done");
}

It ran for about 4 days with no problem. Not sure how else to reproduce it.

@udoprog (Owner) commented Oct 6, 2024

How are you narrowing down where the freeze is happening? How do you know it's in leaky-bucket and not in the database connection?

@idobenamram (Author)

We have a time limit on our insert, so we can see if it fails.

@udoprog (Owner) commented Oct 9, 2024

Assuming the timeout is on top of both layers (rate limiter + database), wouldn't the timeout also trigger if the stall is in the database layer, not allowing you to distinguish them?

Because if the timeout is in the database interaction, then the rate limiter necessarily has to have let the request through in order to start the timeout, no?

@idobenamram (Author) commented Oct 9, 2024

Yes, but the timeout is only on the insert, meaning it looks something like:

async fn insert(rows: _) {
    // first try to acquire
    self.limiter.acquire(size).await; // <- I think we are stuck here

    // once we acquire we can try to insert
    self.table.insert_rows(rows, timeout).await // <- this can time out
}

@idobenamram (Author)

We currently also work around this problem by adding a timeout on the acquire.
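
Roughly, the workaround looks something like this (a sketch using tokio::time::timeout; the 60-second limit here is just an example value, not our real configuration):

use std::time::Duration;

match tokio::time::timeout(Duration::from_secs(60), limiter.acquire(size)).await {
    // Tokens were acquired in time; proceed with the insert.
    Ok(()) => {}
    // The acquire did not complete in time; log and bail out instead of hanging.
    Err(_elapsed) => {
        tracing::warn!("rate limiter acquire timed out");
    }
}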

@udoprog (Owner) commented Oct 9, 2024

> Yes, but the timeout is only on the insert, meaning it looks something like:
>
> async fn insert(rows: _) {
>     // first try to acquire
>     self.limiter.acquire(size).await; // <- I think we are stuck here
>
>     // once we acquire we can try to insert
>     self.table.insert_rows(rows, timeout).await // <- this can time out
> }

I think this is what I'm not entirely getting: you said you diagnosed the problem initially by hitting the second timeout, but you can only hit the second timeout if you have passed the rate limiter. So from that, how did you know the rate limiter was blocked?

> We currently also work around this problem by adding a timeout on the acquire.

Since the rate limiter is expected to block, if we want to confirm there is a bug in this scenario when hitting a timeout, we'd also have to make sure the rate limiter isn't just throttling as expected, which it reasonably could do if it's trying to acquire enough tokens. In this case, if the bucket is empty and you are acquiring some_num * 50 tokens, your configuration means it is expected to take 50 seconds (you have it configured to refill some_num every second), and more if other tasks are waiting.
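
As a self-contained illustration of that arithmetic (just a sketch, assuming the builder's initial() option to start the bucket empty), acquiring a full bucket's worth of tokens from an empty bucket should take roughly max / refill intervals:

use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let refill = 100;
    let limiter = leaky_bucket::RateLimiter::builder()
        .refill(refill)
        .max(refill * 50)
        .interval(Duration::from_secs(1))
        // Assumed builder option: start with an empty bucket.
        .initial(0)
        .build();

    let start = Instant::now();
    // Asking for a full bucket's worth of tokens from an empty bucket
    // should take roughly 50 refill intervals, i.e. about 50 seconds.
    limiter.acquire(refill * 50).await;
    println!("acquired after {:?}", start.elapsed());
}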

@idobenamram (Author)

I'm sorry, I think the problem is that I'm not being clear enough. I will try to explain again, this time from the beginning.
We have a Prometheus metric that shows how many bytes are written to the database. Originally what we saw is that at some random point the metric stopped going up.
Our insertions run in their own isolated task, which receives data to insert via a broadcast channel.
We have a metric that shows the current state of the channel (on both sides), and we can see that the sending side is completely full, meaning some receiver is not keeping up. In this case we only have 2 receivers, and we quickly see that the other one is working as expected.

This leaves us with the insertion task, which is stuck for some reason.

This is the insertion loop:

loop {
    match events_rx.recv().await {
        Ok(Events {
            events,
            ...
        }) => {
            // ... turn events into rows ... (this cannot block)

            if let Err(e) = tokio::try_join!(
                first_table.insert(rows),
                second_table.insert(rows),
                third_table.insert(rows),
                ...
            ) {
                tracing::error!(e=?e, "Failed to insert rows to BQ");
            }
        }
        Err(RecvError::Closed) => {
            tracing::error!("writer received a closed channel event");
            break;
        }
        Err(RecvError::Lagged(skipped)) => {
            tracing::error!(
                skipped = skipped,
                "writer received a lagged channel event"
            );
            crate::metrics::EVENT_LAGGED
                .with_label_values(&["database"])
                .inc();
        }
    }
    crate::metrics::EVENT_QUEUE_RX_SIZE
        .with_label_values(&["database"])
        .set(events_rx.len() as i64);
}

Now, the only two places that can get stuck are the channel recv and the try_join. Because the other task is able to continue reading from the channel, I think the recv is less likely to be the culprit, which leaves us with the try_join.
Here is the insertion function:

async fn insert(rows) {
    // ... calculate size ...

    // if we have a limiter, acquire tokens for this insert first
    if let Some(limiter) = limiter {
        limiter.acquire(size).await;
    }

    // with the metrics feature enabled, record how many bytes are written
    #[cfg(feature = "metrics")]
    if let Some(metrics) = metrics {
        metrics.inc_by(size as u64);
    }

    // this is just a POST which has a timeout
    client
        .insert_all(project_id, dataset_id, table_id, req)
        .await
}

Based on this we concluded (though never proved) that we are getting stuck on the acquire(size) call.
I tried reproducing with the small example code I wrote and let it run for 4 days (on my Mac), but it didn't reproduce.
Also, the problem doesn't happen often in production, as it can run for weeks without occurring, so it's very hard to debug.

Hopefully that clears up any misunderstanding. I will try again, but this time running on the same hardware that production runs on instead of my Mac, and make sure I have exactly the same versions as in production. I'm also tracking production and waiting for it to happen again; maybe this time I can connect a debugger or dump the state.

Sorry for the trouble.

@udoprog (Owner) commented Oct 14, 2024

All right, so what I suggest you do is to add more instrumentation.

For each step where you suspect there might be a freeze, add logs like this:

use tracing::Instrument; // needed for .instrument()

let id = random_transaction_id();
let span = tracing::info_span!("transaction", ?id);

let task = async {
    tracing::info!("start");

    if let Some(limiter) = limiter {
        tracing::info!(?size, "checking limiter");
        // ...
    }

    if let Some(metrics) = metrics {
        tracing::info!(?size, "recording metrics");
        metrics.inc_by(size as u64);
    }

    tracing::info!("inserting into database");

    // ...

    tracing::info!("done");
};

task.instrument(span).await

You can then divert the "transaction" spans into a separate log file, and the next time you see a freeze, you can process it with a script to find any id which does not have a complete history (from the "start" to the "done" message); that should tell you where it froze.
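
A rough sketch of such a script (it assumes every log line carries the id=... field and the literal "start" and "done" messages, and uses very naive string matching):

use std::collections::HashSet;
use std::fs;

fn main() {
    let log = fs::read_to_string("transactions.log").expect("read log file");

    let mut started = HashSet::new();
    let mut finished = HashSet::new();

    for line in log.lines() {
        // Naive extraction of the `id=...` field.
        let Some(pos) = line.find("id=") else { continue };
        let id: String = line[pos + 3..]
            .chars()
            .take_while(|c| !c.is_whitespace())
            .collect();

        if line.contains("start") {
            started.insert(id);
        } else if line.contains("done") {
            finished.insert(id);
        }
    }

    // Any id that started but never finished points at where the freeze is.
    for id in started.difference(&finished) {
        println!("incomplete transaction: {id}");
    }
}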

Since I'm not sure what to do now, I'll mark this as needs info.

udoprog added the needs info label and removed the bug label on Oct 14, 2024
@idobenamram (Author)

Hi! I wanted to update you that I think we found the culprit for the hang, and it's not the rate limiter. We experienced the hang again, and this time took a core dump (yeah, not the best way), and we saw that all threads are stuck and one thread is continuously waiting on select. We think the issue is with the insert; more specifically, it looks like this might be a known issue in hyper: hyperium/hyper#2312. There is a quick fix mentioned in the issue I linked, and we are waiting to see if we still see the hang.
