OpenAI: Migrate to async client and enhance API support #219

Open · wants to merge 68 commits into base: main
411edc6
feat(openai): Migrate to async client and enhance API support
Tostino Nov 13, 2024
9484652
Remove underscore prefix from all arguments.
Tostino Nov 14, 2024
ce17af4
Revert accidental changes to ai--0.4.0.sql
Tostino Nov 14, 2024
9c7c824
Whitespace fixes...darn IDE.
Tostino Nov 14, 2024
c8aacd7
Remove client create/destroy functions from the public interface. Add…
Tostino Nov 15, 2024
0146781
chore(main): release pgai 0.3.0 (#277)
github-actions[bot] Dec 10, 2024
77f3863
chore: release extension 0.6.0 (#285)
JamesGuthrie Dec 10, 2024
35e2fc8
chore: increment extension version to 0.6.1-dev
jgpruitt Dec 10, 2024
027b3f4
feat: allow superusers to create vectorizers on any table
jgpruitt Dec 10, 2024
bd83165
fix: handle empty `PG_BIN` (#286)
JamesGuthrie Dec 10, 2024
9f3b646
chore: increment extension lib version
jgpruitt Dec 10, 2024
1e7f5a5
feat: allow superusers to call _vectorizer_create_dependencies
jgpruitt Dec 10, 2024
aac3d83
fix: host networking is not supported on macos
jgpruitt Dec 10, 2024
ee86d35
fix: schema qualify type definitions, casts, and operators
jgpruitt Dec 10, 2024
74db2ba
docs: voyage ai quickstart (#276)
Askir Dec 11, 2024
aabbc68
docs: fix psql command and pstgres data volume in quickstarts (#293)
Askir Dec 12, 2024
19119e5
docs: fix formatting of table for voyageai (#294)
Askir Dec 12, 2024
113cc48
chore: include latest tag on releases (#295)
adolsalamanca Dec 12, 2024
7b2995b
feat: allow vectorizers to be granted to public
jgpruitt Dec 11, 2024
b22a77a
chore: configure latest tag in docker metadata step (#296)
JamesGuthrie Dec 12, 2024
1e226e8
docs: guide users to clone the latest release for installation
jgpruitt Dec 12, 2024
c3a2221
docs: use named volumes in quickstart guides (#300)
JamesGuthrie Dec 12, 2024
38c7b50
docs: fix RAG SQL example in readme
jgpruitt Dec 13, 2024
20714ff
chore: reorganize readme
cevian Dec 12, 2024
4098316
chore: readme fixes from PR comments
cevian Dec 13, 2024
2b41c9a
chore: more changes to readme
cevian Dec 13, 2024
dbac246
feat: pull missing ollama models (#301)
JamesGuthrie Dec 16, 2024
5441f2d
chore: fix broken pgai build by pinning hatchling (#308)
JamesGuthrie Dec 16, 2024
3f9736a
chore: support uv in extension install, use for dev (#309)
JamesGuthrie Dec 16, 2024
7d4c8ee
feat: add a semantic catalog for db objs and example sql
jgpruitt Dec 2, 2024
df399f8
feat: add ai.find_relevant_sql semantic catalog function
jgpruitt Dec 11, 2024
18ee335
feat: add ai.find_relevant_obj() functions
jgpruitt Dec 11, 2024
9edacfb
ci: pgspot chokes on valid code. disabling for now
jgpruitt Dec 11, 2024
8984cf9
fix: ignore ollama.base_url in test
jgpruitt Dec 11, 2024
f17388e
feat: only find relevant db objs that user has privs to
jgpruitt Dec 12, 2024
cf71602
feat: allow find_relevant_obj to be restricted to a given obj type
jgpruitt Dec 12, 2024
4fd12a6
feat: return dist and add max_dist filter to semantic_catalog funcs
jgpruitt Dec 13, 2024
405a208
chore: clean up event triggers to only update columns strictly required
jgpruitt Dec 13, 2024
56ecf53
chore: add foreign key constraints to semantic catalog on vectorizer
jgpruitt Dec 13, 2024
32b0eb7
chore: reorder arguments for semantic catalog functions
jgpruitt Dec 13, 2024
77df293
chore: support multiple objtype filters in find_relevant_obj()
jgpruitt Dec 13, 2024
85c9244
feat: add vectorizer_embed convenience function
jgpruitt Dec 13, 2024
6578edc
chore: make an immutable version of vectorizer_embed
jgpruitt Dec 16, 2024
5c17d89
chore: rename semantic_catalog.name to semantic_catalog.catalog_name
jgpruitt Dec 16, 2024
1f6d1a8
fix: exclude python system packages for versioned extension (#310)
JamesGuthrie Dec 17, 2024
7b4a916
fix: deprecation warning on re.split
MasterOdin Dec 17, 2024
1d42906
chore: remove pip caching
JamesGuthrie Dec 17, 2024
3c2a6a5
docs: remove openai mention from quickstart, fix opclasses in hnsw in…
Askir Dec 17, 2024
295c0f0
feat: construct a prompt for text-to-sql using relevant desc
jgpruitt Dec 17, 2024
17e3d28
feat: add a text_to_sql function
jgpruitt Dec 17, 2024
77673ee
chore: split embedders in individual files (#315)
smoya Dec 18, 2024
b7573b7
feat: load api keys from db in self hosted vectorizer (#311)
Askir Dec 18, 2024
423e8fd
test: spawn the ollama container through testcontainers (#318)
smoya Dec 18, 2024
ad81577
docs: add simple embedding evaluation script for pgai Vectorizer (#312)
jackyliang Dec 18, 2024
1e44ba2
docs: add Voyage AI evaluation code (#321)
jackyliang Dec 18, 2024
4f530f2
fix: openai wrapping text-to-sql query in markdown (#324)
MasterOdin Dec 18, 2024
39fcf93
docs: clarify 'destination' argument (#262)
smoya Dec 19, 2024
0230509
feat: add sqlalchemy vectorizer_relationship (#265)
Askir Dec 19, 2024
a4298c3
Code for evaluating open source embedding models (#305)
ihis-11 Dec 19, 2024
8c5baf7
test: use psycopg over psycopg2 for sqlalchemy tests (#326)
MasterOdin Dec 20, 2024
89039b2
chore: register postgres_params custom pytest.mark (#327)
MasterOdin Dec 23, 2024
6d89b42
feat(openai): Migrate to async client and enhance API support
Tostino Nov 13, 2024
c368831
Remove underscore prefix from all arguments.
Tostino Nov 14, 2024
76fe0cd
Revert accidental changes to ai--0.4.0.sql
Tostino Nov 14, 2024
88f32b8
Whitespace fixes...darn IDE.
Tostino Nov 14, 2024
b8772f6
Remove client create/destroy functions from the public interface. Add…
Tostino Nov 15, 2024
ab3e9b3
Merge remote-tracking branch 'origin/fix_openai_redux' into fix_opena…
Tostino Dec 26, 2024
6236f03
Fix breakage with secrets.
Tostino Dec 27, 2024
132 changes: 85 additions & 47 deletions projects/extension/ai/openai.py
@@ -1,6 +1,7 @@
import json
import asyncio
import openai
from datetime import datetime
from typing import Optional, Generator, Union
from typing import Optional, Any, Dict, Callable, Awaitable
from .secrets import reveal_secret


@@ -17,58 +18,95 @@ def get_openai_api_key(plpy, api_key_name: Optional[str] = None) -> str:

def get_openai_base_url(plpy) -> Optional[str]:
r = plpy.execute(
"select pg_catalog.current_setting('ai.openai_base_url', true) as base_url"
"SELECT pg_catalog.current_setting('ai.openai_base_url', true) AS base_url"
)
if len(r) == 0:
return None
return r[0]["base_url"]


def make_client(
plpy,
api_key: Optional[str] = None,
api_key_name: Optional[str] = None,
base_url: Optional[str] = None,
) -> openai.Client:
def make_async_client(
plpy,
api_key: Optional[str] = None,
api_key_name: Optional[str] = None,
organization: Optional[str] = None,
base_url: Optional[str] = None,
timeout: Optional[float] = None,
max_retries: Optional[int] = None,
default_headers: Optional[Dict[str, str]] = None,
default_query: Optional[Dict[str, Any]] = None,
http_client: Optional[Any] = None,
_strict_response_validation: Optional[bool] = None
) -> openai.AsyncOpenAI:
if api_key is None:
api_key = get_openai_api_key(plpy, api_key_name)
if base_url is None:
base_url = get_openai_base_url(plpy)
return openai.Client(api_key=api_key, base_url=base_url)


def list_models(
plpy,
api_key: Optional[str] = None,
api_key_name: Optional[str] = None,
base_url: Optional[str] = None,
) -> Generator[tuple[str, datetime, str], None, None]:
client = make_client(plpy, api_key, api_key_name, base_url)
from datetime import datetime, timezone

for model in client.models.list():
created = datetime.fromtimestamp(model.created, timezone.utc)
yield model.id, created, model.owned_by


def embed(
plpy,
model: str,
input: Union[str, list[str], list[int]],
api_key: Optional[str] = None,
api_key_name: Optional[str] = None,
base_url: Optional[str] = None,
dimensions: Optional[int] = None,
user: Optional[str] = None,
) -> Generator[tuple[int, list[float]], None, None]:
client = make_client(plpy, api_key, api_key_name, base_url)
args = {}
if dimensions is not None:
args["dimensions"] = dimensions
if user is not None:
args["user"] = user
response = client.embeddings.create(input=input, model=model, **args)
if not hasattr(response, "data"):
return None
for obj in response.data:
yield obj.index, obj.embedding

client_kwargs = prepare_kwargs({
"api_key": api_key,
"organization": organization,
"base_url": base_url,
"timeout": timeout,
"max_retries": max_retries,
"default_headers": default_headers,
"default_query": default_query,
"http_client": http_client,
"_strict_response_validation": _strict_response_validation
})

return openai.AsyncOpenAI(**client_kwargs)

def get_or_create_client(plpy, GD: Dict[str, Any], api_key: str = None, api_key_name: str = None, base_url: str = None) -> Any:
Comment from a Collaborator:

Do you have any numbers showing that creating the client is expensive (and thus worth storing in GD)? Does this allow connection reuse or something? If it's the latter, how/when do connections get closed? Is there a keepalive timeout?

Storing the client in GD seems like a good amount of complexity, and I'd like to find out what we are gaining/losing for it.

Reply from @Tostino (Nov 14, 2024):
Yup, benchmarks here: #116 (comment)

Also note that there is a known issue where the 2nd (and 3rd, etc.) call to the client through the API incurs an extra ~40ms delay that doesn't happen when this code runs outside of a PL/Python environment (noted in the thread above). I really should have mentioned that directly in the PR and will edit it to note that this still needs to be identified. Once that is fixed, the benchmark numbers should look much better.

Even with the above issue, this is still much faster, and uses less CPU, than the original implementation where we recreate the client.

Note specifically the CPU reduction. Recreating the client is heavy on CPU; I know this from past projects, and the benchmarks bear this out.

I believe the connection is closed after the request is complete, and the client becomes ready for the next call. If the request is cancelled early, we attempt to shut things down gracefully.

new_config = prepare_kwargs({'api_key': api_key, 'api_key_name': api_key_name, 'base_url': base_url})
old_config = GD.get('openai_client', {}).get('config', {})
merged_config = {**old_config, **new_config}

client_needs_update = (
'openai_client' not in GD or
'client' not in GD.get('openai_client', {}) or
client_config_changed(old_config, merged_config)
)

if client_needs_update:
client = make_async_client(plpy, **merged_config)
GD['openai_client'] = {'client': client, 'config': merged_config}
else:
client = GD['openai_client']['client']

return client


def process_json_input(input_value):
return json.loads(input_value) if input_value is not None else None


def is_query_cancelled(plpy):
try:
plpy.execute("SELECT 1")
return False
except plpy.SPIError:
return True


def execute_with_cancellation(plpy, client: openai.AsyncOpenAI, async_func: Callable[[openai.AsyncOpenAI, Dict[str, Any]], Awaitable[Dict[str, Any]]], **kwargs) -> Dict[str, Any]:
async def main():
task = asyncio.create_task(async_func(client, kwargs))
while not task.done():
if is_query_cancelled(plpy):
task.cancel()
raise plpy.SPIError("Query cancelled by user")
await asyncio.sleep(0.1) # 100ms
return await task

loop = asyncio.get_event_loop()
result = loop.run_until_complete(main())
return result


def prepare_kwargs(params: Dict[str, Any]) -> Dict[str, Any]:
return {k: v for k, v in params.items() if v is not None}


def client_config_changed(old_config: Dict[str, Any], new_config: Dict[str, Any]) -> bool:
return any(old_config.get(k) != new_config.get(k) for k in new_config)
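To make the cancellation-polling design in this diff concrete, here is a minimal, self-contained sketch of the `execute_with_cancellation` pattern. `FakePlpy` and `fake_embed` are hypothetical stand-ins for the real `plpy` module and an `AsyncOpenAI` call (neither is available outside PostgreSQL), and `asyncio.run` stands in for the PR's `loop.run_until_complete` for simplicity.

```python
import asyncio
from typing import Any, Dict


class FakePlpy:
    """Hypothetical stand-in for PL/Python's plpy; execute() raises once cancelled."""

    def __init__(self) -> None:
        self.cancelled = False

    def execute(self, sql: str):
        if self.cancelled:
            raise RuntimeError("query cancelled")
        return [{"?column?": 1}]


async def fake_embed(client: Any, kwargs: Dict[str, Any]) -> Dict[str, Any]:
    # Stands in for an AsyncOpenAI request such as client.embeddings.create(...)
    await asyncio.sleep(0.05)  # simulate network latency
    return {"model": kwargs["model"], "embedding": [0.0, 0.0, 0.0]}


def execute_with_cancellation(plpy, client, async_func, **kwargs):
    async def main():
        task = asyncio.create_task(async_func(client, kwargs))
        while not task.done():
            try:
                plpy.execute("SELECT 1")  # liveness probe, as in the PR
            except RuntimeError:
                task.cancel()
                raise
            await asyncio.sleep(0.01)  # poll interval shortened for this demo
        return await task

    # The PR drives this with loop.run_until_complete; asyncio.run is simpler here.
    return asyncio.run(main())


plpy = FakePlpy()
result = execute_with_cancellation(
    plpy, client=None, async_func=fake_embed, model="text-embedding-3-small"
)
print(result["model"])  # text-embedding-3-small
```

If `plpy.execute` starts failing mid-request (the backend was cancelled), the in-flight task is cancelled and the error propagates, mirroring the `SPIError` path in the PR.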
4 changes: 2 additions & 2 deletions projects/extension/requirements.txt
@@ -1,5 +1,5 @@
openai==1.44.0
tiktoken==0.7.0
openai==1.51.2
tiktoken==0.8.0
ollama==0.2.1
anthropic==0.29.0
cohere==5.5.8
4 changes: 2 additions & 2 deletions projects/extension/setup.cfg
@@ -7,8 +7,8 @@ python_requires = >=3.10
packages = ai
# unfortunately, we cannot refer to the requirements.txt file with python 3.10
install_requires =
openai==1.44.0
tiktoken==0.7.0
openai==1.51.2
tiktoken==0.8.0
ollama==0.2.1
anthropic==0.29.0
cohere==5.5.8