Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): initial pass at streaming support #40

Merged
merged 10 commits into from
May 21, 2024
4 changes: 3 additions & 1 deletion examples/test_custom_response_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import openai

import openai_responses
from openai_responses import OpenAIMock, Request, Response, Route
from openai_responses import OpenAIMock
from openai_responses.ext.httpx import Request, Response
from openai_responses.ext.respx import Route
from openai_responses.helpers.builders.chat import chat_completion_from_create_request


Expand Down
3 changes: 2 additions & 1 deletion examples/test_raw_httpx_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from openai import APIStatusError

import openai_responses
from openai_responses import OpenAIMock, Response
from openai_responses import OpenAIMock
from openai_responses.ext.httpx import Response


@openai_responses.mock()
Expand Down
8 changes: 5 additions & 3 deletions examples/test_router_usage.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import json

import httpx
import openai
from openai.types.beta.threads.run_submit_tool_outputs_params import ToolOutput

import openai_responses
from openai_responses import OpenAIMock, Request, Response, Route, StateStore
from openai_responses import OpenAIMock
from openai_responses.stores import StateStore
from openai_responses.ext.httpx import Request, Response, post
from openai_responses.ext.respx import Route
from openai_responses.helpers.mergers.runs import merge_run_with_partial


Expand Down Expand Up @@ -115,7 +117,7 @@ def test_external_api_mock(openai_mock: OpenAIMock):
tool_outputs: list[ToolOutput] = []
for tool in run.required_action.submit_tool_outputs.tool_calls:
if tool.function.name == "get_current_temperature":
res = httpx.post(
res = post(
url="https://api.myweatherapi.com",
json=json.loads(tool.function.arguments),
)
Expand Down
5 changes: 2 additions & 3 deletions examples/test_run_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from openai_responses import OpenAIMock
from openai_responses.helpers.builders.messages import build_message
from openai_responses.helpers.builders.run_steps import build_run_step
from openai_responses.helpers.state_store import add_resource_to_state_store


@openai_responses.mock()
Expand Down Expand Up @@ -60,8 +59,8 @@ def test_list_run_steps(openai_mock: OpenAIMock):
},
}
)
add_resource_to_state_store(assistant_message, mock=openai_mock)
add_resource_to_state_store(run_step, mock=openai_mock)
openai_mock.state.beta.threads.messages.put(assistant_message)
openai_mock.state.beta.threads.runs.steps.put(run_step)

steps = client.beta.threads.runs.steps.list(run.id, thread_id=thread.id)

Expand Down
122 changes: 122 additions & 0 deletions examples/test_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from typing import Generator
from typing_extensions import override

import openai
from openai import AssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Run

import openai_responses
from openai_responses import OpenAIMock
from openai_responses.stores import StateStore
from openai_responses.streaming import EventStream, Event
from openai_responses.helpers.builders.runs import run_from_create_request
from openai_responses.helpers.builders.messages import build_message
from openai_responses.ext.httpx import Request, Response

event_count = 0


class EventHandler(AssistantEventHandler):
    """Counts the lifecycle events the mocked stream is expected to emit."""

    # The four events the mock stream produces; anything else is ignored.
    _COUNTED_EVENTS = (
        "thread.run.created",
        "thread.run.in_progress",
        "thread.message.created",
        "thread.run.completed",
    )

    @override
    def on_event(self, event: AssistantStreamEvent) -> None:
        global event_count
        if event.event in self._COUNTED_EVENTS:
            event_count += 1


class CreateRunEventStream(EventStream):
    """Mock event stream that replays the lifecycle of a newly created run.

    Emits, in order: run created -> run in_progress -> assistant message
    created -> run completed, persisting each state change to the store
    before yielding the corresponding event.
    """

    def __init__(self, created_run: Run, state_store: StateStore) -> None:
        self.created_run = created_run
        self.state_store = state_store

    def _put_run(self) -> None:
        # Persist the run's current state so follow-up API calls observe it.
        self.state_store.beta.threads.runs.put(self.created_run)

    @override
    def generate(self) -> Generator[Event, None, None]:
        run = self.created_run

        self._put_run()
        yield self.event("thread.run.created", run)

        run.status = "in_progress"
        self._put_run()
        yield self.event("thread.run.in_progress", run)

        # Fabricate the assistant's reply and store it before announcing it.
        reply = build_message(
            {
                "assistant_id": run.assistant_id,
                "thread_id": run.thread_id,
                "run_id": run.id,
                "role": "assistant",
                "status": "completed",
                "content": [
                    {
                        "type": "text",
                        "text": {"annotations": [], "value": "Hello! How can I help?"},
                    }
                ],
            }
        )
        self.state_store.beta.threads.messages.put(reply)
        yield self.event("thread.message.created", reply)

        run.status = "completed"
        self._put_run()
        yield self.event("thread.run.completed", run)


def create_run_stream_response(
    request: Request,
    *,
    thread_id: str,
    state_store: StateStore,
) -> Response:
    """Custom mock handler returning a streamed run-lifecycle response."""
    # NOTE: creating run this way does not currently inherit config from assistant
    run = run_from_create_request(
        thread_id,
        request,
        extra={"model": "gpt-4-turbo", "tools": [{"type": "code_interpreter"}]},
    )
    return Response(201, content=CreateRunEventStream(run, state_store))


@openai_responses.mock()
def test_handle_stream(openai_mock: OpenAIMock):
    """End-to-end check that a custom EventStream drives the SDK stream helper."""
    openai_mock.beta.threads.runs.create.response = create_run_stream_response

    client = openai.Client(api_key="sk-fake123")

    math_tutor = client.beta.assistants.create(
        instructions="You are a personal math tutor. When asked a question, write and run Python code to answer the question.",
        name="Math Tutor",
        tools=[{"type": "code_interpreter"}],
        model="gpt-4-turbo",
    )

    thread = client.beta.threads.create()

    client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=[{"type": "text", "text": "Hello!"}],
    )

    handler = EventHandler()
    with client.beta.threads.runs.stream(
        thread_id=thread.id,
        assistant_id=math_tutor.id,
        event_handler=handler,
    ) as stream:
        stream.until_done()
        assert stream.current_run

    messages = client.beta.threads.messages.list(thread.id)

    # Four lifecycle events: created, in_progress, message.created, completed.
    global event_count
    assert event_count == 4
    # The user's message plus the fabricated assistant reply.
    assert len(messages.data) == 2

    assert openai_mock.beta.threads.runs.create.calls.call_count == 1
125 changes: 125 additions & 0 deletions examples/test_streaming_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from typing import Generator
from typing_extensions import override

import pytest

import openai
from openai import AsyncAssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Run

import openai_responses
from openai_responses import OpenAIMock
from openai_responses.stores import StateStore
from openai_responses.streaming import AsyncEventStream, Event
from openai_responses.helpers.builders.runs import run_from_create_request
from openai_responses.helpers.builders.messages import build_message
from openai_responses.ext.httpx import Request, Response

event_count = 0


class EventHandler(AsyncAssistantEventHandler):
    """Counts the lifecycle events the mocked stream is expected to emit."""

    # The four events the mock stream produces; anything else is ignored.
    _COUNTED_EVENTS = (
        "thread.run.created",
        "thread.run.in_progress",
        "thread.message.created",
        "thread.run.completed",
    )

    @override
    async def on_event(self, event: AssistantStreamEvent) -> None:
        global event_count
        if event.event in self._COUNTED_EVENTS:
            event_count += 1


class CreateRunEventStream(AsyncEventStream):
    """Mock event stream that replays the lifecycle of a newly created run.

    Emits, in order: run created -> run in_progress -> assistant message
    created -> run completed, persisting each state change to the store
    before yielding the corresponding event.
    """

    def __init__(self, created_run: Run, state_store: StateStore) -> None:
        self.created_run = created_run
        self.state_store = state_store

    def _put_run(self) -> None:
        # Persist the run's current state so follow-up API calls observe it.
        self.state_store.beta.threads.runs.put(self.created_run)

    @override
    def generate(self) -> Generator[Event, None, None]:
        run = self.created_run

        self._put_run()
        yield self.event("thread.run.created", run)

        run.status = "in_progress"
        self._put_run()
        yield self.event("thread.run.in_progress", run)

        # Fabricate the assistant's reply and store it before announcing it.
        reply = build_message(
            {
                "assistant_id": run.assistant_id,
                "thread_id": run.thread_id,
                "run_id": run.id,
                "role": "assistant",
                "status": "completed",
                "content": [
                    {
                        "type": "text",
                        "text": {"annotations": [], "value": "Hello! How can I help?"},
                    }
                ],
            }
        )
        self.state_store.beta.threads.messages.put(reply)
        yield self.event("thread.message.created", reply)

        run.status = "completed"
        self._put_run()
        yield self.event("thread.run.completed", run)


def create_run_stream_response(
    request: Request,
    *,
    thread_id: str,
    state_store: StateStore,
) -> Response:
    """Custom mock handler returning a streamed run-lifecycle response."""
    # NOTE: creating run this way does not currently inherit config from assistant
    run = run_from_create_request(
        thread_id,
        request,
        extra={"model": "gpt-4-turbo", "tools": [{"type": "code_interpreter"}]},
    )
    return Response(201, content=CreateRunEventStream(run, state_store))


@pytest.mark.asyncio
@openai_responses.mock()
async def test_handle_stream(openai_mock: OpenAIMock):
    """Async end-to-end check that a custom AsyncEventStream drives the SDK stream helper."""
    openai_mock.beta.threads.runs.create.response = create_run_stream_response

    client = openai.AsyncClient(api_key="sk-fake123")

    math_tutor = await client.beta.assistants.create(
        instructions="You are a personal math tutor. When asked a question, write and run Python code to answer the question.",
        name="Math Tutor",
        tools=[{"type": "code_interpreter"}],
        model="gpt-4-turbo",
    )

    thread = await client.beta.threads.create()

    await client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=[{"type": "text", "text": "Hello!"}],
    )

    handler = EventHandler()
    async with client.beta.threads.runs.stream(
        thread_id=thread.id,
        assistant_id=math_tutor.id,
        event_handler=handler,
    ) as stream:
        await stream.until_done()
        assert stream.current_run

    messages = await client.beta.threads.messages.list(thread.id)

    # Four lifecycle events: created, in_progress, message.created, completed.
    global event_count
    assert event_count == 4
    # The user's message plus the fabricated assistant reply.
    assert len(messages.data) == 2

    assert openai_mock.beta.threads.runs.create.calls.call_count == 1
39 changes: 39 additions & 0 deletions examples/test_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import openai
from openai import DefaultHttpxClient

from openai_responses import OpenAIMock
from openai_responses.ext.httpx import MockTransport


def test_create_chat_completion():
    """Shows wiring the mock through an explicit httpx transport (no decorator)."""
    openai_mock = OpenAIMock()
    mocked_message = {
        "content": "Hello! How can I help?",
        "role": "assistant",
    }
    openai_mock.chat.completions.create.response = {
        "choices": [
            {
                "index": 0,
                "finish_reason": "stop",
                "message": mocked_message,
            }
        ]
    }

    # Route the client's HTTP traffic into the mock router instead of the network.
    client = openai.Client(
        api_key="sk-fake123",
        http_client=DefaultHttpxClient(
            transport=MockTransport(openai_mock.router.handler)
        ),
    )
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello!"},
        ],
    )

    choices = completion.choices
    assert len(choices) == 1
    assert choices[0].message.content == "Hello! How can I help?"
    assert openai_mock.chat.completions.create.calls.call_count == 1
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ strictSetInference = true
reportMissingTypeStubs = false
reportUnusedImport = "error"
reportPrivateUsage = "none"
reportWildcardImportFromLibrary = "none"
21 changes: 6 additions & 15 deletions src/openai_responses/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
from httpx import Request, Response
from respx import Route

from ._api import mock
from ._mock import OpenAIMock
from ._stores import StateStore

__all__ = [
# main API
"mock",
# internal types
"OpenAIMock",
"StateStore",
# external types
"Request",
"Response",
"Route",
]
from . import ext
from . import helpers
from . import stores
from . import streaming

__all__ = ["mock", "OpenAIMock", "ext", "helpers", "stores", "streaming"]
2 changes: 1 addition & 1 deletion src/openai_responses/_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Awaitable, Callable, Optional, Union

from ._mock import OpenAIMock
from ._stores import StateStore
from .stores import StateStore

WrappedFn = Callable[..., Union[Callable[..., Any], Awaitable[Callable[..., Any]]]]

Expand Down
Loading
Loading