llmer
是一个轻量级的 Python 库,旨在简化大型语言模型(LLMs)应用中的复杂过程。它提供了用于并行处理、运行时管理、文件处理和Prompt格式化等常用的高级 API 和实用工具,从而不用每次都需要重复开发相关代码,简化工作。
- 模型调用: 支持OpenAI风格和Azure模型调用。
- 并行处理: 支持线程池并发、多线程并发、异步函数异步并发、非异步函数异步并发。
- 运行时管理: 提供timeout装饰器用于超时控制,并发安全锁装饰器保证并发过程中对数据修改的正确性。
- 文件工具: 提供对 YAML读取、文件转List(尤其适用jsonl)、List存文件、 图像转 Base64 编码的工具。
- 提示管理: 将 message 转成 ChatML 格式。
- 字符串解析:从模型的输出中解析出
json {}
。 - FastAPI服务部署:将任意函数,通过@deploy()装饰器即可快速便捷部署成FastApi服务。
- 更多功能: 敬请期待...
要安装 llmer
,使用以下命令:
pip install llmer
from llmer.parallel.thread_pool import ThreadPool
@ThreadPool(parallel_count=4)
def square(num):
return num ** 2
tasks = [{"num": i} for i in range(10)]
results = square(tasks)
print(results) # Output: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
LLMER主要包括以下功能,更多通用功能正在开发中,敬请期待:
Azure
: 用于调用托管于 Microsoft Azure 的语言模型。OpenAI
: 用于调用 GPT 系列模型以及按 OpenAI 风格部署的模型。
ThreadPool
: 使用线程池进行并发。MultiThread
: 使用多线程进行并发。AsyncParallel
: 针对异步函数的异步并行。AsyncExecutor
: 针对非异步函数的异步并行。
timeout
: 函数timeout装饰器(包括生成器也可以直接使用)。parallel_safe_lock
: 支持自定义超时设置的并行安全锁。
- 提供对 YAML读取、文件转List(尤其适用jsonl)、List存文件、 图像转 Base64 编码的工具。
- 将openai格式的message转成ChatML格式。
- 将LLM输出的json字符串解析成json对象,包括```json {}``这种格式的。
- 将任意函数,通过@deploy()装饰器即可快速便捷部署成FastApi服务。
llmer
的 model
模块提供了两个类**Azure
和OpenAI
**,分别支持Azure模型和OpenAI风格模型(GPT和按openai风格部署的模型)调用
Azure 可支持chat和函数调用。
使用举例:
from llmer.model import Azure
openai_api_key = "xxxx"
openai_model_name = "gpt-4o"
openai_api_version = 'xxxx'
openai_api_base = "xxxx"
headers = {}
gpt4o = Azure(
headers=headers,
api_key=openai_api_key,
api_version=openai_api_version,
endpoint=openai_api_base,
timeout=10,
retry=2
)
gpt_response = gpt4o.chat(
model=openai_model_name,
temperature=0.1,
stream=True,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Can you tell me the weather today?"},
],
response_format=None,
)
for chunk in gpt_response:
print(chunk)
同样支持Chat和函数调用(function call)
使用举例:
from llmer.model import Azure, OpenAI
claude_api_key = "xxxx"
claud_model_name = "anthropic.claude-3-5-sonnet-20240620-v1:0"
claude_api_base = "xxxx"
headers = {}
claude = OpenAI(
headers=headers,
api_key=claude_api_key,
endpoint=claude_api_base,
timeout=10,
retry=2
)
response = claude.chat(
model=claud_model_name,
temperature=0.1,
stream=True,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Can you tell me the weather today?"},
],
response_format=None,
)
for chunk in response:
print(chunk)
使用线程池方式实现并行。
使用举例:
from llmer.parallel.thread_pool import ThreadPool
@ThreadPool(parallel_count=3)
def add_one(num):
return num + 1
tasks = [{"num": i} for i in range(5)]
results = add_one(tasks)
print(results) # Output: [1, 2, 3, 4, 5]
使用多线程方式实现并行。
使用举例:
from llmer.parallel.multi_thread import MultiThread
@MultiThread(parallel_count=2)
def multiply(num):
return num * 2
tasks = [{"num": i} for i in range(4)]
results = multiply(tasks)
print(results) # Output: [0, 2, 4, 6]
对异步函数实现异步并发。
使用举例:
import asyncio
from llmer.parallel.async_parallel import AsyncParallel
@AsyncParallel(parallel_count=2)
async def async_task(num):
await asyncio.sleep(1)
return num * 10
tasks = [{"num": i} for i in range(4)]
results = asyncio.run(async_task(tasks))
print(results) # Output: [0, 10, 20, 30]
对非异步函数实现异步并发
使用举例:
import asyncio
import time
from llmer.parallel.async_executor import AsyncExecutor
@AsyncExecutor(parallel_count=2)
def async_task(num):
time.sleep(1)
return num * 5
tasks = [{"num": i} for i in range(4)]
results = asyncio.run(async_task(tasks))
print(results) # Output: [0, 5, 10, 15]
timeout
装饰器允许为函数设置执行的时间限制。如果函数超出了指定的时间,将引发 ExecutionTimeoutError
异常。
对于生成器函数(使用了yield
的函数)同样无缝支持。
使用举例:
import time
from llmer.runtime.context import timeout
from llmer.runtime.exceptions import ExecutionTimeoutError
@timeout(3) # Set timeout to 3 seconds
def long_running_function(x):
time.sleep(x) # Simulate a long task
try:
long_running_function(5)
except ExecutionTimeoutError as e:
print(e) # Output: Function 'long_running_function' timed out after 3 seconds
# 还支持超时时间覆写,比如下面的timeout=2将覆盖@timeout(3)
try:
long_running_function(5, timeout=2)
except ExecutionTimeoutError as e:
print(e) # Output: Function 'long_running_function' timed out after 2 seconds
parallel_safe_lock
装饰器确保函数只能在成功获取锁时执行。如果函数在指定的超时时间内无法获取锁,将引发 AcquireLockTimeoutError
异常。这在函数处理共享资源并需要防止并发执行时非常有用。
使用举例:
from llmer.runtime import parallel_safe_lock, AcquireLockTimeoutError
import threading
from time import sleep
lock = threading.Lock()
@parallel_safe_lock(lock, seconds=0.5)
def write_data(data: str):
print(f"Writing data: {data}")
sleep(0.6)
def task():
try:
write_data("some data")
except AcquireLockTimeoutError as e:
print(e) # Output: critical_section acquires lock, timeout exceeded 0.5
threads = []
for _ in range(3):
t = threading.Thread(target=task)
t.start()
threads.append(t)
for t in threads:
t.join()
# 同样,这里的超时时间也可以覆写,在调用write_data的时候write_data("some data", timeout=0.8)也能够修改原超时时间。
提供了文件处理的实用工具,例如YAML读取、文件转List(尤其适用jsonl)、List存文件、 图像转 Base64 编码。
file_to_list()
函数读取 JSONL 文件(其它任何文件均可,每一行转换成list中的一个元素),并将其内容作为字典列表返回。
path
(Optional[str]): JSONL 文件的路径
使用举例:
from llmer.file import file_to_list
# Example usage: Reading a JSONL file from the current script directory
data = file_to_list("data.jsonl")
print(data)
# Output: List of dictionaries read from the JSONL file
list_to_file()
函数将数据列表(例如字典、字符串等)保存到文件中。您可以指定打开文件的模式('w'
为写入,'a'
为追加)。
data
(List[Any]): 要保存到文件中的数据。它应该是一个列表,列表中的每个项目将被写入文件的一个新行。path
(Optional[str]): 保存文件的路径mode
(str, 默认 'w'): 打开文件的模式。'w'
将覆盖文件,'a'
将数据追加到文件末尾。
使用举例:
from llmer.file import list_to_file
# Example data to save
data = [{"name": "John", "age": 30}, {"name": "Jane", "age": 25}]
# Example usage: Saving a list of dictionaries to a JSONL file
list_to_file(data, "output.jsonl")
yaml_reader()
函数读取一个 YAML 配置文件,并将其内容作为 Python 字典返回。该函数假定 YAML 文件结构正确,并返回解析后的数据。
path
(Optional[str]): YAML 文件的路径
使用举例:
from llmer.file import yaml_reader
# Example YAML file path
yaml_file = "config.yaml"
# Example usage: Reading a YAML configuration file
config = yaml_reader(yaml_file)
print(config)
image_to_base64()
函数将图像文件转换为 Base64 编码的字符串。它还可以为 Base64 字符串添加data:xxx
,以便直接在网页内容或其他用途中嵌入图像。
path
(Optional[str]): 图像文件的路径prefix
(bool, 可选): 如果为True
,则在 Base64 字符串前添加数据前缀。默认值为False
。
使用举例:
from llmer.file import image_to_base64
# Example image file path
image_file = "example.png"
# Convert image to Base64 without prefix
encoded_image = image_to_base64(image_file)
print(encoded_image)
# Convert image to Base64 with prefix
encoded_image_with_prefix = image_to_base64(image_file, prefix=True)
print(encoded_image_with_prefix)
chatml()
函数将一系列消息格式化为 ChatML 格式,这种格式通常用于以结构化方式处理语言模型中的对话。此函数生成一个字符串,其中每条消息都被适当地包装为系统、用户和助手角色的标签。
messages
(List[Dict[str, str]]): 消息列表,其中每条消息是一个包含以下键的字典:role
(str): 说话者的角色,例如"system"
、"user"
或"assistant"
。content
(str): 消息的内容。
使用举例:
from llmer.prompt import chatml
# Example messages
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Can you tell me the weather today?"},
{"role": "assistant", "content": "The weather is sunny with a chance of rain."}
]
# Format messages into ChatML
formatted_message = chatml(messages)
print(formatted_message)
<|im_start|>system
You are a helpful assistant.<|im_end|>
<|im_start|>user
Can you tell me the weather today?<|im_end|>
<|im_start|>assistant
The weather is sunny with a chance of rain.<|im_end|>
<|im_start|>assistant
parse_json()
函数将json字符串加载成json对象,尤其是LLM输出的```json{}```这种风格的字符串。
使用举例:
from llmer.parser import parse_json
json_str = """
some other words...
```json
{
"name": "Alice",
"details": {"key": "value"}
}```
some other words...
"""
json_data = parse_json(json_str)
print(json_data)
json_str = """
{
"name": "Alice",
"details": {"key": "value"}
}
"""
json_data = parse_json(json_str)
print(json_data)
{'name': 'Alice', 'details': {'key': 'value'}}
{'name': 'Alice', 'details': {'key': 'value'}}
@deploy()
装饰器能快速便捷将任意函数部署成FastAPI服务。
使用举例: 如下打开任何一个xxx.serve(),即可立即部署成FastAPI接口
from llmer.server import deploy
import time
import asyncio
# 普通服务
@deploy(host="127.0.0.1", port=9510)
def add(a: int, b: int):
return a + b
# add.serve()
# 异步服务
@deploy(host="127.0.0.1", port=9510)
async def async_add(a: int, b: int):
return a + b
# async_add.serve()
# 流式服务
@deploy(host="127.0.0.1", port=9511)
def stream_numbers(start: int, end: int):
for i in range(start, end + 1):
time.sleep(1)
yield f'{{"number": {i}}}'
# stream_numbers.serve()
# 异步流式服务
@deploy(host="127.0.0.1", port=9511)
async def async_stream_numbers(start: int, end: int):
for i in range(start, end + 1):
await asyncio.sleep(1)
yield {"number": i}
# async_stream_numbers.serve()
以最后一个异步流式服务为例,启动后
INFO: Started server process [26081]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:9511 (Press CTRL+C to quit)
调用举例
curl -X POST "http://127.0.0.1:9511/async_stream_numbers" \
-H "Content-Type: application/json" \
-d '{"start": 1, "end": 5}'
流式输出
data: {'number': 1}
data: {'number': 2}
data: {'number': 3}
data: {'number': 4}
data: {'number': 5}
欢迎为本项目做出贡献。您可以提交问题或提交拉取请求。
您可以通过 pydaxing@gmail.com 联系我们。