Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenAI Rate limiting #1805

Merged
merged 24 commits into from
Nov 29, 2023
Merged
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit. Hold shift + click to select a range.
b7711b6
Implement adaptive rate limiter for OpenAI
anticorrelator Nov 27, 2023
48006c8
Add adaptive rate limiter to Bedrock model
anticorrelator Nov 27, 2023
cbc355b
Use a sensible default maximum request rate
anticorrelator Nov 27, 2023
c35f111
Ruff 🐶
anticorrelator Nov 27, 2023
ff885af
Mark test as xfail after llama_index update
anticorrelator Nov 27, 2023
bd3d311
Do not retry on rate limit errors with tenacity
anticorrelator Nov 27, 2023
cc72439
Merge remote-tracking branch 'origin' into dustin/implement-ratelimiter
anticorrelator Nov 27, 2023
19e0dff
Remove xfail after llama_index version lock
anticorrelator Nov 27, 2023
af535c7
Use events and locks instead of nesting asyncio.run
anticorrelator Nov 27, 2023
cffcc72
Ensure that events are always set after rate limit handling
anticorrelator Nov 27, 2023
61ff1d3
Retry on httpx ReadTimeout errors
anticorrelator Nov 28, 2023
fee9743
Update rate limiters with verbose generation info
anticorrelator Nov 28, 2023
e06c7aa
Improve end of queue handling in AsyncExecutor
anticorrelator Nov 28, 2023
ecbfcd2
Merge remote-tracking branch 'origin' into dustin/implement-ratelimiter
anticorrelator Nov 28, 2023
01e203a
improve types to remove the need for casts (#1817)
axiomofjoy Nov 28, 2023
1f4abe1
Improve interrupt handling
anticorrelator Nov 28, 2023
a2798c8
Exit early from queue.join on termination events
anticorrelator Nov 28, 2023
7b4d66e
Properly cancel running tasks
anticorrelator Nov 28, 2023
9f3cd2e
Add pytest-asyncio to hatch env
anticorrelator Nov 28, 2023
ddc562c
Do not await cancelled tasks
anticorrelator Nov 28, 2023
f9f2753
Improve task_done marking logic
anticorrelator Nov 28, 2023
ef0cbda
Merge remote-tracking branch 'origin' into dustin/implement-ratelimiter
anticorrelator Nov 29, 2023
5657ec7
Merge branch 'main' into dustin/implement-ratelimiter
anticorrelator Nov 29, 2023
eef2e55
Increase default concurrency
anticorrelator Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions src/phoenix/experimental/evals/models/rate_limiters.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def on_rate_limit_error(self, request_start_time: float) -> None:
# do not reduce the rate for concurrent requests
return
self.rate *= self.rate_reduction_factor
print(f"Reducing rate to {self.rate} after rate limit error")
anticorrelator marked this conversation as resolved.
Show resolved Hide resolved

# the enforcement window determines the minimum rate
self.rate = max(self.rate, 1 / self.enforcement_window)
Expand Down Expand Up @@ -142,6 +143,9 @@ def __init__(
rate_increase_factor=rate_increase_factor,
cooldown_seconds=cooldown_seconds,
)
self._rate_limit_handling = asyncio.Event()
self._rate_limit_handling.set() # allow requests to start immediately
self._rate_limit_handling_lock = asyncio.Lock()

def limit(
self, fn: Callable[ParameterSpec, GenericType]
Expand All @@ -158,7 +162,7 @@ def wrapper(*args: Any, **kwargs: Any) -> GenericType:
try:
request_start_time = time.time()
self._throttler.wait_until_ready()
return cast(GenericType, fn(*args, **kwargs)) # type: ignore
return fn(*args, **kwargs)
except self._rate_limit_error:
self._throttler.on_rate_limit_error(request_start_time)
continue
Expand All @@ -170,24 +174,25 @@ def alimit(self, fn: AsyncCallable) -> AsyncCallable:
@wraps(fn)
async def wrapper(*args: Any, **kwargs: Any) -> GenericType:
try:
await self._rate_limit_handling.wait()
await self._throttler.async_wait_until_ready()
anticorrelator marked this conversation as resolved.
Show resolved Hide resolved
request_start_time = time.time()
return cast(GenericType, await fn(*args, **kwargs))
except self._rate_limit_error:
self._throttler.on_rate_limit_error(request_start_time)
return self._block_and_retry_awaitable(fn, *args, **kwargs)
async with self._rate_limit_handling_lock:
self._rate_limit_handling.clear() # prevent new requests from starting
self._throttler.on_rate_limit_error(request_start_time)
try:
for _attempt in range(self._max_rate_limit_retries):
try:
request_start_time = time.time()
self._throttler.wait_until_ready()
return await fn(*args, **kwargs) # type: ignore
except self._rate_limit_error:
self._throttler.on_rate_limit_error(request_start_time)
continue
finally:
self._rate_limit_handling.set() # allow new requests to start
raise self._rate_limit_error(f"Exceeded max ({self._max_rate_limit_retries}) retries")

return cast(AsyncCallable, wrapper)

def _block_and_retry_awaitable(
self, fn: AsyncCallable, *args: Any, **kwargs: Any
) -> GenericType: # type: ignore
for _attempt in range(self._max_rate_limit_retries):
try:
self._throttler.wait_until_ready()
request_start_time = time.time()
return cast(GenericType, asyncio.run(fn(*args, **kwargs)))
except self._rate_limit_error:
self._throttler.on_rate_limit_error(request_start_time)
continue
raise self._rate_limit_error("Exceeded max retries")
Loading