From 9a23317461225844a067d2ffeb33993c24e701a9 Mon Sep 17 00:00:00 2001
From: ishaan-jaff
Date: Thu, 8 Feb 2024 10:01:56 -0800
Subject: [PATCH 1/3] (feat) fix s3 cache

---
 litellm/utils.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/litellm/utils.py b/litellm/utils.py
index ad56fd576935..feb952dbd895 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -2786,6 +2786,12 @@ async def wrapper_async(*args, **kwargs):
                                 result, *args, **kwargs
                             )
                         )
+                    elif isinstance(litellm.cache.cache, S3Cache):
+                        threading.Thread(
+                            target=litellm.cache.add_cache,
+                            args=(result,) + args,
+                            kwargs=kwargs,
+                        ).start()
                     else:
                         asyncio.create_task(
                             litellm.cache.async_add_cache(

From 8a615cd1258ba1bafce7a1c5349c6ecd766e3af4 Mon Sep 17 00:00:00 2001
From: ishaan-jaff
Date: Thu, 8 Feb 2024 10:04:10 -0800
Subject: [PATCH 2/3] (test) async s3 cache

---
 litellm/tests/test_caching.py | 52 +++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/litellm/tests/test_caching.py b/litellm/tests/test_caching.py
index 8433941e90f2..5887aba12c02 100644
--- a/litellm/tests/test_caching.py
+++ b/litellm/tests/test_caching.py
@@ -696,6 +696,58 @@ async def call2():
 # test_s3_cache_acompletion_stream_azure()
 
 
+@pytest.mark.asyncio
+async def test_s3_cache_acompletion_azure():
+    import asyncio
+    import logging
+    import tracemalloc
+
+    tracemalloc.start()
+    logging.basicConfig(level=logging.DEBUG)
+
+    try:
+        litellm.set_verbose = True
+        random_word = generate_random_word()
+        messages = [
+            {
+                "role": "user",
+                "content": f"write a one sentence poem about: {random_word}",
+            }
+        ]
+        litellm.cache = Cache(
+            type="s3", s3_bucket_name="cache-bucket-litellm", s3_region_name="us-west-2"
+        )
+        print("s3 Cache: test for caching, async completion")
+
+        response1 = await litellm.acompletion(
+            model="azure/chatgpt-v-2",
+            messages=messages,
+            max_tokens=40,
+            temperature=1,
+        )
+        print(response1)
+
+        time.sleep(2)
+
+        response2 = await litellm.acompletion(
+            model="azure/chatgpt-v-2",
+            messages=messages,
+            max_tokens=40,
+            temperature=1,
+        )
+
+        print(response2)
+
+        assert response1.id == response2.id
+
+        litellm.cache = None
+        litellm.success_callback = []
+        litellm._async_success_callback = []
+    except Exception as e:
+        print(e)
+        raise e
+
+
 # test_redis_cache_acompletion_stream_bedrock()
 # redis cache with custom keys
 def custom_get_cache_key(*args, **kwargs):

From 99b9a7bfec496a388ec57bdc2c80c19e2c881816 Mon Sep 17 00:00:00 2001
From: ishaan-jaff
Date: Thu, 8 Feb 2024 10:07:03 -0800
Subject: [PATCH 3/3] (test) load test s3 cache

---
 litellm/tests/test_load_test_router_s3.py | 94 +++++++++++++++++++++++
 1 file changed, 94 insertions(+)
 create mode 100644 litellm/tests/test_load_test_router_s3.py

diff --git a/litellm/tests/test_load_test_router_s3.py b/litellm/tests/test_load_test_router_s3.py
new file mode 100644
index 000000000000..ed3df5f5d1a5
--- /dev/null
+++ b/litellm/tests/test_load_test_router_s3.py
@@ -0,0 +1,94 @@
+# import sys, os
+# import traceback
+# from dotenv import load_dotenv
+# import copy
+
+# load_dotenv()
+# sys.path.insert(
+#     0, os.path.abspath("../..")
+# )  # Adds the parent directory to the system path
+# import asyncio
+# from litellm import Router, Timeout
+# import time
+# from litellm.caching import Cache
+# import litellm
+
+# litellm.cache = Cache(
+#     type="s3", s3_bucket_name="cache-bucket-litellm", s3_region_name="us-west-2"
+# )
+
+# ### Test calling router with s3 Cache
+
+
+# async def call_acompletion(semaphore, router: Router, input_data):
+#     async with semaphore:
+#         try:
+#             # The Router enforces the timeout set below; Timeout is raised on expiry
+#             response = await router.acompletion(**input_data)
+#             # Handle the response as needed
+#             print(response)
+#             return response
+#         except Timeout:
+#             print(f"Task timed out: {input_data}")
+#             return None  # You may choose to return something else or raise an exception
+
+
+# async def main():
+#     # Initialize the Router
+#     model_list = [
+#         {
+#             "model_name": "gpt-3.5-turbo",
+#             "litellm_params": {
+#                 "model": "gpt-3.5-turbo",
+#                 "api_key": os.getenv("OPENAI_API_KEY"),
+#             },
+#         },
+#         {
+#             "model_name": "gpt-3.5-turbo",
+#             "litellm_params": {
+#                 "model": "azure/chatgpt-v-2",
+#                 "api_key": os.getenv("AZURE_API_KEY"),
+#                 "api_base": os.getenv("AZURE_API_BASE"),
+#                 "api_version": os.getenv("AZURE_API_VERSION"),
+#             },
+#         },
+#     ]
+#     router = Router(model_list=model_list, num_retries=3, timeout=10)
+
+#     # Create a semaphore with a capacity of 100
+#     semaphore = asyncio.Semaphore(100)
+
+#     # List to hold all task references
+#     tasks = []
+#     start_time_all_tasks = time.time()
+#     # Launch 500 tasks
+#     for _ in range(500):
+#         task = asyncio.create_task(
+#             call_acompletion(
+#                 semaphore,
+#                 router,
+#                 {
+#                     "model": "gpt-3.5-turbo",
+#                     "messages": [{"role": "user", "content": "Hey, how's it going?"}],
+#                 },
+#             )
+#         )
+#         tasks.append(task)
+
+#     # Wait for all tasks to complete
+#     responses = await asyncio.gather(*tasks)
+#     # Process responses as needed
+#     # Record the end time for all tasks
+#     end_time_all_tasks = time.time()
+#     # Calculate the total time for all tasks
+#     total_time_all_tasks = end_time_all_tasks - start_time_all_tasks
+#     print(f"Total time for all tasks: {total_time_all_tasks} seconds")
+
+#     # Calculate the average time per response
+#     average_time_per_response = total_time_all_tasks / len(responses)
+#     print(f"Average time per response: {average_time_per_response} seconds")
+#     print(f"NUMBER OF COMPLETED TASKS: {len(responses)}")
+
+
+# # Run the main function
+# asyncio.run(main())
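
A note on why PATCH 1/3 uses a thread rather than `asyncio.create_task`: `S3Cache.add_cache` is a synchronous (boto3-backed) call, so awaiting it inline would stall the event loop for every in-flight request, while the Redis path in the same block can keep using `asyncio.create_task` with an async client. Below is a minimal, self-contained sketch of that fire-and-forget pattern; `slow_s3_put` and `handle_request` are hypothetical stand-ins, not litellm code.

import asyncio
import threading
import time


def slow_s3_put(key: str, value: str) -> None:
    # Hypothetical stand-in for S3Cache.add_cache: a blocking upload.
    time.sleep(1)
    print(f"cached {key!r}")


async def handle_request(i: int) -> str:
    result = f"model-response-{i}"
    # Fire-and-forget, as in the patch: the thread owns the blocking
    # write, so this coroutine returns immediately and the event loop
    # stays free to serve the other requests.
    threading.Thread(target=slow_s3_put, args=(f"key-{i}", result)).start()
    return result


async def main() -> None:
    start = time.time()
    results = await asyncio.gather(*(handle_request(i) for i in range(10)))
    # All ten requests finish almost instantly; the ten 1-second cache
    # writes overlap on their own (non-daemon) threads and complete
    # before the interpreter exits.
    print(f"served {len(results)} requests in {time.time() - start:.3f}s")


if __name__ == "__main__":
    asyncio.run(main())

The trade-off is one short-lived thread per cache write, which is acceptable here because the write is a one-shot network call with no result the caller needs to await.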