diff --git a/docs/adding-embedding-integration.md b/docs/adding-embedding-integration.md
index 5beac5150..991cf527c 100644
--- a/docs/adding-embedding-integration.md
+++ b/docs/adding-embedding-integration.md
@@ -31,13 +31,17 @@ integration. Update the tests to account for the new function.
 
 The vectorizer worker reads the database's vectorizer configuration at runtime
 and turns it into a `pgai.vectorizer.Config`.
 
-To add a new integration, add a new embedding class with fields corresponding
-to the database's jsonb configuration to `pgai/vectorizer/embeddings.py`. See
+To add a new integration, add a new file containing the embedding class
+with fields corresponding to the database's jsonb configuration into the
+[embedders directory]. See
 the existing implementations for examples of how to do this. Implement the
 `Embedder` class' abstract methods. Use first-party python libraries for the
 integration, if available. If no first-party python libraries are available,
 use direct HTTP requests.
 
+Remember to add an import for your newly created class to the
+[embedders \_\_init\_\_.py].
+
 Add tests which perform end-to-end testing of the new integration. There are
 two options for handling API calls to the integration API:
@@ -49,6 +53,8 @@ used conservatively. We will determine on a case-by-case basis what level of
 testing we would like.
 
 [vcr.py]:https://vcrpy.readthedocs.io/en/latest/
+[embedders directory]:/projects/pgai/pgai/vectorizer/embedders
+[embedders \_\_init\_\_.py]:/projects/pgai/pgai/vectorizer/embedders/__init__.py
 
 ## Documentation
 
diff --git a/projects/pgai/justfile b/projects/pgai/justfile
index 94b91a557..34ed41ba2 100644
--- a/projects/pgai/justfile
+++ b/projects/pgai/justfile
@@ -39,6 +39,10 @@ test:
 lint:
     @uv run ruff check ./
 
+# Run ruff linter checks and fix all auto-fixable issues
+lint-fix:
+    @uv run ruff check ./ --fix
+
 # Run pyright type checking
 type-check:
     @uv run pyright ./
diff --git a/projects/pgai/pgai/vectorizer/embedders/__init__.py b/projects/pgai/pgai/vectorizer/embedders/__init__.py
new file mode 100644
index 000000000..8eda5c1b4
--- /dev/null
+++ b/projects/pgai/pgai/vectorizer/embedders/__init__.py
@@ -0,0 +1,3 @@
+from .ollama import Ollama as Ollama
+from .openai import OpenAI as OpenAI
+from .voyageai import VoyageAI as VoyageAI
diff --git a/projects/pgai/pgai/vectorizer/embedders/ollama.py b/projects/pgai/pgai/vectorizer/embedders/ollama.py
new file mode 100644
index 000000000..0c0b42881
--- /dev/null
+++ b/projects/pgai/pgai/vectorizer/embedders/ollama.py
@@ -0,0 +1,156 @@
+import os
+from collections.abc import Mapping, Sequence
+from functools import cached_property
+from typing import (
+    Any,
+    Literal,
+)
+
+import ollama
+from pydantic import BaseModel
+from typing_extensions import TypedDict, override
+
+from ..embeddings import (
+    BatchApiCaller,
+    Embedder,
+    EmbeddingResponse,
+    EmbeddingVector,
+    StringDocument,
+    Usage,
+    logger,
+)
+
+
+# Note: this is a re-declaration of ollama.Options, which we are forced to do
+# otherwise pydantic complains (ollama.Options subclasses typing.TypedDict):
+# pydantic.errors.PydanticUserError: Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12. # noqa
+class OllamaOptions(TypedDict, total=False):
+    # load time options
+    numa: bool
+    num_ctx: int
+    num_batch: int
+    num_gpu: int
+    main_gpu: int
+    low_vram: bool
+    f16_kv: bool
+    logits_all: bool
+    vocab_only: bool
+    use_mmap: bool
+    use_mlock: bool
+    embedding_only: bool
+    num_thread: int
+
+    # runtime options
+    num_keep: int
+    seed: int
+    num_predict: int
+    top_k: int
+    top_p: float
+    tfs_z: float
+    typical_p: float
+    repeat_last_n: int
+    temperature: float
+    repeat_penalty: float
+    presence_penalty: float
+    frequency_penalty: float
+    mirostat: int
+    mirostat_tau: float
+    mirostat_eta: float
+    penalize_newline: bool
+    stop: Sequence[str]
+
+
+class Ollama(BaseModel, Embedder):
+    """
+    Embedder that uses Ollama to embed documents into vector representations.
+
+    Attributes:
+        implementation (Literal["ollama"]): The literal identifier for this
+            implementation.
+        model (str): The name of the Ollama model used for embeddings.
+        base_url (str | None): The base URL used to access the Ollama API.
+        options (OllamaOptions | None): Additional Ollama-specific runtime options.
+        keep_alive (str | None): How long to keep the model loaded after the request.
+    """
+
+    implementation: Literal["ollama"]
+    model: str
+    base_url: str | None = None
+    options: OllamaOptions | None = None
+    keep_alive: str | None = None  # this is only `str` because of the SQL API
+
+    @override
+    async def embed(self, documents: list[str]) -> Sequence[EmbeddingVector]:
+        """
+        Embeds a list of documents into vectors using Ollama's embeddings API.
+
+        Args:
+            documents (list[str]): A list of documents to be embedded.
+
+        Returns:
+            Sequence[EmbeddingVector]: The embeddings for each document.
+        """
+        await logger.adebug(f"Chunks produced: {len(documents)}")
+        return await self._batcher.batch_chunks_and_embed(documents)
+
+    @cached_property
+    def _batcher(self) -> BatchApiCaller[StringDocument]:
+        return BatchApiCaller(self._max_chunks_per_batch(), self.call_embed_api)
+
+    @override
+    def _max_chunks_per_batch(self) -> int:
+        # Note: the chosen default is arbitrary - Ollama doesn't place a limit
+        return int(
+            os.getenv("PGAI_VECTORIZER_OLLAMA_MAX_CHUNKS_PER_BATCH", default="2048")
+        )
+
+    @override
+    async def setup(self):
+        client = ollama.AsyncClient(host=self.base_url)
+        try:
+            await client.show(self.model)
+        except ollama.ResponseError as e:
+            if f"model '{self.model}' not found" in e.error:
+                logger.warn(
+                    f"pulling ollama model '{self.model}', this may take a while"
+                )
+                await client.pull(self.model)
+
+    async def call_embed_api(self, documents: str | list[str]) -> EmbeddingResponse:
+        response = await ollama.AsyncClient(host=self.base_url).embed(
+            model=self.model,
+            input=documents,
+            options=self.options,
+            keep_alive=self.keep_alive,
+        )
+        usage = Usage(
+            prompt_tokens=response["prompt_eval_count"],
+            total_tokens=response["prompt_eval_count"],
+        )
+        return EmbeddingResponse(embeddings=response["embeddings"], usage=usage)
+
+    async def _model(self) -> Mapping[str, Any]:
+        """
+        Gets the model details from the Ollama API.
+        """
+        return await ollama.AsyncClient(host=self.base_url).show(self.model)
+
+    async def _context_length(self) -> int | None:
+        """
+        Gets the context_length of the configured model, if available.
+        """
+        model = await self._model()
+        architecture = model["model_info"].get("general.architecture", None)
+        if architecture is None:
+            logger.warn(f"unable to determine architecture for model '{self.model}'")
+            return None
+        context_key = f"{architecture}.context_length"
+        # see https://github.com/ollama/ollama/blob/712d63c3f06f297e22b1ae32678349187dccd2e4/llm/ggml.go#L116-L118 # noqa
+        model_context_length = model["model_info"][context_key]
+        # the context window can be configured, so pull the value from the config
+        num_ctx = (
+            float("inf")
+            if self.options is None
+            else self.options.get("num_ctx", float("inf"))
+        )
+        return min(model_context_length, num_ctx)
diff --git a/projects/pgai/pgai/vectorizer/embedders/openai.py b/projects/pgai/pgai/vectorizer/embedders/openai.py
new file mode 100644
index 000000000..7c4e9aa5c
--- /dev/null
+++ b/projects/pgai/pgai/vectorizer/embedders/openai.py
@@ -0,0 +1,202 @@
+import re
+from collections.abc import Sequence
+from functools import cached_property
+from typing import Any, Literal
+
+import openai
+import tiktoken
+from openai import resources
+from pydantic import BaseModel
+from typing_extensions import override
+
+from ..embeddings import (
+    ApiKeyMixin,
+    BatchApiCaller,
+    ChunkEmbeddingError,
+    Embedder,
+    EmbeddingResponse,
+    EmbeddingVector,
+    StringDocument,
+    TokenDocument,
+    Usage,
+    logger,
+)
+
+TOKEN_CONTEXT_LENGTH_ERROR = "chunk exceeds model context length"
+
+openai_token_length_regex = re.compile(
+    r"This model's maximum context length is (\d+) tokens"
+)
+
+
+class OpenAI(ApiKeyMixin, BaseModel, Embedder):
+    """
+    Embedder that uses OpenAI's API to embed documents into vector representations.
+
+    Attributes:
+        implementation (Literal["openai"]): The literal identifier for this
+            implementation.
+        model (str): The name of the OpenAI model used for embeddings.
+        dimensions (int | None): Optional dimensions for the embeddings.
+        user (str | None): Optional user identifier for OpenAI API usage.
+    """
+
+    implementation: Literal["openai"]
+    model: str
+    dimensions: int | None = None
+    user: str | None = None
+
+    @cached_property
+    def _openai_dimensions(self) -> int | openai.NotGiven:
+        if self.model == "text-embedding-ada-002":
+            if self.dimensions != 1536:
+                raise ValueError("dimensions must be 1536 for text-embedding-ada-002")
+            return openai.NOT_GIVEN
+        return self.dimensions if self.dimensions is not None else openai.NOT_GIVEN
+
+    @cached_property
+    def _openai_user(self) -> str | openai.NotGiven:
+        return self.user if self.user is not None else openai.NOT_GIVEN
+
+    @cached_property
+    def _embedder(self) -> resources.AsyncEmbeddings:
+        return openai.AsyncOpenAI(api_key=self._api_key, max_retries=3).embeddings
+
+    @override
+    def _max_chunks_per_batch(self) -> int:
+        return 2048
+
+    async def call_embed_api(self, documents: list[TokenDocument]) -> EmbeddingResponse:
+        response = await self._embedder.create(
+            input=documents,
+            model=self.model,
+            dimensions=self._openai_dimensions,
+            user=self._openai_user,
+            encoding_format="float",
+        )
+        usage = Usage(
+            prompt_tokens=response.usage.prompt_tokens,
+            total_tokens=response.usage.total_tokens,
+        )
+        return EmbeddingResponse(
+            embeddings=[r.embedding for r in response.data], usage=usage
+        )
+
+    @cached_property
+    def _batcher(self) -> BatchApiCaller[TokenDocument]:
+        return BatchApiCaller(self._max_chunks_per_batch(), self.call_embed_api)
+
+    @override
+    async def embed(
+        self, documents: list[StringDocument]
+    ) -> Sequence[EmbeddingVector | ChunkEmbeddingError]:
+        """
+        Embeds a list of documents into vectors using OpenAI's embeddings API.
+        The documents are first encoded into tokens before being embedded.
+
+        If a request to generate embeddings fails because one or more chunks
+        exceed the model's token limit, the offending chunks are filtered out
+        and the request is retried. The returned result will contain a
+        ChunkEmbeddingError in place of an EmbeddingVector for the chunks that
+        exceeded the model's token limit.
+
+        Args:
+            documents (list[str]): A list of documents to be embedded.
+
+        Returns:
+            Sequence[EmbeddingVector | ChunkEmbeddingError]: The embeddings or
+            errors for each document.
+        """
+        encoded_documents = await self._encode(documents)
+        await logger.adebug(f"Chunks produced: {len(documents)}")
+        try:
+            return await self._batcher.batch_chunks_and_embed(encoded_documents)
+        except openai.BadRequestError as e:
+            body = e.body
+            if not isinstance(body, dict):
+                raise e
+            if "message" not in body:
+                raise e
+            msg: Any = body["message"]
+            if not isinstance(msg, str):
+                raise e
+
+            m = openai_token_length_regex.match(msg)
+            if not m:
+                raise e
+            model_token_length = int(m.group(1))
+            return await self._filter_by_length_and_embed(
+                model_token_length, encoded_documents
+            )
+
+    async def _filter_by_length_and_embed(
+        self, model_token_length: int, encoded_documents: list[list[int]]
+    ) -> Sequence[EmbeddingVector | ChunkEmbeddingError]:
+        """
+        Filters out documents that exceed the model's token limit and embeds
+        the valid ones. Chunks that exceed the limit are replaced in the
+        response with a ChunkEmbeddingError instead of an EmbeddingVector.
+
+        Args:
+            model_token_length (int): The token length limit for the model.
+            encoded_documents (list[list[int]]): A list of encoded documents.
+
+        Returns:
+            Sequence[EmbeddingVector | ChunkEmbeddingError]: EmbeddingVector
+            for the chunks that were successfully embedded, ChunkEmbeddingError
+            for the chunks that exceeded the model's token limit.
+        """
+        valid_documents: list[list[int]] = []
+        invalid_documents_idxs: list[int] = []
+        for i, doc in enumerate(encoded_documents):
+            if len(doc) > model_token_length:
+                invalid_documents_idxs.append(i)
+            else:
+                valid_documents.append(doc)
+
+        assert len(valid_documents) + len(invalid_documents_idxs) == len(
+            encoded_documents
+        )
+
+        response = await self._batcher.batch_chunks_and_embed(valid_documents)
+
+        embeddings: list[ChunkEmbeddingError | list[float]] = []
+        for i in range(len(encoded_documents)):
+            if i in invalid_documents_idxs:
+                embedding = ChunkEmbeddingError(
+                    error=TOKEN_CONTEXT_LENGTH_ERROR,
+                    error_details=f"chunk exceeds the {self.model} model context length of {model_token_length} tokens",  # noqa
+                )
+            else:
+                embedding = response.pop(0)
+            embeddings.append(embedding)
+
+        return embeddings
+
+    async def _encode(self, documents: list[str]) -> list[list[int]]:
+        """
+        Encodes a list of documents into a list of tokenized documents, using
+        the corresponding encoder for the model.
+
+        Args:
+            documents (list[str]): A list of text documents to be tokenized.
+
+        Returns:
+            list[list[int]]: A list of tokenized documents.
+        """
+        total_tokens = 0
+        encoded_documents: list[list[int]] = []
+        for document in documents:
+            if self.model.endswith("001"):
+                # See: https://github.com/openai/openai-python/issues/418#issuecomment-1525939500
+                # replace newlines, which can negatively affect performance.
+                document = document.replace("\n", " ")
+            tokenized = self._encoder.encode_ordinary(document)
+            total_tokens += len(tokenized)
+            encoded_documents.append(tokenized)
+        await logger.adebug(f"Total tokens in batch: {total_tokens}")
+        return encoded_documents
+
+    @cached_property
+    def _encoder(self) -> tiktoken.Encoding:
+        return tiktoken.encoding_for_model(self.model)
diff --git a/projects/pgai/pgai/vectorizer/embedders/voyageai.py b/projects/pgai/pgai/vectorizer/embedders/voyageai.py
new file mode 100644
index 000000000..17db83230
--- /dev/null
+++ b/projects/pgai/pgai/vectorizer/embedders/voyageai.py
@@ -0,0 +1,70 @@
+from collections.abc import Sequence
+from functools import cached_property
+from typing import Literal
+
+import voyageai
+import voyageai.error
+from pydantic import BaseModel
+from typing_extensions import override
+
+from ..embeddings import (
+    ApiKeyMixin,
+    BatchApiCaller,
+    Embedder,
+    EmbeddingResponse,
+    EmbeddingVector,
+    StringDocument,
+    Usage,
+    logger,
+)
+
+
+class VoyageAI(ApiKeyMixin, BaseModel, Embedder):
+    """
+    Embedder that uses Voyage AI to embed documents into vector representations.
+
+    Attributes:
+        implementation (Literal["voyageai"]): The literal identifier for this
+            implementation.
+        model (str): The name of the Voyage AI model used for embeddings.
+        input_type ("document" | "query" | None): Set the input type of the
+            items to be embedded. If set, improves retrieval quality.
+    """
+
+    implementation: Literal["voyageai"]
+    model: str
+    input_type: Literal["document"] | Literal["query"] | None = None
+
+    @override
+    async def embed(self, documents: list[str]) -> Sequence[EmbeddingVector]:
+        """
+        Embeds a list of documents into vectors using the VoyageAI embeddings API.
+
+        Args:
+            documents (list[str]): A list of documents to be embedded.
+
+        Returns:
+            Sequence[EmbeddingVector]: The embeddings for each document.
+        """
+        await logger.adebug(f"Chunks produced: {len(documents)}")
+        return await self._batcher.batch_chunks_and_embed(documents)
+
+    @cached_property
+    def _batcher(self) -> BatchApiCaller[StringDocument]:
+        return BatchApiCaller(self._max_chunks_per_batch(), self.call_embed_api)
+
+    @override
+    def _max_chunks_per_batch(self) -> int:
+        return 128
+
+    async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
+        response = await voyageai.AsyncClient(api_key=self._api_key).embed(
+            documents,
+            model=self.model,
+            input_type=self.input_type,
+        )
+        usage = Usage(
+            prompt_tokens=response.total_tokens,
+            total_tokens=response.total_tokens,
+        )
+        return EmbeddingResponse(embeddings=response.embeddings, usage=usage)
diff --git a/projects/pgai/pgai/vectorizer/embeddings.py b/projects/pgai/pgai/vectorizer/embeddings.py
index c7ba5aeeb..a40f91f02 100644
--- a/projects/pgai/pgai/vectorizer/embeddings.py
+++ b/projects/pgai/pgai/vectorizer/embeddings.py
@@ -1,37 +1,12 @@
 import math
-import os
-import re
 import time
 from abc import ABC, abstractmethod
-from collections.abc import Awaitable, Callable, Mapping, Sequence
+from collections.abc import Awaitable, Callable, Sequence
 from dataclasses import dataclass
-from functools import cached_property
-from typing import (
-    Any,
-    Generic,
-    Literal,
-    TypeAlias,
-    TypeVar,
-)
-
-import ollama
-import openai
+from typing import Generic, TypeAlias, TypeVar
+
 import structlog
-import tiktoken
-import voyageai
-import voyageai.error
 from ddtrace import tracer
-from openai import resources
-from pydantic import BaseModel
-from typing_extensions import TypedDict, override
-
-MAX_RETRIES = 3
-
-TOKEN_CONTEXT_LENGTH_ERROR = "chunk exceeds model context length"
-
-openai_token_length_regex = re.compile(
-    r"This model's maximum context length is (\d+) tokens"
-)
 
 logger = structlog.get_logger()
 
@@ -301,368 +276,3 @@ async def print_stats(self):
             total_chunks=self.total_chunks,
             chunks_per_second=self.chunks_per_second(),
         )
-
-
-class OpenAI(ApiKeyMixin, BaseModel, Embedder):
-    """
-    Embedder that uses OpenAI's API to embed documents into vector representations.
-
-    Attributes:
-        implementation (Literal["openai"]): The literal identifier for this
-            implementation.
-        model (str): The name of the OpenAI model used for embeddings.
-        dimensions (int | None): Optional dimensions for the embeddings.
-        user (str | None): Optional user identifier for OpenAI API usage.
-    """
-
-    implementation: Literal["openai"]
-    model: str
-    dimensions: int | None = None
-    user: str | None = None
-
-    @cached_property
-    def _openai_dimensions(self) -> int | openai.NotGiven:
-        if self.model == "text-embedding-ada-002":
-            if self.dimensions != 1536:
-                raise ValueError("dimensions must be 1536 for text-embedding-ada-002")
-            return openai.NOT_GIVEN
-        return self.dimensions if self.dimensions is not None else openai.NOT_GIVEN
-
-    @cached_property
-    def _openai_user(self) -> str | openai.NotGiven:
-        return self.user if self.user is not None else openai.NOT_GIVEN
-
-    @cached_property
-    def _embedder(self) -> resources.AsyncEmbeddings:
-        return openai.AsyncOpenAI(
-            api_key=self._api_key, max_retries=MAX_RETRIES
-        ).embeddings
-
-    @override
-    def _max_chunks_per_batch(self) -> int:
-        return 2048
-
-    async def call_embed_api(self, documents: list[TokenDocument]) -> EmbeddingResponse:
-        response = await self._embedder.create(
-            input=documents,
-            model=self.model,
-            dimensions=self._openai_dimensions,
-            user=self._openai_user,
-            encoding_format="float",
-        )
-        usage = Usage(
-            prompt_tokens=response.usage.prompt_tokens,
-            total_tokens=response.usage.total_tokens,
-        )
-        return EmbeddingResponse(
-            embeddings=[r.embedding for r in response.data], usage=usage
-        )
-
-    @cached_property
-    def _batcher(self) -> BatchApiCaller[TokenDocument]:
-        return BatchApiCaller(self._max_chunks_per_batch(), self.call_embed_api)
-
-    @override
-    async def embed(
-        self, documents: list[StringDocument]
-    ) -> Sequence[EmbeddingVector | ChunkEmbeddingError]:
-        """
-        Embeds a list of documents into vectors using OpenAI's embeddings API.
-        The documents are first encoded into tokens before being embedded.
-
-        If a request to generate embeddings fails because one or more chunks
-        exceed the model's token limit, the offending chunks are filtered out
-        and the request is retried. The returned result will contain a
-        ChunkEmbeddingError in place of an EmbeddingVector for the chunks that
-        exceeded the model's token limit.
-
-        Args:
-            documents (list[str]): A list of documents to be embedded.
-
-        Returns:
-            Sequence[EmbeddingVector | ChunkEmbeddingError]: The embeddings or
-            errors for each document.
-        """
-        encoded_documents = await self._encode(documents)
-        await logger.adebug(f"Chunks produced: {len(documents)}")
-        try:
-            return await self._batcher.batch_chunks_and_embed(encoded_documents)
-        except openai.BadRequestError as e:
-            body = e.body
-            if not isinstance(body, dict):
-                raise e
-            if "message" not in body:
-                raise e
-            msg: Any = body["message"]
-            if not isinstance(msg, str):
-                raise e
-
-            m = openai_token_length_regex.match(msg)
-            if not m:
-                raise e
-            model_token_length = int(m.group(1))
-            return await self._filter_by_length_and_embed(
-                model_token_length, encoded_documents
-            )
-
-    async def _filter_by_length_and_embed(
-        self, model_token_length: int, encoded_documents: list[list[int]]
-    ) -> Sequence[EmbeddingVector | ChunkEmbeddingError]:
-        """
-        Filters out documents that exceed the model's token limit and embeds
-        the valid ones. Chunks that exceed the limit are replaced in the
-        response with an ChunkEmbeddingError instead of an EmbeddingVector.
-
-        Args:
-            model_token_length (int): The token length limit for the model.
-            encoded_documents (list[list[int]]): A list of encoded documents.
-
-        Returns:
-            Sequence[EmbeddingVector | ChunkEmbeddingError]: EmbeddingVector
-            for the chunks that were successfully embedded, ChunkEmbeddingError
-            for the chunks that exceeded the model's token limit.
-        """
-        valid_documents: list[list[int]] = []
-        invalid_documents_idxs: list[int] = []
-        for i, doc in enumerate(encoded_documents):
-            if len(doc) > model_token_length:
-                invalid_documents_idxs.append(i)
-            else:
-                valid_documents.append(doc)
-
-        assert len(valid_documents) + len(invalid_documents_idxs) == len(
-            encoded_documents
-        )
-
-        response = await self._batcher.batch_chunks_and_embed(valid_documents)
-
-        embeddings: list[ChunkEmbeddingError | list[float]] = []
-        for i in range(len(encoded_documents)):
-            if i in invalid_documents_idxs:
-                embedding = ChunkEmbeddingError(
-                    error=TOKEN_CONTEXT_LENGTH_ERROR,
-                    error_details=f"chunk exceeds the {self.model} model context length of {model_token_length} tokens",  # noqa
-                )
-            else:
-                embedding = response.pop(0)
-            embeddings.append(embedding)
-
-        return embeddings
-
-    async def _encode(self, documents: list[str]) -> list[list[int]]:
-        """
-        Encodes a list of documents into a list of tokenized documents, using
-        the corresponding encoder for the model.
-
-        Args:
-            documents (list[str]): A list of text documents to be tokenized.
-
-        Returns:
-            list[list[int]]: A list of tokenized documents.
-        """
-        total_tokens = 0
-        encoded_documents: list[list[int]] = []
-        for document in documents:
-            if self.model.endswith("001"):
-                # See: https://github.com/openai/openai-python/issues/418#issuecomment-1525939500
-                # replace newlines, which can negatively affect performance.
-                document = document.replace("\n", " ")
-            tokenized = self._encoder.encode_ordinary(document)
-            total_tokens += len(tokenized)
-            encoded_documents.append(tokenized)
-        await logger.adebug(f"Total tokens in batch: {total_tokens}")
-        return encoded_documents
-
-    @cached_property
-    def _encoder(self) -> tiktoken.Encoding:
-        return tiktoken.encoding_for_model(self.model)
-
-
-# Note: this is a re-declaration of ollama.Options, which we are forced to do
-# otherwise pydantic complains (ollama.Options subclasses typing.TypedDict):
-# pydantic.errors.PydanticUserError: Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12. # noqa
-class OllamaOptions(TypedDict, total=False):
-    # load time options
-    numa: bool
-    num_ctx: int
-    num_batch: int
-    num_gpu: int
-    main_gpu: int
-    low_vram: bool
-    f16_kv: bool
-    logits_all: bool
-    vocab_only: bool
-    use_mmap: bool
-    use_mlock: bool
-    embedding_only: bool
-    num_thread: int
-
-    # runtime options
-    num_keep: int
-    seed: int
-    num_predict: int
-    top_k: int
-    top_p: float
-    tfs_z: float
-    typical_p: float
-    repeat_last_n: int
-    temperature: float
-    repeat_penalty: float
-    presence_penalty: float
-    frequency_penalty: float
-    mirostat: int
-    mirostat_tau: float
-    mirostat_eta: float
-    penalize_newline: bool
-    stop: Sequence[str]
-
-
-class Ollama(BaseModel, Embedder):
-    """
-    Embedder that uses Ollama to embed documents into vector representations.
-
-    Attributes:
-        implementation (Literal["ollama"]): The literal identifier for this
-            implementation.
-        model (str): The name of the Ollama model used for embeddings.
-        base_url (str): The base url used to access the Ollama API.
-        options (dict): Additional ollama-specific runtime options
-        keep_alive (str): How long to keep the model loaded after the request
-    """
-
-    implementation: Literal["ollama"]
-    model: str
-    base_url: str | None = None
-    options: OllamaOptions | None = None
-    keep_alive: str | None = None  # this is only `str` because of the SQL API
-
-    @override
-    async def embed(self, documents: list[str]) -> Sequence[EmbeddingVector]:
-        """
-        Embeds a list of documents into vectors using Ollama's embeddings API.
-
-        Args:
-            documents (list[str]): A list of documents to be embedded.
-
-        Returns:
-            Sequence[EmbeddingVector | ChunkEmbeddingError]: The embeddings or
-            errors for each document.
-        """
-        await logger.adebug(f"Chunks produced: {len(documents)}")
-        return await self._batcher.batch_chunks_and_embed(documents)
-
-    @cached_property
-    def _batcher(self) -> BatchApiCaller[StringDocument]:
-        return BatchApiCaller(self._max_chunks_per_batch(), self.call_embed_api)
-
-    @override
-    def _max_chunks_per_batch(self) -> int:
-        # Note: the chosen default is arbitrary - Ollama doesn't place a limit
-        return int(
-            os.getenv("PGAI_VECTORIZER_OLLAMA_MAX_CHUNKS_PER_BATCH", default="2048")
-        )
-
-    @override
-    async def setup(self):
-        client = ollama.AsyncClient(host=self.base_url)
-        try:
-            await client.show(self.model)
-        except ollama.ResponseError as e:
-            if f"model '{self.model}' not found" in e.error:
-                logger.warn(
-                    f"pulling ollama model '{self.model}', this may take a while"
-                )
-                await client.pull(self.model)
-
-    async def call_embed_api(self, documents: str | list[str]) -> EmbeddingResponse:
-        response = await ollama.AsyncClient(host=self.base_url).embed(
-            model=self.model,
-            input=documents,
-            options=self.options,
-            keep_alive=self.keep_alive,
-        )
-        usage = Usage(
-            prompt_tokens=response["prompt_eval_count"],
-            total_tokens=response["prompt_eval_count"],
-        )
-        return EmbeddingResponse(embeddings=response["embeddings"], usage=usage)
-
-    async def _model(self) -> Mapping[str, Any]:
-        """
-        Gets the model details from the Ollama API
-        :return:
-        """
-        return await ollama.AsyncClient(host=self.base_url).show(self.model)
-
-    async def _context_length(self) -> int | None:
-        """
-        Gets the context_length of the configured model, if available
-        """
-        model = await self._model()
-        architecture = model["model_info"].get("general.architecture", None)
-        if architecture is None:
-            logger.warn(f"unable to determine architecture for model '{self.model}'")
-            return None
-        context_key = f"{architecture}.context_length"
-        # see https://github.com/ollama/ollama/blob/712d63c3f06f297e22b1ae32678349187dccd2e4/llm/ggml.go#L116-L118 # noqa
-        model_context_length = model["model_info"][context_key]
-        # the context window can be configured, so pull the value from the config
-        num_ctx = (
-            float("inf")
-            if self.options is None
-            else self.options.get("num_ctx", float("inf"))
-        )
-        return min(model_context_length, num_ctx)
-
-
-class VoyageAI(ApiKeyMixin, BaseModel, Embedder):
-    """
-    Embedder that uses Voyage AI to embed documents into vector representations.
-
-    Attributes:
-        implementation (Literal["voyageai"]): The literal identifier for this
-            implementation.
-        model (str): The name of the Voyage AU model used for embeddings.
-        input_type ("document" | "query" | None): Set the input type of the
-            items to be embedded. If set, improves retrieval quality.
-
-    """
-
-    implementation: Literal["voyageai"]
-    model: str
-    input_type: Literal["document"] | Literal["query"] | None = None
-
-    @override
-    async def embed(self, documents: list[str]) -> Sequence[EmbeddingVector]:
-        """
-        Embeds a list of documents into vectors using the VoyageAI embeddings API.
-
-        Args:
-            documents (list[str]): A list of documents to be embedded.
-
-        Returns:
-            Sequence[EmbeddingVector | ChunkEmbeddingError]: The embeddings or
-            errors for each document.
-        """
-        await logger.adebug(f"Chunks produced: {len(documents)}")
-        return await self._batcher.batch_chunks_and_embed(documents)
-
-    @cached_property
-    def _batcher(self) -> BatchApiCaller[StringDocument]:
-        return BatchApiCaller(self._max_chunks_per_batch(), self.call_embed_api)
-
-    @override
-    def _max_chunks_per_batch(self) -> int:
-        return 128
-
-    async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
-        response = await voyageai.AsyncClient(api_key=self._api_key).embed(
-            documents,
-            model=self.model,
-            input_type=self.input_type,
-        )
-        usage = Usage(
-            prompt_tokens=response.total_tokens,
-            total_tokens=response.total_tokens,
-        )
-        return EmbeddingResponse(embeddings=response.embeddings, usage=usage)
diff --git a/projects/pgai/pgai/vectorizer/vectorizer.py b/projects/pgai/pgai/vectorizer/vectorizer.py
index d7ea0e5e6..d57c249f5 100644
--- a/projects/pgai/pgai/vectorizer/vectorizer.py
+++ b/projects/pgai/pgai/vectorizer/vectorizer.py
@@ -22,7 +22,8 @@
     LangChainCharacterTextSplitter,
     LangChainRecursiveCharacterTextSplitter,
 )
-from .embeddings import ChunkEmbeddingError, Ollama, OpenAI, VoyageAI
+from .embedders import Ollama, OpenAI, VoyageAI
+from .embeddings import ChunkEmbeddingError
 from .formatting import ChunkValue, PythonTemplate
 from .processing import ProcessingDefault
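
Taken together, this change reduces adding a new integration to two small steps: drop a self-contained embedder module into pgai/vectorizer/embedders/ and re-export it from that package's __init__.py. As a rough sketch of the pattern the three moved modules follow, here is what a minimal embedder for a hypothetical provider might look like. Everything provider-specific below is invented for illustration: the Acme class, the api.acme.example endpoint, the response shape, and the use of httpx as the HTTP client are assumptions, not part of this diff. The shared pieces (ApiKeyMixin, BatchApiCaller, Embedder, EmbeddingResponse, Usage) are the real ones imported from ..embeddings, exactly as in ollama.py, openai.py, and voyageai.py above.

# embedders/acme.py - illustrative sketch only; "Acme" and its HTTP API are made up
from collections.abc import Sequence
from functools import cached_property
from typing import Literal

import httpx  # assumed to be available; any async HTTP client works
from pydantic import BaseModel
from typing_extensions import override

from ..embeddings import (
    ApiKeyMixin,
    BatchApiCaller,
    Embedder,
    EmbeddingResponse,
    EmbeddingVector,
    StringDocument,
    Usage,
    logger,
)


class Acme(ApiKeyMixin, BaseModel, Embedder):
    """Embedder for the hypothetical Acme embeddings HTTP API."""

    # fields mirror the vectorizer's jsonb configuration for this integration
    implementation: Literal["acme"]
    model: str

    @override
    async def embed(self, documents: list[str]) -> Sequence[EmbeddingVector]:
        # delegate batching to the shared BatchApiCaller, like the other embedders
        await logger.adebug(f"Chunks produced: {len(documents)}")
        return await self._batcher.batch_chunks_and_embed(documents)

    @cached_property
    def _batcher(self) -> BatchApiCaller[StringDocument]:
        return BatchApiCaller(self._max_chunks_per_batch(), self.call_embed_api)

    @override
    def _max_chunks_per_batch(self) -> int:
        # use the provider's documented per-request chunk limit here
        return 128

    async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
        # direct HTTP request, as the docs suggest when no first-party python
        # library exists; the endpoint and payload shape are hypothetical
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://api.acme.example/v1/embeddings",
                headers={"Authorization": f"Bearer {self._api_key}"},
                json={"model": self.model, "input": documents},
            )
        resp.raise_for_status()
        data = resp.json()
        usage = Usage(
            prompt_tokens=data["usage"]["tokens"],
            total_tokens=data["usage"]["tokens"],
        )
        return EmbeddingResponse(embeddings=data["embeddings"], usage=usage)

Wiring such a module in would then mirror the diff above: add `from .acme import Acme as Acme` to embedders/__init__.py, and import Acme from .embedders wherever vectorizer.py registers the existing Ollama, OpenAI, and VoyageAI implementations.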