Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Rate Limiting #491

Open
theduke opened this issue Apr 10, 2019 · 17 comments
Open

Async Rate Limiting #491

theduke opened this issue Apr 10, 2019 · 17 comments

Comments

@theduke
Copy link
Contributor

theduke commented Apr 10, 2019

A lot of APIs have rate limits, and manually implementing a rate limiter over and over again is quite annoying to me.

There was some previous discussion in #169 but that was focused on a sync context.

@seanmonstar would you be open to a built in rate limiter?
I imagine it working like this:

  • keep a sliding list of past requests per domain which have rate limiting enabled ( token bucket or something simpler)
  • if limit is reached, delay requests as necessary by just returning a delayed future

A simple implementation without prioritization could cause cascading delays though. A fix for that would be a request queue.

The complexity for this would not be too high and I would really love that functionality.

An alternative solution to this would be to have a async middleware feature with a pre-request hook that can return a future. That way a third party crate could supply the functionality easily, but async middleware is probably a larger topic. (remotely related discussion regarding sync hooks here: #155)

@seanmonstar
Copy link
Owner

We've been working on a general middleware stack, and have a form of rate-limiting here: https://github.com/tower-rs/tower/blob/master/tower-limit/src/rate/service.rs

It'd probably be useful to make adjustments there.

@edrevo
Copy link

edrevo commented Jan 27, 2020

@seanmonstar I've seen examples on how to integrate the hyper client and tower. Is it also possible to do that with reqwest and tower? Is there any example code or documentation that I can use as a hint to build that integration?

Thanks for all of your work with warp, hyper, reqwest & co. It's really great to use your libs!

@seanmonstar
Copy link
Owner

@edrevo the easiest way is to just use tower::service_fn:

let client = reqwest::Client::new(); // or use builder

let svc = tower::service_fn(move |req| {
    client.execute(req)
});

@ardeaf
Copy link

ardeaf commented Feb 4, 2020

@seanmonstar Would you be able to provide a quick example on how to convert one of the reqwest examples into a rate-limited one?

ie, from the docs:

let client = reqwest::Client::new();
let res = client.post("http://httpbin.org/post")
    .body("the exact body that is sent")
    .send()
    .await?;

How would we refactor that into a rate limited request using tower-limit?

@Mathspy
Copy link
Contributor

Mathspy commented Apr 6, 2020

A bit late but here's an example of the above using tower if someone looks at this issue in the future

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;

let client = reqwest::Client::new();
let mut svc = tower::ServiceBuilder::new()
    .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
    .service(tower::service_fn(move |req| client.execute(req)));

let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post")?);
*req.body_mut() = Some(Body::from("the exact body that is sent"));

let res = svc.ready_and().await?.call(req).await?;

Definitely a bit more verbose but it works like a charm and the client and service needs to be setup only once anyway

@flexabyte
Copy link

flexabyte commented Aug 3, 2020

I've not had any success getting this working by storing the service in a struct... I can't find a proper way to declare the type of the Service. If I do:

struct ServiceStruct {
    service: dyn tower::Service<Response=HttpResponse,Error=HttpError,Future=Pin<Box<Result<HttpResponse,HttpError>>>>,
... more fields ...
}

let mut svc = tower::ServiceBuilder::new()
    .rate_limit(1, Duration::new(0, 1600)) // 1 request every 1600ms
    .service(tower::service_fn(move |req| client.execute(req)));

ServiceStruct { service: svc, ... more fields }    

But the type definition for tower::Service in the struct is wrong, and I can't find any usable documentation for declaring the type... has anyone had success with this?

UPDATE:

I had the wrong end of the stick with this one, but I got there in the end. For anyone looking to do something similar, here is the adapted code for storing a rate limit service in a struct:

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;
use tower_limit::rate::RateLimit;
use tower_util::ServiceFn;

struct ServiceStruct<T> {
    service: RateLimit<ServiceFn<T>>,
}

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let service = tower::ServiceBuilder::new()
        .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
        .service(tower::service_fn(move |req| client.execute(req)));

    let mut service_struct = ServiceStruct{service};

    let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post").unwrap());
    *req.body_mut() = Some(Body::from("the exact body that is sent"));

    let res = service_struct.service.ready_and().await.unwrap().call(req).await.unwrap();
    println!("res: {:?}", res);
}

@svc-93
Copy link

svc-93 commented Jun 21, 2021

We've been working on a general middleware stack, and have a form of rate-limiting here: https://github.com/tower-rs/tower/blob/master/tower-limit/src/rate/service.rs

It'd probably be useful to make adjustments there.

This link seems to be non-functional now. Has there been any progress on this feature request? It'd be useful to have this available as an out-of-box addition!

@liamdawson
Copy link

@eute
Copy link

eute commented Aug 7, 2021

A bit late but here's an example of the above using tower if someone looks at this issue in the future

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;

let client = reqwest::Client::new();
let mut svc = tower::ServiceBuilder::new()
    .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
    .service(tower::service_fn(move |req| client.execute(req)));

let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post")?);
*req.body_mut() = Some(Body::from("the exact body that is sent"));

let res = svc.ready_and().await?.call(req).await?;

Definitely a bit more verbose but it works like a charm and the client and service needs to be setup only once anyway

How to use this in a multithreaded context where each thread uses the same tower service and needs to be constrained by the same rate limit?

@ilyazub
Copy link

ilyazub commented Nov 17, 2021

How to use this in a multithreaded context where each thread uses the same tower service and needs to be constrained by the same rate limit?

@eute Probably by using the rt-multi-thread feature of tokio.

# Cargo.toml
tokio = { version = "1.7.1", features = ["rt-multi-thread", "macros"] }
// src.main.rs

#[tokio::main]
async fn main() {}

The default "RuntimeFlavor" of tokio::main macros is multi_threaded. Source.

@sergeyshaykhullin
Copy link

@ilyazub I am getting the same problem as @eute. Service has to be mutable, so i can't share exclusive reference and operate concurrently

@deknowny
Copy link

deknowny commented May 19, 2022

If you are still finding a solution for client-side rate limiting, I made a little crate raliguard with Semaphore implementing fixed window algorithm to control execution times per a period. Here is an example of asynchronous usage where the semaphore is shared between threads. It also supports any async/await backends

use std::{thread, sync, time};

use raliguard::Semaphore;


// Create a semaphore with restriction `5 tasks per 1 second`
let original_sem = Semaphore::new(5, time::Duration::from_secs(1));

// Make it sharable between threads (or you can share between tasks)
let shared_sem = sync::Arc::new(
    sync::Mutex::new(original_sem)
);

// Spawn 15 threads
for _ in 0..15 {
    let cloned_sem = shared_sem.clone();
    let thread = thread::spawn(move || {
        // Lock mutex for exclusive usage
        let mut local_sem = cloned_sem.lock().unwrap();

        // Get required delay
        let calculated_delay = local_sem.calc_delay();
        
        // Release mutex, make semaphore available to use in another threads
        drop(local_sem);

        // If delay exists, sleep it
        if let Some(delay) = calculated_delay {
            thread::sleep(delay);
        }
        
        // Here you can do your requests or another stuff
    });
}

// Sleep 1 second. Only 10 threads will be completed at this time
// (first 5 with no delay and another 5 after a second)
thread::sleep(time::Duration::from_secs(1));

@Mathspy
Copy link
Contributor

Mathspy commented Aug 8, 2022

This is much much easier to do now after reqwest v0.11.11

Client now implements service so you can do this to return a service powered by reqwest from a function and store it into a struct

use std::time::Duration;

use reqwest::{Error, Request, Response};
use tower::Service;

fn example() -> impl Service<Request, Response = Response, Error = Error> {
    let client = reqwest::Client::new();

    tower::ServiceBuilder::new()
        .rate_limit(10, Duration::from_secs(5))
        .service(client)
}

struct Example<S>
where
    S: Service<Request, Response = Response, Error = Error>,
{
    service: S,
}

fn main() {
    let example = Example { service: example() };
}

If you need your service to be usable from multiple different contexts the easiest thing is to clone your service and pass the clones into those contexts. However, RateLimit prevents services from being cloned because then each service would have its own rate limits and that defeats the purpose of rate limits, so we can use a Buffer service (which happens to have a different error type) like so:

diff --git a/src/main.rs b/src/main.rs
index d39d616..8547d43 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,19 +1,20 @@
 use std::time::Duration;
 
 use reqwest::{Error, Request, Response};
-use tower::Service;
+use tower::{BoxError, Service};
 
-fn example() -> impl Service<Request, Response = Response, Error = Error> {
+fn example() -> impl Service<Request, Response = Response, Error = BoxError> {
     let client = reqwest::Client::new();
 
     tower::ServiceBuilder::new()
+        .buffer(100)
         .rate_limit(10, Duration::from_secs(5))
         .service(client)
 }
 
 struct Example<S>
 where
-    S: Service<Request, Response = Response, Error = Error>,
+    S: Service<Request, Response = Response, Error = BoxError>,
 {
     service: S,
 }

@nirvana-msu
Copy link

nirvana-msu commented Oct 5, 2022

Do I understand correctly, that RequestBuilder pattern is not compatible with using tower service? RequestBuilder is initialized with Client - which is fine when using reqwest on its own, but not when we need to use tower that wraps the client..

Setting body may be as easy as assigning to *req.body_mut(), however properly updating headers (as well as other things builder helps with) can be a lot more involved.

@seanmonstar is there a chance RequestBuilder could be decoupled from Client somehow? The only place it actually uses the client field is within send() method... building the request itself doesn't need the client at all. Obviously main concern here would be API backwards compatibility - otherwise what I want could be achieved e.g. by just wrapping client field with Option.

As a workaround I can actually use some dummy client when instantiating RequestBuilder, and then instead of using send() I can get request with build(), and then send using tower service. It's just slightly annoying that I need to pass in the dummy client which will never be used.

@nirvana-msu
Copy link

nirvana-msu commented Nov 27, 2022

RateLimit prevents services from being cloned because then each service would have its own rate limits and that defeats the purpose of rate limits, so we can use a Buffer service like so

Doesn't this introduce a race condition?

RateLimit does not reserve any capacity upon poll_ready, unlike e.g. ConcurrencyLimit. The only thing that prevents race condition when using RateLimit by itself is that poll_ready/call methods take &mut self, so you can't invoke those concurrently from different tasks without cloning (and this presumably is the main reason Clone wasn't implemented for them, unlike was done for ConcurrencyLimit).

Putting Buffer service in front with bound larger than 1 sidesteps that - it is now possible for multiple (up to bound) tasks to obtain permission from poll_ready, all without actually reserving any rate limit capacity - so when they proceed to call, there may not be enough capacity for them all (only capacity for one is guaranteed).

@xylonx
Copy link

xylonx commented Mar 20, 2023

In tower::ServiceBuilder, I have found an example declaring as "containing rate limiting, in-flight request limits, and a channel-backed, clonable Service". below is the the example codes, or you can find it here

ServiceBuilder::new()
    .buffer(5)
    .concurrency_limit(5)
    .rate_limit(5, Duration::from_secs(1))
    .service(svc);

Based on that, I have written my channel-based wrapper and it indeed works. I post it below and hope it can help someone. If it introduces some race conditions, please be easy to point it :)

use std::time::Duration;

use anyhow::{anyhow, Result};
use futures::{
    channel::{mpsc, oneshot},
    SinkExt, StreamExt,
};
use log::error;
use reqwest::{Client, Request, Response};
use tower::{Service, ServiceExt};

#[derive(Debug)]
pub struct LimitedRequestClient {
    request_tx: mpsc::Sender<(Request, oneshot::Sender<Result<Response>>)>,
}

impl LimitedRequestClient {
    /// [buffer] -> [concurrency req pool] - :{rate limit}: -> client.call()
    pub fn new(
        client: Client,
        channel_buffer_size: usize,
        request_buffer_size: usize,
        max_concurrency_number: usize,
        rate_limit_number: u64,
        rate_limit_duration: Duration,
    ) -> Self {
        let (tx, rx) =
            mpsc::channel::<(Request, oneshot::Sender<Result<Response>>)>(channel_buffer_size);

        tokio::spawn(async move {
            let service = tower::ServiceBuilder::new()
                .buffer(request_buffer_size)
                .concurrency_limit(max_concurrency_number)
                .rate_limit(rate_limit_number, rate_limit_duration)
                .service(client.clone());
            rx.for_each_concurrent(max_concurrency_number, move |(req, resp_tx)| {
                let mut inner_service = service.clone();
                async move {
                    let resp = match inner_service.ready().await {
                        Ok(srv) => match srv.call(req).await {
                            Ok(r) => Ok(r),
                            Err(e) => Err(anyhow!(
                                "LimitedRequestClient: service call request failed: {}",
                                e
                            )),
                        },
                        Err(e) => Err(anyhow!("LimitedRequestClient: service ready failed: {}", e)),
                    };
                    match resp_tx.send(resp) {
                        Ok(_) => (),
                        Err(_) => error!(
                            "LimitedRequestClient: send resp to resp_tx failed: channel closed"
                        ),
                    }
                }
            })
            .await // keep it in-flight
        });
        Self { request_tx: tx }
    }

    pub async fn request(&self, req: Request) -> Result<Response> {
        let (tx, rx) = oneshot::channel::<Result<Response>>();
        self.request_tx.clone().send((req, tx)).await?;
        rx.await?
    }
}

@JohnScience
Copy link

@nirvana-msu

I have a suggestion on what can be done about reqwest::RequestBuilder. See the discussion #2377.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests