
Conversation


@grll grll commented May 15, 2025

This adds a much-requested feature: rate limit and retry support for models.

It is implemented as a wrapper, like InstrumentedModel. Rate limiting uses aiolimiter for a simple implementation of the leaky bucket algorithm, while retries leverage the tenacity library.

Usage:

```python
from aiolimiter import AsyncLimiter
from tenacity import AsyncRetrying, stop_after_attempt

from pydantic_ai import Agent
from pydantic_ai.models.rate_limited import RateLimitedModel  # the wrapper added in this PR

model = RateLimitedModel(
    "anthropic:claude-3-7-sonnet-latest",
    limiter=AsyncLimiter(1, 1),
    retryer=AsyncRetrying(stop=stop_after_attempt(3)),
)
agent = Agent(model=model)
```

Author

grll commented May 15, 2025

Fixes #1732
Fixes #782

Collaborator

DouweM commented May 19, 2025

@grll Nice work! Looks like we still have some failing tests, let me know if you'd like some guidance there.

As for the code, can we please reduce the duplication a bit by moving the if self.limiter:/else stuff to an inline function called from both sides of the if self.retry:/else branch?
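For illustration, here's a minimal sketch of that refactor under the PR's current aiolimiter/tenacity setup; the function and helper names (`call_with_limit_and_retry`, `limited_call`) are hypothetical, not the PR's actual code:

```python
from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import TypeVar

from aiolimiter import AsyncLimiter
from tenacity import AsyncRetrying

T = TypeVar('T')


async def call_with_limit_and_retry(
    call: Callable[[], Awaitable[T]],
    limiter: AsyncLimiter | None,
    retryer: AsyncRetrying | None,
) -> T:
    async def limited_call() -> T:
        # The `if self.limiter:`/`else` logic lives in one place...
        if limiter is not None:
            async with limiter:
                return await call()
        return await call()

    # ...and is reused from both sides of the `if self.retryer:`/`else` branch.
    if retryer is not None:
        async for attempt in retryer:
            with attempt:
                return await limited_call()
        # Unreachable in practice (tenacity raises on exhaustion), but keeps type checkers happy.
        raise RuntimeError('Model request failed after all retries')
    return await limited_call()
```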

@tekumara tekumara left a comment


+1 this is nicely done.

@grll @DouweM what do you think about having a GCRA implementation as implemented by something like https://github.com/ZhuoZhuoCrayon/throttled-py ?

It's a more efficient variant of the leaky bucket algorithm without its downsides (i.e. it doesn't need a background "drip" process). See https://brandur.org/rate-limiting
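For context, here's a minimal single-process sketch of the GCRA idea (illustrative only, not throttled-py's implementation): the whole state is one timestamp, so there's no background drip.

```python
import time


class GCRA:
    """Toy GCRA: `rate` requests per second with a burst of `burst` requests."""

    def __init__(self, rate: float, burst: int = 1) -> None:
        self.interval = 1.0 / rate                    # emission interval: seconds "cost" per request
        self.tolerance = (burst - 1) * self.interval  # how far ahead of real time we may run
        self.tat = 0.0                                # theoretical arrival time of the next request

    def allow(self) -> bool:
        now = time.monotonic()
        tat = max(self.tat, now)
        if tat - now > self.tolerance:
            return False                              # would exceed the burst allowance: reject
        # Admitting a request just pushes the theoretical arrival time forward.
        self.tat = tat + self.interval
        return True
```

With `GCRA(rate=2, burst=2)`, two calls are admitted immediately and then one roughly every 0.5 s.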

Author

grll commented May 20, 2025

> @grll Nice work! Looks like we still have some failing tests, let me know if you'd like some guidance there.
>
> As for the code, can we please reduce the duplication a bit by moving the if self.limiter:/else stuff to an inline function called from both sides of the if self.retry:/else branch?

Hey, thanks for the review! I will have a look; I was indeed struggling a bit with the tests in CI while they passed locally, so I need to investigate a bit more. I can also look at the refactor you suggested!

Author

grll commented May 20, 2025

> +1 this is nicely done.
>
> @grll @DouweM what do you think about having a GCRA implementation as implemented by something like https://github.com/ZhuoZhuoCrayon/throttled-py ?
>
> It's a more efficient variant of the leaky bucket algorithm without its downsides (i.e. it doesn't need a background "drip" process). See https://brandur.org/rate-limiting

Hey, thanks for the suggestion. The initial goal with aiolimiter was to keep things as simple as possible, but we could instead integrate with throttled-py to let users choose from various algorithms / Redis backends as well.


ZhuoZhuoCrayon commented May 21, 2025

> +1 this is nicely done.
> @grll @DouweM what do you think about having a GCRA implementation as implemented by something like https://github.com/ZhuoZhuoCrayon/throttled-py ?
> It's a more efficient variant of the leaky bucket algorithm without its downsides (i.e. it doesn't need a background "drip" process). See https://brandur.org/rate-limiting
>
> Hey, thanks for the suggestion. The initial goal with aiolimiter was to keep things as simple as possible, but we could instead integrate with throttled-py to let users choose from various algorithms / Redis backends as well.

@grll I am the developer of throttled-py. throttled-py provides flexible rate limiting strategies and storage backend configurations. I am very willing to participate in the discussion and implementation of this PR.

Author

grll commented May 21, 2025

> +1 this is nicely done.
> @grll @DouweM what do you think about having a GCRA implementation as implemented by something like https://github.com/ZhuoZhuoCrayon/throttled-py ?
> It's a more efficient variant of the leaky bucket algorithm without its downsides (i.e. it doesn't need a background "drip" process). See https://brandur.org/rate-limiting
>
> Hey, thanks for the suggestion. The initial goal with aiolimiter was to keep things as simple as possible, but we could instead integrate with throttled-py to let users choose from various algorithms / Redis backends as well.
>
> @grll I am the developer of throttled-py. throttled-py provides flexible rate limiting strategies and storage backend configurations. I am very willing to participate in the discussion and implementation of this PR.

Hey @ZhuoZhuoCrayon, thanks for jumping in, and great work on throttled-py. One thing I am concerned about is that asyncio support is limited in the current implementation of throttled-py. What is the current status around asyncio?

@ZhuoZhuoCrayon

> +1 this is nicely done.
> @grll @DouweM what do you think about having a GCRA implementation as implemented by something like https://github.com/ZhuoZhuoCrayon/throttled-py ?
> It's a more efficient variant of the leaky bucket algorithm without its downsides (i.e. it doesn't need a background "drip" process). See https://brandur.org/rate-limiting
>
> Hey, thanks for the suggestion. The initial goal with aiolimiter was to keep things as simple as possible, but we could instead integrate with throttled-py to let users choose from various algorithms / Redis backends as well.
>
> @grll I am the developer of throttled-py. throttled-py provides flexible rate limiting strategies and storage backend configurations. I am very willing to participate in the discussion and implementation of this PR.
>
> Hey @ZhuoZhuoCrayon, thanks for jumping in, and great work on throttled-py. One thing I am concerned about is that asyncio support is limited in the current implementation of throttled-py. What is the current status around asyncio?

@grll Features are in the development branch, and a stable version will be released this weekend.

Author

grll commented May 24, 2025

@DouweM or @dmontagu any thoughts on the above?


ZhuoZhuoCrayon commented May 25, 2025

Hi, @grll and @tekumara !

throttled-py asynchronous support has been officially released in v2.1.0.

The core API is the same for synchronous and asynchronous code; just replace `from throttled import ...` with `from throttled.asyncio import ...` in your code:

```python
import asyncio
from throttled.asyncio import RateLimiterType, Throttled, rate_limiter, store, utils

throttle = Throttled(
    using=RateLimiterType.GCRA.value,
    quota=rate_limiter.per_sec(1_000, burst=1_000),
    store=store.MemoryStore(),
)


async def call_api() -> bool:
    result = await throttle.limit("/ping", cost=1)
    return result.limited


async def main():
    benchmark: utils.Benchmark = utils.Benchmark()
    denied_num: int = sum(await benchmark.async_serial(call_api, 100_000))
    print(f"❌ Denied: {denied_num} requests")

if __name__ == "__main__":
    asyncio.run(main())
```

I think GCRA has lower performance overhead and smoother throttling. At the same time, making the storage backend pluggable leaves room for scalability later on. Anyway, adding throttling and retries to the model is very cool, thank you!

Collaborator

DouweM commented May 26, 2025

@ZhuoZhuoCrayon Thanks a ton, throttled looks great!

@grll Are you up for changing this PR to use throttled instead?

Author

grll commented May 26, 2025

> @ZhuoZhuoCrayon Thanks a ton, throttled looks great!
>
> @grll Are you up for changing this PR to use throttled instead?

Sure happy to!

@ZhuoZhuoCrayon

Hi @grll,
Could you share the current progress on this PR?
Is there anything I can assist with to move it forward? 😊

Author

grll commented Jun 2, 2025

@ZhuoZhuoCrayon thanks for the reminder, busy times. I am on it now! I will switch to throttled and fix the CI :)

Author

grll commented Jun 2, 2025

@ZhuoZhuoCrayon

I have made the changes, but it seems throttled is not working as expected, unless I am doing something wrong:

```python
#!/usr/bin/env python3
import asyncio
import time
from throttled.asyncio import RateLimiterType, Throttled, rate_limiter, store


async def test_throttled():
    # Create a throttle that allows 2 requests per second
    throttle = Throttled(
        using=RateLimiterType.GCRA.value,
        quota=rate_limiter.per_sec(2),
        store=store.MemoryStore(),
    )

    print("Testing Throttled rate limiter...")
    start_time = time.time()

    # Make 5 sequential requests
    for i in range(5):
        await throttle.limit('default', cost=1)
        print(f"Request {i+1} completed at {time.time() - start_time:.2f}s")

    total_time = time.time() - start_time
    print(f"\nTotal time for 5 requests at 2/sec: {total_time:.2f}s")
    print(f"Expected time: ~2.0s (5 requests / 2 per sec - 1)")


if __name__ == "__main__":
    asyncio.run(test_throttled())
```

Produces:

```
Testing Throttled rate limiter...
Request 1 completed at 0.00s
Request 2 completed at 0.00s
Request 3 completed at 0.00s
Request 4 completed at 0.00s
Request 5 completed at 0.00s

Total time for 5 requests at 2/sec: 0.00s
Expected time: ~2.0s (5 requests / 2 per sec - 1)
```


ZhuoZhuoCrayon commented Jun 2, 2025

@grll

In function call mode, `limit()` returns a `RateLimitResult` immediately. You can use `RateLimitResult.limited` to determine whether the rate limit was triggered:

```python
    # Make 5 sequential requests
    for i in range(5):
        result = await throttle.limit('default', cost=1)
        # Here's the problem: `limit()` returns immediately, so the caller has
        # to inspect `result.limited` (True once the burst capacity is spent).
        if result.limited:
            print(f"Request {i + 1} was rejected")
```

You can specify a timeout to enable wait-and-retry behavior. The rate limiter will retry automatically and return the final `RateLimitResult` once the request is allowed or the timeout is reached:

```python
    # Make 5 sequential requests
    for i in range(5):
        # Set timeout=5 to enable wait-and-retry (max wait 5 seconds); returns the final RateLimitResult.
        result = await throttle.limit('default', cost=1, timeout=5)
        assert not result.limited
```

In addition, in decorator and context manager modes, triggering the rate limit will raise `exceptions.LimitedError`:

```python
#!/usr/bin/env python3
import asyncio
import time
from throttled.asyncio import RateLimiterType, Throttled, rate_limiter, store, exceptions


async def test_throttled():
    # Create a throttle that allows 2 requests per second
    throttle = Throttled(
        key='default',
        # You can use `timeout` to enable wait-retry mode.
        # timeout=1,
        using=RateLimiterType.GCRA.value,
        quota=rate_limiter.per_sec(2),
        store=store.MemoryStore(),
    )

    print("Testing Throttled rate limiter...")
    start_time = time.time()

    # Make 5 sequential requests
    for i in range(5):
        # If no timeout is set, exceptions.LimitedError will be thrown on the third execution.
        async with throttle:
            print(f"Request {i+1} completed at {time.time() - start_time:.2f}s")

    total_time = time.time() - start_time
    print(f"\nTotal time for 5 requests at 2/sec: {total_time:.2f}s")


if __name__ == "__main__":
    asyncio.run(test_throttled())
```


ZhuoZhuoCrayon commented Jun 2, 2025

@grll `per_sec(2)` is equivalent to `per_sec(2, burst=2)`, which means it allows 2 requests per second and a burst of 2 requests (🪣 the bucket's capacity). In other words, this limiter consumes its burst after 2 requests. If `timeout>=0.5` is set, the above example completes all requests in 1.5 seconds (the burst is consumed immediately, and the remaining 3 requests are filled over the subsequent 1.5 s):

```python
#!/usr/bin/env python3
import asyncio
import time
from throttled.asyncio import RateLimiterType, Throttled, rate_limiter, store


async def test_throttled():
    # Create a throttle that allows 2 requests per second
    throttle = Throttled(
        using=RateLimiterType.GCRA.value,
        quota=rate_limiter.per_sec(2),
        store=store.MemoryStore(),
    )

    print("Testing Throttled rate limiter...")
    start_time = time.time()

    # Make 5 sequential requests
    for i in range(5):
        # ⏳ Set timeout=0.5 to enable wait-and-retry (max wait 0.5 second)
        await throttle.limit('default', cost=1, timeout=0.5)
        print(f"Request {i+1} completed at {time.time() - start_time:.2f}s")

    total_time = time.time() - start_time
    print(f"\nTotal time for 5 requests at 2/sec: {total_time:.2f}s")
    print(f"Expected time: ~1.5s")


if __name__ == "__main__":
    asyncio.run(test_throttled())
```

Produces:

```
Testing Throttled rate limiter...
------------- Burst----------------------------
Request 1 completed at 0.00s
Request 2 completed at 0.00s
-----------------------------------------------
------------ Refill: 0.5 tokens per second ------
Request 3 completed at 0.50s
Request 4 completed at 1.00s
Request 5 completed at 1.50s
-----------------------------------------------

Total time for 5 requests at 2/sec: 1.50s
Expected time: ~1.5s
```

grll added 3 commits June 2, 2025 23:40
Replace aiolimiter with throttled-py (>=2.2.0) as the rate limiting library
for RateLimitedModel. Throttled provides more flexible rate limiting with
different algorithms (GCRA, FixedWindow) and better control over blocking
behavior.
- Change from aiolimiter.AsyncLimiter to throttled.asyncio.Throttled
- Update usage from context manager to explicit limit() calls
- Add key, cost, and timeout parameters to request methods
- Set default timeout to 30s to enable blocking behavior
- Update docstring with new usage example

The timeout parameter is crucial - without it or with -1, the rate limiter
returns immediately instead of waiting for capacity.
- Replace AsyncLimiter with Throttled in all tests
- Remove unused imports (AsyncMock, patch)
- Update rate limiter configurations to use GCRA algorithm
- Fix concurrent requests test to verify actual rate limiting
- Adjust timing expectations to account for GCRA burst behavior

The concurrent requests test now properly verifies that rate limits are
enforced by measuring actual execution time.
@grll grll force-pushed the grll/rate-limited branch from ab12b79 to 33796b4 on June 2, 2025 21:42
grll added 3 commits June 2, 2025 23:45
- Add test for RuntimeError when retryer is exhausted without exceptions
- Test FailingModel properties (system, model_name) to cover lines 34, 38
- Test SuccessResponseStream properties (model_name, timestamp) to cover lines 90, 94
- Achieve 100% test coverage for rate_limited.py and test_rate_limited.py
- Use string 'gcra' instead of RateLimiterType.GCRA enum
- Import MemoryStore from throttled.asyncio.store
- Replace unused loop variable 'i' with '_'
- Fix __aiter__ override to properly implement async iteration protocol
- Update docstring examples to match correct usage
Author

grll commented Jun 2, 2025

@DouweM look like we are ready for a review!

PS: That's definitely the toughest CI I have ever seen, but coming from typing experts that's no surprise. Anyway, I got to the bottom of it.

```python
            raise RuntimeError('Model request failed after all retries')
        else:
            if self.limiter:
                await self.limiter.limit(key, cost, timeout)
```
@ZhuoZhuoCrayon ZhuoZhuoCrayon Jun 3, 2025


@DouweM @grll

Could we consider verifying the rate limit result at this point (`RateLimitResult.limited` is `True` when the rate limit is still exceeded after the timeout) and raising a `LimitedError` (or another exception compliant with the pydantic-ai specification)?

For example:

```python
if self.limiter:
    result = await self.limiter.limit(key, cost, timeout)
    # 💡 Check if the limit has been exceeded.
    if result.limited:
        raise RuntimeError('Rate limit exceeded.')
return await super().request(...)
```

I am concerned that skipping the check and still executing the request after exceeding the rate limit may cause the model to run into unpredictable third-party errors.

Author


Yes, good point, I will adjust.

Author

grll commented Jun 6, 2025

@DouweM any chance of a review on this soon? I will address the point above, but wanted to combine it with your feedback if possible.

```python
    model_request_parameters: ModelRequestParameters,
    key: str = 'default',
    cost: int = 1,
    timeout: float | None = 30.0,
```
Collaborator


Would it make sense to move these to the initializer? Or could they be values on the limiter that's passed? Users don't call request and request_stream directly, so they wouldn't have an obvious way to set these values.


Maybe we can move the parameters to the initialization stage:

```python
throttle = Throttled(
    using='gcra',
    quota=rate_limiter.per_sec(1_000, burst=1_000),
    # store can be omitted, the global MemoryStore is provided by default.
    # store=MemoryStore(),
    key='default',
    cost=1,
    timeout=30
)
```

```python
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
    key: str = 'default',
```
Collaborator


What's the implication of this value being the same for all models (unless it's overwritten)? Should we use a different value for each model/agent?


@grll @DouweM The target of the rate limiting is the LLM API, which is different for each model and needs to be independent. Can we use `self.model_name` as the default key, unless a specific key already exists on the limiter?

```python
if self.limiter:
    # 💡 Priority: self.limiter.key > self.model_name.
    result = await self.limiter.limit(self.limiter.key or self.model_name)
    ...
...
```

Collaborator


@ZhuoZhuoCrayon I think that makes sense, but let's add in the provider as well: `{self.system}:{self.model_name}`
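A tiny sketch of that combined suggestion (the optional `key` attribute on the limiter and the helper name `_default_limit_key` are assumptions, not the PR's code):

```python
def _default_limit_key(self) -> str:
    # Prefer an explicit key configured on the limiter; otherwise rate-limit
    # per provider and model, e.g. 'anthropic:claude-3-7-sonnet-latest'.
    return getattr(self.limiter, 'key', None) or f'{self.system}:{self.model_name}'
```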

"opentelemetry-api>=1.28.0",
"typing-inspection>=0.4.0",
"tenacity>=9.1.2",
"throttled-py>=2.2.0"
Collaborator


@Kludex What do you think of these being included by default? Should we put them in an optional dependency group?

Collaborator


Let's add these to a new optional dependency group called `rate_limiting`, and add a `try:`/`except ImportError:` block to `models/rate_limited.py`, like we have in `providers/openai.py`, that suggests installing with that dependency group.
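For reference, a rough sketch of what such a guard could look like at the top of `models/rate_limited.py`; the error message and extras name follow the suggestion above and are assumptions, not final code:

```python
try:
    from tenacity import AsyncRetrying
    from throttled.asyncio import Throttled
except ImportError as _import_error:  # pragma: no cover
    raise ImportError(
        'Please install the rate limiting dependencies to use RateLimitedModel, '
        'e.g. with the `rate_limiting` optional group: '
        '`pip install "pydantic-ai-slim[rate_limiting]"`'
    ) from _import_error
```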

```python
from pydantic_ai.usage import Usage


class FailingModel(Model):
```
Collaborator


We already do some testing with fake failing models in tests/models/test_fallback.py. Could we borrow the approach taken there using FunctionModel?
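For illustration, a sketch of what a flaky model built on FunctionModel could look like (the function body and counter here are made up for this example; the real tests in test_fallback.py may be structured differently):

```python
from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart
from pydantic_ai.models.function import AgentInfo, FunctionModel

call_count = 0


def flaky_function(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
    """Fail the first two calls to simulate a transient API error, then succeed."""
    global call_count
    call_count += 1
    if call_count < 3:
        raise RuntimeError('simulated transient API failure')
    return ModelResponse(parts=[TextPart('ok')])


flaky_model = FunctionModel(flaky_function)
```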

Collaborator


Can you update your uv, reset uv.lock to the version from main, and run make install again? We should see only a handful of changes here for the new packages, not "2,071 additions, 2,049 deletions" :)

@ZhuoZhuoCrayon

@grll Besides making the dependencies optional, perhaps we could address the other review comments first?

@DouweM @Kludex Looking forward to your suggestions regarding the discussion above. 😊

Collaborator

DouweM commented Jun 10, 2025

@ZhuoZhuoCrayon What exactly did you want our take on? 😄 I assumed the conversation between you and @grll about how throttled works would just affect the implementation here, but let me know if there's something in particular you needed our opinion on!

Collaborator

DouweM commented Jun 11, 2025

@grll I had a chat about this with @Kludex, and we've got a few thoughts!

  1. Does this actually address "running into request rate limiting error frequently for openAI models" (#782) in the most appropriate way? I'd expect us to want to specifically handle HTTP status 429 and look at the response headers to know when the API wants us to try again. That'd require some model-class specific logic, but it'd work out of the box, would wait exactly as much as necessary, and not require you to come up with your own throttling rules like `quota=rate_limiter.per_sec(2)`.

    As implemented, the RateLimitedModel effectively lets you make your own model/agent rate limited from the perspective of its end users, which could potentially be useful in some scenarios, but it doesn't seem to address the model API rate limiting issue directly or optimally.

  2. Letting the user specify that a model should retry its request a couple of times when it fails for non-rate-limiting reasons, similar to OpenAI's `max_retries` mentioned in "Easier way of configuring max_retries for OpenAI and Azure clients/providers" (#1732), can be useful if APIs are unstable. For that simple case, using a separate model class ideally wouldn't be necessary: we could add a new `request_retries` option on `Agent`, like we already have `output_retries` and `retries` (which should really be `tool_retries`).

If we try to separately 1) allow request retries and 2) handle API rate limits, I think we'd end up with a different implementation than what we got here by conflating them as they are.

What do you think? My bad for letting you run in this direction for a bit before properly thinking it through 😅
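For illustration, a rough sketch of the 429-handling idea using httpx and tenacity — this is not the PR's code nor the eventual implementation, and it assumes a numeric Retry-After header; `request_with_retry_after` is a hypothetical helper:

```python
import asyncio

import httpx
from tenacity import AsyncRetrying, retry_if_exception, stop_after_attempt


def _is_rate_limited(exc: BaseException) -> bool:
    return isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 429


async def request_with_retry_after(client: httpx.AsyncClient, url: str) -> httpx.Response:
    """Retry on HTTP 429, waiting exactly as long as the API asks."""
    async for attempt in AsyncRetrying(
        retry=retry_if_exception(_is_rate_limited),
        stop=stop_after_attempt(5),
        reraise=True,
    ):
        with attempt:
            response = await client.get(url)
            if response.status_code == 429:
                # Wait for the requested delay before the next attempt
                # (Retry-After can also be an HTTP date; ignored here for brevity).
                await asyncio.sleep(float(response.headers.get('Retry-After', 1)))
                response.raise_for_status()  # raises HTTPStatusError -> triggers a retry
            return response
    raise RuntimeError('unreachable')  # AsyncRetrying either returns above or re-raises
```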

Author

grll commented Jun 12, 2025

> @grll I had a chat about this with @Kludex, and we've got a few thoughts!
>
>   1. Does this actually address "running into request rate limiting error frequently for openAI models" (#782) in the most appropriate way? I'd expect us to want to specifically handle HTTP status 429 and look at the response headers to know when the API wants us to try again. That'd require some model-class specific logic, but it'd work out of the box, would wait exactly as much as necessary, and not require you to come up with your own throttling rules like `quota=rate_limiter.per_sec(2)`.
>
>     As implemented, the RateLimitedModel effectively lets you make your own model/agent rate limited from the perspective of its end users, which could potentially be useful in some scenarios, but it doesn't seem to address the model API rate limiting issue directly or optimally.
>
>   2. Letting the user specify that a model should retry its request a couple of times when it fails for non-rate-limiting reasons, similar to OpenAI's `max_retries` mentioned in "Easier way of configuring max_retries for OpenAI and Azure clients/providers" (#1732), can be useful if APIs are unstable. For that simple case, using a separate model class ideally wouldn't be necessary: we could add a new `request_retries` option on `Agent`, like we already have `output_retries` and `retries` (which should really be `tool_retries`).
>
> If we try to separately 1) allow request retries and 2) handle API rate limits, I think we'd end up with a different implementation than what we got here by conflating them as they are.
>
> What do you think? My bad for letting you run in this direction for a bit before properly thinking it through 😅

Hey @DouweM, no worries at all. This week is really packed with events, but reviewing your proposal to split things up is on my todo list. One thing that came to mind: OpenAI is one thing, but I am not sure all providers populate retry headers in a consistent and similar way, so we might have challenges there.

Member

Kludex commented Jun 12, 2025

Those headers are widely used. Every serious provider should have them.

@DouweM DouweM mentioned this pull request Jun 16, 2025
Collaborator

DouweM commented Jul 7, 2025

@grll I'm closing this PR as it's been a few weeks with no response; if you're still interested in implementing this feel free to reopen it!

@DouweM DouweM closed this Jul 7, 2025
Author

grll commented Jul 8, 2025

> @grll I'm closing this PR as it's been a few weeks with no response; if you're still interested in implementing this feel free to reopen it!

@DouweM I still have it in the back of my mind but need to dedicate a bit of time to it. Is it something that you would still need?

Collaborator

DouweM commented Jul 8, 2025

@grll If we could make it use the rate limit headers, definitely!

Collaborator

DouweM commented Jul 23, 2025

#2282 implements support for HTTP 429, respecting the Retry-After header.
