Skip to content

Commit

Permalink
fix(proxy_cli.py-&&-proxy_server.py): bump reset budget intervals and…
Browse files Browse the repository at this point in the history
… fix pool limits for prisma connections
  • Loading branch information
krrishdholakia committed Feb 7, 2024
1 parent b6adeec commit 4a0df3c
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 18 deletions.
17 changes: 16 additions & 1 deletion litellm/proxy/proxy_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import datetime
import importlib
from dotenv import load_dotenv
import urllib.parse as urlparse

sys.path.append(os.getcwd())

Expand All @@ -17,6 +18,15 @@
telemetry = None


def append_query_params(url, params):
    """Return ``url`` with ``params`` merged into its query string.

    Query parameters already present in ``url`` are kept; when a key appears
    both in the URL and in ``params``, the value from ``params`` wins.

    Args:
        url: The URL to modify (e.g. a database connection string).
        params: Mapping of query parameter names to values. ``None`` or an
            empty mapping leaves the existing parameters untouched.

    Returns:
        The URL rebuilt with the merged, percent-encoded query string.
    """
    # NOTE: ``urlparse`` is this file's alias for the ``urllib.parse`` module.
    parsed_url = urlparse.urlparse(url)
    # keep_blank_values=True: without it, parameters that are present but
    # empty (``?sslcert=``) would be silently dropped from the rebuilt URL.
    parsed_query = urlparse.parse_qs(parsed_url.query, keep_blank_values=True)
    if params:
        parsed_query.update(params)
    # doseq=True: parse_qs yields lists of values, which must be expanded
    # into repeated ``key=value`` pairs rather than stringified as lists.
    encoded_query = urlparse.urlencode(parsed_query, doseq=True)
    return urlparse.urlunparse(parsed_url._replace(query=encoded_query))


def run_ollama_serve():
try:
command = ["ollama", "serve"]
Expand Down Expand Up @@ -416,6 +426,12 @@ def _make_openai_completion():

if os.getenv("DATABASE_URL", None) is not None:
try:
### add connection pool + pool timeout args
params = {"connection_limit": 500, "pool_timeout": 60}
database_url = os.getenv("DATABASE_URL")
modified_url = append_query_params(database_url, params)
os.environ["DATABASE_URL"] = modified_url
###
subprocess.run(["prisma"], capture_output=True)
is_prisma_runnable = True
except FileNotFoundError:
Expand Down Expand Up @@ -522,6 +538,5 @@ def load(self):
).run() # Run gunicorn



if __name__ == "__main__":
run_server()
6 changes: 5 additions & 1 deletion litellm/proxy/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,10 @@ async def update_database(
f"Enters prisma db call, response_cost: {response_cost}, token: {token}; user_id: {user_id}"
)

### [TODO] STEP 1: GET KEY + USER SPEND ### (key, user)

### [TODO] STEP 2: UPDATE SPEND ### (key, user, spend logs)

### UPDATE USER SPEND ###
async def _update_user_db():
"""
Expand Down Expand Up @@ -1922,7 +1926,7 @@ async def startup_event():
### START BUDGET SCHEDULER ###
scheduler = AsyncIOScheduler()
interval = random.randint(
7, 14
597, 605
) # random interval, so multiple workers avoid resetting budget at the same time
scheduler.add_job(reset_budget, "interval", seconds=interval, args=[prisma_client])
scheduler.start()
Expand Down
8 changes: 1 addition & 7 deletions litellm/proxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,7 @@ def __init__(self, database_url: str, proxy_logging_obj: ProxyLogging):
# Now you can import the Prisma Client
from prisma import Prisma # type: ignore

self.db = Prisma(
http={
"limits": httpx.Limits(
max_connections=1000, max_keepalive_connections=100
)
}
) # Client to connect to Prisma db
self.db = Prisma() # Client to connect to Prisma db

def hash_token(self, token: str):
# Hash the string using SHA-256
Expand Down
9 changes: 5 additions & 4 deletions tests/test_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,9 @@ async def retry_request(func, *args, _max_attempts=5, **kwargs):
@pytest.mark.asyncio
async def test_key_with_budgets():
"""
- Create key with budget and 5s duration
- Create key with budget and 5min duration
- Get 'reset_at' value
- wait 5s
- wait 10min (budget reset runs every 10mins.)
- Check if value updated
"""
from litellm.proxy.utils import hash_token
Expand All @@ -449,15 +449,16 @@ async def test_key_with_budgets():
reset_at_init_value = key_info["info"]["budget_reset_at"]
reset_at_new_value = None
i = 0
await asyncio.sleep(610)
while i < 3:
await asyncio.sleep(30)
key_info = await get_key_info(session=session, get_key=key, call_key=key)
reset_at_new_value = key_info["info"]["budget_reset_at"]
try:
assert reset_at_init_value != reset_at_new_value
break
except:
i + 1
await asyncio.sleep(5)
assert reset_at_init_value != reset_at_new_value


Expand All @@ -481,7 +482,7 @@ async def test_key_crossing_budget():

response = await chat_completion(session=session, key=key)
print("response 1: ", response)
await asyncio.sleep(2)
await asyncio.sleep(10)
try:
response = await chat_completion(session=session, key=key)
pytest.fail("Should have failed - Key crossed it's budget")
Expand Down
14 changes: 9 additions & 5 deletions tests/test_spend_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ async def test_spend_logs():
await get_spend_logs(session=session, request_id=response["id"])


@pytest.mark.skip(reason="High traffic load test, meant to be run locally")
@pytest.mark.asyncio
async def test_spend_logs_high_traffic():
"""
Expand Down Expand Up @@ -155,9 +156,12 @@ async def retry_request(func, *args, _max_attempts=5, **kwargs):
successful_completions = [c for c in chat_completions if c is not None]
print(f"Num successful completions: {len(successful_completions)}")
await asyncio.sleep(10)
response = await get_spend_logs(session=session, api_key=key)
print(f"response: {response}")
print(f"len responses: {len(response)}")
assert len(response) == n
print(n, time.time() - start, len(response))
try:
response = await retry_request(get_spend_logs, session=session, api_key=key)
print(f"response: {response}")
print(f"len responses: {len(response)}")
assert len(response) == n
print(n, time.time() - start, len(response))
except:
print(n, time.time() - start, 0)
raise Exception("it worked!")

0 comments on commit 4a0df3c

Please sign in to comment.