Commit

Merge pull request #33 from th33ngineers/ishaan-allow-custom-vectorDBs
Add custom redis cache
krrishdholakia authored Aug 2, 2023
2 parents fb4f7b1 + e06ad1d commit faab313
Showing 42 changed files with 1,662 additions and 571 deletions.
Binary file modified .DS_Store
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -33,7 +33,7 @@ jobs:
        with:
          poetry-version: 1.3.2
      - name: Install dependencies
-       run: poetry install
+       run: poetry install --all-extras
      - name: Static code checks
        run: |
          pip install flake8
1 change: 1 addition & 0 deletions .gitignore
@@ -247,3 +247,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

+/.ruff_cache/
1 change: 0 additions & 1 deletion Makefile
@@ -14,7 +14,6 @@ isort:
lint:
	make black
	make ruff
-	make isort

pre-commit:
	pre-commit run --all-files
32 changes: 32 additions & 0 deletions README.md
@@ -61,11 +61,43 @@ Here's everything you can pass to reliableGPT
| `fallback_strategy` | list | Optional | You can define a custom fallback strategy of OpenAI models you want to try using. If you want to try one model several times, just repeat it, e.g. ['gpt-4', 'gpt-4', 'gpt-3.5-turbo'] will try gpt-4 twice before trying gpt-3.5-turbo |
| `model_limits_dir`| dict | Optional | Note: required if using `queue_requests = True`. For models whose rate limits you want handled, set model_limits_dir = {"gpt-3.5-turbo": {"max_token_capacity": 1000000, "max_request_capacity": 10000}}. You can find your account rate limits here: https://platform.openai.com/account/rate-limits |
| `user_token`| string | Optional | Pass your user token if you want us to handle OpenAI Invalid Key Errors - we'll rotate through your stored keys (more on this below 👇) until we get one that works |
| `azure_fallback_strategy`| List[string] | Optional | Pass your backup Azure deployment/engine IDs. In case your requests start failing we'll switch to one of these (if you also pass in a backup OpenAI key, we'll try the Azure endpoints before the raw OpenAI ones) |
| `backup_openai_key`| string | Optional | Pass your OpenAI API key if you're using Azure and want to switch to OpenAI in case your requests start failing |
| `caching` | bool | Optional | Cache your OpenAI responses. Used as a backup in case model fallback fails **or** the request queue is overloaded (if your servers are being overwhelmed with requests, it'll alert you and return cached responses so that customer requests don't get dropped) |
| `max_threads` | int | Optional | Pass this in alongside `caching=True` so it can handle the overloaded-queue scenario |
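
For reference, these options might be combined as below. This is a hedged sketch: it assumes reliableGPT is used to wrap `openai.ChatCompletion.create` (the wrapper pattern documented earlier in this README), and every value shown is illustrative.

```python
import openai
from reliablegpt import reliableGPT

# Illustrative values only - swap in your own models, deployments, and keys
openai.ChatCompletion.create = reliableGPT(
    openai.ChatCompletion.create,
    user_email="you@yourcompany.com",                       # where alerts get sent
    fallback_strategy=["gpt-4", "gpt-4", "gpt-3.5-turbo"],  # try gpt-4 twice, then gpt-3.5-turbo
    azure_fallback_strategy=["my-backup-deployment"],       # only relevant if you're on Azure
    backup_openai_key="sk-...",                             # raw OpenAI backup for Azure users
    caching=True,                                           # serve cached responses as a last resort
    max_threads=20,                                         # needed for the overloaded-queue scenario
)
```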

# 👨‍🔬 Use Cases
## Use Caching around your Query Endpoint 🔥
If you're seeing high traffic and want to make sure all your users get a response, wrap your query endpoint with reliableCache. It monitors for high thread utilization and responds with cached responses.
### Step 1. Import reliableCache
```python
from reliablegpt import reliableCache
```
### Step 2. Initialize reliableCache
```python
# max_threads: the maximum number of threads you've allocated for Flask to run (by default this is 1).
# query_arg: the variable name you're using to pass the user query to your endpoint (assuming this is in the params/args)
# customer_instance_arg: unique identifier for that customer's instance (we'll put all cached responses for that customer within this bucket)
# user_email: [REQUIRED] your user email - we will alert you when you're seeing high utilization
cache = reliableCache(max_threads=20, query_arg="query", customer_instance_arg="instance_id", user_email="krrish@berri.ai")
```

e.g. the number of threads for this Flask app is `50`:
```python
if __name__ == "__main__":
    from waitress import serve
    serve(app, host="0.0.0.0", port=4000, threads=50)
```

### Step 3. Decorate your endpoint 🚀
```python
## Decorate your endpoint with cache.cache_wrapper; this monitors for
## high thread utilization and sends cached responses when that happens
@app.route("/test_func")
@cache.cache_wrapper
def test_fn():
    # your endpoint logic
    ...
```
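
Putting the three steps together, a minimal end-to-end sketch might look like the following. The endpoint body and email are placeholders, and `max_threads` is set to match the waitress `threads` value from the example above.

```python
from flask import Flask, request
from reliablegpt import reliableCache

app = Flask(__name__)

# max_threads matches the number of threads handed to waitress below
cache = reliableCache(max_threads=50, query_arg="query",
                      customer_instance_arg="instance_id",
                      user_email="you@yourcompany.com")

@app.route("/test_func")
@cache.cache_wrapper
def test_fn():
    query = request.args.get("query")
    # your endpoint logic - call your LLM / retrieval code here
    return {"response": f"handled: {query}"}

if __name__ == "__main__":
    from waitress import serve
    serve(app, host="0.0.0.0", port=4000, threads=50)
```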

## Switch between Azure OpenAI and raw OpenAI
If you're using Azure OpenAI and facing issues like read/request timeouts, rate limits, etc., you can use reliableGPT 💪 to fall back to the raw OpenAI endpoints if your Azure OpenAI endpoint fails.
Binary file added dist/reliableGPT-0.2.992-py3-none-any.whl
Binary file added dist/reliableGPT-0.2.992.tar.gz
Binary file added dist/reliableGPT-0.2.993-py3-none-any.whl
Binary file added dist/reliableGPT-0.2.993.tar.gz
Binary file added dist/reliableGPT-0.2.9933-py3-none-any.whl
Binary file added dist/reliableGPT-0.2.9933.tar.gz
Binary file added dist/reliableGPT-0.2.994-py3-none-any.whl
Binary file added dist/reliableGPT-0.2.994.tar.gz
Binary file added dist/reliableGPT-0.2.995-py3-none-any.whl
Binary file added dist/reliableGPT-0.2.995.tar.gz
504 changes: 504 additions & 0 deletions examples/UseModelFallBacks.ipynb

Large diffs are not rendered by default.

@@ -0,0 +1,173 @@
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"source": [
"# Using a hot (in-memory) + hosted cache to prevent dropped customer requests in prod (LLM Apps)."
],
"metadata": {
"id": "ICOXlbjlMh9c"
}
},
{
"cell_type": "markdown",
"source": [
"# Create a class to manage your Caching logic"
],
"metadata": {
"id": "3dynjvovLwaA"
}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "6YM2U0ScLTfo"
},
"outputs": [],
"source": [
"from typing import Any\n",
"import threading\n",
"from threading import active_count\n",
"import requests\n",
"import traceback\n",
"from flask import Flask, request\n",
"import json\n",
"from fuzzywuzzy import process\n",
"\n",
"class reliableCache:\n",
" def __init__(self, query_arg=None, customer_instance_arg=None, user_email=None, similarity_threshold=0.65, max_threads=100, verbose=False) -> None:\n",
" self.max_threads = max_threads\n",
" self.verbose = verbose\n",
" self.query_arg = query_arg\n",
" self.customer_instance_arg = customer_instance_arg\n",
" self.user_email = user_email\n",
" self.threshold = similarity_threshold\n",
" self.cache_wrapper_threads = {}\n",
" self.hot_cache = {}\n",
" pass\n",
"\n",
" def print_verbose(self, print_statement):\n",
" if self.verbose:\n",
" print(\"Cached Request: \" + str(print_statement))\n",
"\n",
" def add_cache(self, user_email, instance_id, input_prompt, response):\n",
" try:\n",
" self.print_verbose(f\"result being stored in cache: {response}\")\n",
" url = \"YOUR_HOSTED_CACHE_ENDPOINT/add_cache\"\n",
" querystring = {\n",
" \"customer_id\": \"temp5@xyz.com\",\n",
" \"instance_id\": instance_id,\n",
" \"user_email\": user_email,\n",
" \"input_prompt\": input_prompt,\n",
" \"response\": json.dumps({\"response\": response})\n",
" }\n",
" response = requests.post(url, params=querystring)\n",
" except:\n",
" pass\n",
"\n",
" def try_cache_request(self, user_email, instance_id, query=None):\n",
" try:\n",
" url = \"YOUR_HOSTED_CACHE_ENDPOINT/get_cache\"\n",
" querystring = {\n",
" \"customer_id\": \"temp5@xyz.com\",\n",
" \"instance_id\": instance_id,\n",
" \"user_email\": user_email,\n",
" \"input_prompt\": query,\n",
" \"threshold\": self.threshold\n",
" }\n",
" response = requests.get(url, params=querystring)\n",
" self.print_verbose(f\"response: {response.text}\")\n",
" extracted_result = response.json()[\"response\"]\n",
" print(f\"extracted_result: {extracted_result} \\n\\n original response: {response.json()}\")\n",
" return extracted_result\n",
" except:\n",
" pass\n",
" self.print_verbose(f\"cache miss!\")\n",
" return None\n",
"\n",
" def cache_wrapper(self, func):\n",
" def wrapper(*args, **kwargs):\n",
" query = request.args.get(\"query\") # the customer question\n",
" instance_id = request.args.get(self.customer_instance_arg) # the unique instance to put that customer query/response in\n",
" try:\n",
" if (self.user_email, instance_id) in self.hot_cache:\n",
" choices = self.hot_cache[(self.user_email, instance_id)]\n",
" most_similar_query = process.extractOne(query, choices)\n",
" if most_similar_query[1] > 70:\n",
" result = self.hot_cache[(self.user_email, instance_id, most_similar_query[0])]\n",
" return result\n",
" else:\n",
" result = func(*args, **kwargs)\n",
" # add response to cache\n",
" self.add_cache(self.user_email, instance_id=instance_id, input_prompt=query, response=result)\n",
" except Exception as e:\n",
" cache_result = self.try_cache_request(user_email=self.user_email, instance_id=instance_id, query=query)\n",
" if cache_result:\n",
" print(\"cache hit!\")\n",
" self.hot_cache[(self.user_email, instance_id, query)] = cache_result\n",
" if (self.user_email, instance_id) not in self.hot_cache:\n",
" self.hot_cache[(self.user_email, instance_id)] = []\n",
" self.hot_cache[(self.user_email, instance_id)].append(query)\n",
" return cache_result\n",
" else:\n",
" print(\"Cache miss!\")\n",
" raise e\n",
" self.print_verbose(f\"final result: {result}\")\n",
" return result\n",
" return wrapper\n",
"\n",
" def get_wrapper_thread_utilization(self):\n",
" self.print_verbose(f\"cache wrapper thread values: {self.cache_wrapper_threads.values()}\")\n",
" active_cache_threads = 0\n",
" for value in self.cache_wrapper_threads.values():\n",
" if value == True:\n",
" active_cache_threads += 1\n",
" # active_cache_threads = sum(self.cache_wrapper_threads.values())\n",
" self.print_verbose(f\"active_cache_threads: {active_cache_threads}\")\n",
" return active_cache_threads / self.max_threads"
]
},
{
"cell_type": "markdown",
"source": [
"# Wrap your query endpoint with it\n",
"\n",
"In our caching class, we designed our cache to be a decorator (cache_wrapper). This wraps the `berri_query` endpoint, and in case we return errors, this will catch that and return a cached response instead. We also check each request against a local / hot cache, to reduce dropped requests (useful during high-traffic scenarios)."
],
"metadata": {
"id": "RGg6xwGGL2Ih"
}
},
{
"cell_type": "code",
"source": [
"@app.route(\"/berri_query\")\n",
"@cache.cache_wrapper\n",
"def berri_query():\n",
" print('Request receieved: ', request)\n",
" # your execution logic\n",
" pass"
],
"metadata": {
"id": "YZADDSXPL41g"
},
"execution_count": null,
"outputs": []
}
]
}
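
The README describes serving cached responses when thread utilization is high, and the class above exposes `get_wrapper_thread_utilization()` for that purpose, but the notebook's `cache_wrapper` doesn't call it. Below is a hedged sketch of how that check might gate the cached path; the 0.8 threshold and the helper function itself are illustrative, not part of the library.

```python
# Illustrative only: shed load by serving cached answers when thread utilization is high.
# Assumes a reliableCache-like object exposing get_wrapper_thread_utilization()
# and try_cache_request() as defined in the notebook above.
def maybe_serve_from_cache(cache, user_email, instance_id, query, live_call):
    utilization = cache.get_wrapper_thread_utilization()  # fraction of busy cache-wrapper threads
    if utilization > 0.8:  # hypothetical high-traffic threshold
        cached = cache.try_cache_request(user_email=user_email,
                                         instance_id=instance_id,
                                         query=query)
        if cached is not None:
            return cached  # return the cached answer instead of queueing more work
    return live_call()  # normal path: run the real endpoint logic
```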