Add Azure and Bedrock LLMs #6

Open · wants to merge 18 commits into base: main
111 changes: 110 additions & 1 deletion README.md
@@ -1,3 +1,112 @@
# flib

flib is a library for AI models and utilities created by and for Fonction Labs. It provides a collection of models for various AI tasks, including natural language processing and image segmentation.

## Table of Contents

- [Features](#features)
- [Installation](#installation)
- [Usage](#usage)
  - [Models](#models)
  - [Utilities](#utilities)
- [Testing](#testing)
- [Contributing](#contributing)
- [License](#license)

## Features

- Support for multiple LLM providers, including OpenAI, Azure OpenAI, Ollama, and Amazon Bedrock.
- Utilities for image processing and text chunking.
- Parallel processing capabilities for efficient model execution.

## Installation

To install the project, you can use Poetry. Make sure you have Poetry installed, then run:

```bash
poetry install
```

This will install all the required dependencies specified in the `pyproject.toml` file.

## Usage

### Models

The library includes several models for different tasks. Here are some examples of how to use them:

#### OpenAI GPT Model

```python
from flib.models.llms.openai import OpenAIGPTModel
import os

model = OpenAIGPTModel(model_name="gpt-3.5-turbo", api_key=os.environ["OPENAI_API_KEY"])
response = model.run(messages=[{"role": "user", "content": "Hello!"}])
print(response)
```
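
#### Azure OpenAI Model

A minimal sketch of the `AzureOpenaiModel` added in this pull request. The endpoint below is a placeholder, and `AZURE_OPENAI_API_KEY` must be set in the environment.

```python
from flib.models.llms.azure import AzureOpenaiModel

# The endpoint is a placeholder; the API key is read from AZURE_OPENAI_API_KEY.
model = AzureOpenaiModel(endpoint="https://<your-resource>.openai.azure.com/", model_name="gpt-4o")
response = model.run(messages=[{"role": "user", "content": "Hello!"}])
print(response)
```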

#### Ollama Model

```python
from flib.models.llms.ollama import OllamaModel

model = OllamaModel(model_name="mistral")
response = model.run(messages=[{"role": "user", "content": "Hello!"}])
print(response)
```

#### Bedrock Model

```python
from flib.models.llms.bedrock import BedRockLLMModel

model = BedRockLLMModel(model_name="mistral.mistral-large-2402-v1:0")
response = model.run(messages=[{"role": "user", "content": "Hello!"}])
print(response)
```
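
#### Streaming Responses

With `stream=True`, the Bedrock model added in this pull request returns a generator of response chunks instead of a single string; a minimal sketch:

```python
from flib.models.llms.bedrock import BedRockLLMModel

model = BedRockLLMModel(model_name="mistral.mistral-large-2402-v1:0")
# stream=True yields response chunks as they arrive.
for chunk in model.run(messages=[{"role": "user", "content": "Hello!"}], stream=True):
    print(chunk or "", end="", flush=True)
```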

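#### Batch and Parallel Execution

Models built on the `BaseLLM` class introduced in this pull request inherit `run_batch`, which can fan requests out across threads; a minimal sketch using the Bedrock model (the `n_jobs` value is only an example):

```python
from flib.models.llms.bedrock import BedRockLLMModel

model = BedRockLLMModel(model_name="mistral.mistral-large-2402-v1:0")
conversations = [
    [{"role": "user", "content": "Hello!"}],
    [{"role": "user", "content": "What is 2 + 2?"}],
]
# parallel=True runs the requests on worker threads; n_jobs is illustrative.
responses = model.run_batch(conversations, parallel=True, n_jobs=2)
print(responses)
```
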
### Utilities

The library also provides utility functions for image processing and text chunking.

#### Image Processing

```python
from flib.utils.images import load_image, encode_image_base64

image = load_image("path/to/image.png")
encoded_image = encode_image_base64(image)
```

#### Text Chunking

```python
from flib.utils.chunk_text import get_text_chunks

text = "This is a test text for chunking."
chunks = get_text_chunks(text, chunk_size=10, chunk_overlap=2)
print(chunks)
```

## Testing

To run the tests, you can use pytest. Make sure you have pytest installed, then run:

```bash
pytest
```

## Contributing

Contributions are welcome! Please follow these steps to contribute:

1. Fork the repository.
2. Create a new branch (`git checkout -b feature-branch`).
3. Make your changes and commit them (`git commit -m 'Add new feature'`).
4. Push to the branch (`git push origin feature-branch`).
5. Create a new Pull Request.

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.
Empty file added flib/models/llms/__init__.py
Empty file.
82 changes: 82 additions & 0 deletions flib/models/llms/azure.py
@@ -0,0 +1,82 @@
import os
import json
from typing import Generator
from botocore.exceptions import ClientError
from flib.utils.parallel import ParallelTqdm
from joblib import delayed
from tqdm import tqdm
import itertools
from openai import AzureOpenAI
from azure.ai.inference import ChatCompletionsClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.inference.models import ChatCompletionsResponseFormatJSON
from azure.ai.inference.models import SystemMessage, UserMessage, AssistantMessage
from .base_llm import BaseLLM
from .openai import OpenAIGPTModel


class AzureOpenaiModel(OpenAIGPTModel):
"""
A model for interacting with Azure OpenAI's chat completions.

Attributes:
model_name (str): The name of the Azure OpenAI model to use.
client (AzureOpenAI): The Azure OpenAI client for making API calls.
"""
def __init__(self, endpoint: str, model_name: str = "gpt-4o"):
self.model_name = model_name
self.client = get_azure_client(endpoint)


class AzureInferenceModel(BaseLLM):
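    """
    A model for interacting with the Azure AI Inference chat completions API.

    Attributes:
        model_name (str): The name of the deployed model to use.
        client (ChatCompletionsClient): The Azure AI Inference client for making API calls.
    """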
def __init__(self, endpoint: str, model_name: str):
self.model_name = model_name
self.client = get_azure_completion_client(endpoint)

def run(
self, messages, temperature: float = 0.0, stream: bool = False, json_output: bool = False
) -> (Generator[str, str, None] | str):

if json_output:
response = self.client.complete(
messages=list(map(get_message_azure, messages)),
temperature=temperature,
stream=stream,
response_format=ChatCompletionsResponseFormatJSON()
)

else:
response = self.client.complete(
messages=list(map(get_message_azure, messages)),
temperature=temperature,
stream=stream,
)

if not stream:
return response.choices[0].message.content
else:
return parse_stream(response)

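def parse_stream(response):
    """
    Yields incremental text from an Azure streaming chat completion.

    Assumes each streamed update exposes choices[0].delta.content, as in the
    azure-ai-inference SDK's streaming responses.
    """
    for update in response:
        if update.choices and update.choices[0].delta.content:
            yield update.choices[0].delta.content
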
def get_azure_client(endpoint):
client = AzureOpenAI(
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
api_version='2024-06-01',
azure_endpoint=endpoint
)
return client

def get_azure_completion_client(endpoint):
client = ChatCompletionsClient(
endpoint=endpoint,
credential=AzureKeyCredential(os.environ["AZURE_INFERENCE_CREDENTIAL"]),
)
return client

def get_message_azure(message):
    match message["role"]:
        case "system":
            return SystemMessage(content=message["content"])
        case "user":
            return UserMessage(content=message["content"])
        case "assistant":
            return AssistantMessage(content=message["content"])
54 changes: 54 additions & 0 deletions flib/models/llms/base_llm.py
@@ -0,0 +1,54 @@
import abc
from typing import Generator

from joblib import delayed
from tqdm import tqdm

from flib.utils.parallel import ParallelTqdm

from ..base import BaseModel

class BaseLLM(BaseModel):

@abc.abstractmethod
def run(
self, messages, temperature: float = 0.0, stream: bool = False, json_output: bool = False
) -> (Generator[str, str, None] | str):
pass

def run_batch(self, list_messages, temperature: float = 0.0, stream: bool = False, json_output: bool = False, parallel: bool = False, n_jobs: int = 8):
"""
Runs the model in batch mode with the provided list of messages.

Args:
list_messages (list): A list of message lists to send to the model.
            temperature (float): Sampling temperature for randomness in responses.
            stream (bool): Whether to stream each response.
            json_output (bool): Whether to request JSON-formatted responses.
            parallel (bool): Whether to run the requests in parallel.
            n_jobs (int): Number of jobs to run in parallel.

Returns:
list: A list of responses from the model.
"""
if parallel:
return ParallelTqdm(n_jobs=n_jobs, prefer="threads", total_tasks=len(list_messages))(
delayed(self.run)(message, temperature=temperature, stream=stream, json_output=json_output) for message in list_messages
)
        return [self.run(message, temperature=temperature, stream=stream, json_output=json_output) for message in list_messages]

class BaseEmbedding(BaseModel):

@abc.abstractmethod
def run(self, prompt: str) -> list[float]:
pass

def run_batch(self, prompts: list[str], parallel: bool = False) -> list[list[float]]:
"""
Generates embeddings for a batch of prompts.

Args:
prompts (list[str]): A list of prompts to generate embeddings for.
parallel (bool): Whether to run the requests in parallel.

Returns:
list[list[float]]: A list of generated embedding vectors.
"""
if parallel:
return ParallelTqdm(n_jobs=8, prefer="threads", total_tasks=len(prompts))(
delayed(self.run)(prompt) for prompt in prompts
)
return [self.run(prompt) for prompt in tqdm(prompts)]
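
For illustration only (not part of this pull request), a concrete embedding model built on `BaseEmbedding` only needs to implement `run`; sequential and parallel `run_batch` are inherited. This sketch assumes `BaseModel` imposes no further abstract methods.

```python
from flib.models.llms.base_llm import BaseEmbedding

class DummyEmbedding(BaseEmbedding):
    """Toy embedding model illustrating the BaseEmbedding contract."""

    def run(self, prompt: str) -> list[float]:
        # Placeholder: a real implementation would call an embedding API here.
        return [float(len(prompt))]

embeddings = DummyEmbedding().run_batch(["first chunk", "second chunk"], parallel=False)
print(embeddings)  # [[11.0], [12.0]]
```
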
113 changes: 113 additions & 0 deletions flib/models/llms/bedrock.py
@@ -0,0 +1,113 @@
import boto3
from botocore.config import Config
import json
from typing import Generator
from warnings import warn
from botocore.exceptions import ClientError
from flib.utils.parallel import ParallelTqdm
from joblib import delayed
from tqdm import tqdm
import itertools
from .base_llm import BaseLLM
from .utils import clean_json_output

class BedRockLLMModel(BaseLLM):
"""
A model for interacting with Amazon Bedrock's LLMs.

Attributes:
model_name (str): The name of the Bedrock model to use.
client: The Bedrock client for making API calls.
"""
def __init__(self, model_name: str):
self.model_name = model_name
self.client = get_bedrock_client()

def run(
        self, messages: list, temperature: float = 0.0, stream: bool = False, json_output: bool = False
) -> (Generator[str, str, None] | str):
"""
Runs the model with the provided messages and returns the generated response.

Args:
            messages (list): A list of message dictionaries to send to the model.
temperature (float): Sampling temperature for randomness in responses.
stream (bool): Whether to stream the response.
json_output (bool): Whether to return the response in JSON format.

Returns:
(Generator[str, str, None] | str): The generated response from the model, either as a string or a generator.
"""
return get_llm_answer_bedrock(
messages=messages,
model_id=self.model_name,
bedrock=self.client,
temperature=temperature,
json_output=json_output,
stream=stream
)

def get_bedrock_client():
config = Config(read_timeout=1000)
return boto3.client(service_name="bedrock-runtime", config=config)

def get_embeddings_bedrock(prompt: str, model_id: str, bedrock):
json_request = {"inputText": prompt}
body = json.dumps(json_request)

try:
response = bedrock.invoke_model(body=body, modelId=model_id)
response_body = response.get('body').read()
embedding = json.loads(response_body)['embedding']
return embedding
except (ClientError, Exception) as e:
print(f"ERROR: Can't invoke '{model_id}'. Reason: {e}")
exit(1)


def get_llm_answer_bedrock(messages: list, model_id: str, bedrock, temperature: float = 0.0, json_output: bool = False, stream: bool = False) -> (Generator[str, str, None] | str):
native_request = {
'messages': messages
}
if json_output:
        warn("JSON output is not natively supported by Bedrock models; the raw response will be cleaned with clean_json_output instead.")

request = json.dumps(native_request)

if not stream:
try:
response = bedrock.invoke_model(modelId=model_id, body=request)
except (ClientError, Exception) as e:
print(f"ERROR: Can't invoke '{model_id}'. Reason: {e}")
exit(1)

model_response = json.loads(response["body"].read())

if json_output:
return clean_json_output(model_response["choices"][0]["message"]["content"])

return model_response["choices"][0]["message"]["content"]

else:
try:
streaming_response = bedrock.invoke_model_with_response_stream(
modelId=model_id, body=request
)

except (ClientError, Exception) as e:
print(f"ERROR: Can't invoke '{model_id}'. Reason: {e}")
exit(1)

return parse_stream(streaming_response["body"])


def parse_stream(stream):
    for event in stream:
        # Each streamed event carries a JSON payload under "chunk" -> "bytes".
        chunk = json.loads(event["chunk"]["bytes"])
        choice = chunk["choices"][0]
        yield choice["message"].get("content")

        if choice.get("stop_reason"):
            return
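
The module also exposes a lower-level embedding helper, `get_embeddings_bedrock`; a minimal sketch (the model ID is only an example and is not pinned down by this pull request):

```python
from flib.models.llms.bedrock import get_bedrock_client, get_embeddings_bedrock

bedrock = get_bedrock_client()
embedding = get_embeddings_bedrock(
    prompt="Hello, Bedrock!",
    model_id="amazon.titan-embed-text-v1",  # example embedding model ID
    bedrock=bedrock,
)
print(len(embedding))
```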