Commit
Merge pull request #1891 from BerriAI/litellm_make_s3_cache_async
[FEAT] 78% Faster s3 Cache ⚡️ - Proxy / litellm.acompletion / litellm.Router.acompletion
ishaan-jaff authored Feb 8, 2024
2 parents 7c83ba0 + 99b9a7b commit 996dc46
Showing 3 changed files with 152 additions and 0 deletions.
52 changes: 52 additions & 0 deletions litellm/tests/test_caching.py
@@ -696,6 +696,58 @@ async def call2():
# test_s3_cache_acompletion_stream_azure()


@pytest.mark.asyncio
async def test_s3_cache_acompletion_azure():
    import asyncio
    import logging
    import tracemalloc

    tracemalloc.start()
    logging.basicConfig(level=logging.DEBUG)

    try:
        litellm.set_verbose = True
        random_word = generate_random_word()
        messages = [
            {
                "role": "user",
                "content": f"write a one sentence poem about: {random_word}",
            }
        ]
        litellm.cache = Cache(
            type="s3", s3_bucket_name="cache-bucket-litellm", s3_region_name="us-west-2"
        )
        print("s3 Cache: test for caching + acompletion")

        response1 = await litellm.acompletion(
            model="azure/chatgpt-v-2",
            messages=messages,
            max_tokens=40,
            temperature=1,
        )
        print(response1)

        time.sleep(2)

        response2 = await litellm.acompletion(
            model="azure/chatgpt-v-2",
            messages=messages,
            max_tokens=40,
            temperature=1,
        )

        print(response2)

        # The second call should be served from the s3 cache, so the
        # cached response must carry the same id as the first response.
        assert response1.id == response2.id

        litellm.cache = None
        litellm.success_callback = []
        litellm._async_success_callback = []
    except Exception as e:
        print(e)
        raise e


# test_redis_cache_acompletion_stream_bedrock()
# redis cache with custom keys
def custom_get_cache_key(*args, **kwargs):
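Note: the assertion response1.id == response2.id holds because identical call parameters map to the same cache key, so the second call returns the stored response verbatim. A minimal sketch of that idea with a toy SHA-256 key (illustrative only; litellm's real key derivation lives inside litellm.caching):

import hashlib
import json

def toy_cache_key(**call_kwargs) -> str:
    # Identical kwargs serialize to identical JSON (sort_keys makes the
    # ordering deterministic), so repeated calls hash to the same key.
    serialized = json.dumps(call_kwargs, sort_keys=True, default=str)
    return hashlib.sha256(serialized.encode()).hexdigest()

kwargs = {"model": "azure/chatgpt-v-2", "messages": [{"role": "user", "content": "hi"}]}
assert toy_cache_key(**kwargs) == toy_cache_key(**kwargs)
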
94 changes: 94 additions & 0 deletions litellm/tests/test_load_test_router_s3.py
@@ -0,0 +1,94 @@
# import sys, os
# import traceback
# from dotenv import load_dotenv
# import copy

# load_dotenv()
# sys.path.insert(
#     0, os.path.abspath("../..")
# )  # Adds the parent directory to the system path
# import asyncio
# from litellm import Router, Timeout
# import time
# from litellm.caching import Cache
# import litellm

# litellm.cache = Cache(
#     type="s3", s3_bucket_name="cache-bucket-litellm", s3_region_name="us-west-2"
# )

# ### Test calling router with s3 Cache


# async def call_acompletion(semaphore, router: Router, input_data):
#     async with semaphore:
#         try:
#             # Use asyncio.wait_for to set a timeout for the task
#             response = await router.acompletion(**input_data)
#             # Handle the response as needed
#             print(response)
#             return response
#         except Timeout:
#             print(f"Task timed out: {input_data}")
#             return None  # You may choose to return something else or raise an exception


# async def main():
#     # Initialize the Router
#     model_list = [
#         {
#             "model_name": "gpt-3.5-turbo",
#             "litellm_params": {
#                 "model": "gpt-3.5-turbo",
#                 "api_key": os.getenv("OPENAI_API_KEY"),
#             },
#         },
#         {
#             "model_name": "gpt-3.5-turbo",
#             "litellm_params": {
#                 "model": "azure/chatgpt-v-2",
#                 "api_key": os.getenv("AZURE_API_KEY"),
#                 "api_base": os.getenv("AZURE_API_BASE"),
#                 "api_version": os.getenv("AZURE_API_VERSION"),
#             },
#         },
#     ]
#     router = Router(model_list=model_list, num_retries=3, timeout=10)

#     # Create a semaphore with a capacity of 100
#     semaphore = asyncio.Semaphore(100)

#     # List to hold all task references
#     tasks = []
#     start_time_all_tasks = time.time()
#     # Launch 500 tasks
#     for _ in range(500):
#         task = asyncio.create_task(
#             call_acompletion(
#                 semaphore,
#                 router,
#                 {
#                     "model": "gpt-3.5-turbo",
#                     "messages": [{"role": "user", "content": "Hey, how's it going?"}],
#                 },
#             )
#         )
#         tasks.append(task)

#     # Wait for all tasks to complete
#     responses = await asyncio.gather(*tasks)
#     # Process responses as needed
#     # Record the end time for all tasks
#     end_time_all_tasks = time.time()
#     # Calculate the total time for all tasks
#     total_time_all_tasks = end_time_all_tasks - start_time_all_tasks
#     print(f"Total time for all tasks: {total_time_all_tasks} seconds")

#     # Calculate the average time per response
#     average_time_per_response = total_time_all_tasks / len(responses)
#     print(f"Average time per response: {average_time_per_response} seconds")
#     print(f"NUMBER OF COMPLETED TASKS: {len(responses)}")


# # Run the main function
# asyncio.run(main())
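
For reference, the commented-out load test above follows a standard bounded fan-out pattern: 500 tasks are launched, but the semaphore caps the number of in-flight router calls at 100. A self-contained sketch of the same pattern, with the router call replaced by a stub coroutine (the stub and its timing are hypothetical, not litellm code):

import asyncio
import time

async def bounded_call(semaphore: asyncio.Semaphore, i: int) -> int:
    async with semaphore:          # at most 100 coroutines run concurrently
        await asyncio.sleep(0.01)  # stand-in for router.acompletion(...)
        return i

async def main():
    semaphore = asyncio.Semaphore(100)
    start = time.time()
    results = await asyncio.gather(*(bounded_call(semaphore, i) for i in range(500)))
    print(f"{len(results)} tasks in {time.time() - start:.2f}s")

asyncio.run(main())
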
6 changes: 6 additions & 0 deletions litellm/utils.py
@@ -2786,6 +2786,12 @@ async def wrapper_async(*args, **kwargs):
                            result, *args, **kwargs
                        )
                    )
                elif isinstance(litellm.cache.cache, S3Cache):
                    # S3 writes go through synchronous boto3 calls, so run the
                    # cache write on a background thread instead of blocking
                    # the event loop while the response is being returned.
                    threading.Thread(
                        target=litellm.cache.add_cache,
                        args=(result,) + args,
                        kwargs=kwargs,
                    ).start()
                else:
                    asyncio.create_task(
                        litellm.cache.async_add_cache(
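The utils.py change above is the heart of the claimed speedup: the synchronous S3 cache write is moved off the event loop onto a background thread, so the awaiting caller gets its response back without waiting on network I/O to S3. A minimal sketch of the pattern, where blocking_s3_write is a hypothetical stand-in for litellm.cache.add_cache:

import asyncio
import threading
import time

def blocking_s3_write(key: str, value: str) -> None:
    time.sleep(0.5)  # stand-in for a synchronous boto3 put_object call
    print(f"cached {key}")

async def handle_request() -> str:
    # Fire-and-forget: the coroutine returns immediately instead of
    # stalling ~500ms while the cache write completes.
    threading.Thread(target=blocking_s3_write, args=("key", "value")).start()
    return "response"

asyncio.run(handle_request())

On Python 3.9+, asyncio.create_task(asyncio.to_thread(blocking_s3_write, "key", "value")) achieves the same fire-and-forget offload while reusing the default thread pool instead of spawning a fresh thread per request.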
