diff --git a/examples/test_custom_response_handler.py b/examples/test_custom_response_handler.py
index 77f0cd8..e364c9f 100644
--- a/examples/test_custom_response_handler.py
+++ b/examples/test_custom_response_handler.py
@@ -1,7 +1,9 @@
 import openai
 
 import openai_responses
-from openai_responses import OpenAIMock, Request, Response, Route
+from openai_responses import OpenAIMock
+from openai_responses.ext.httpx import Request, Response
+from openai_responses.ext.respx import Route
 from openai_responses.helpers.builders.chat import chat_completion_from_create_request
 
 
diff --git a/examples/test_raw_httpx_response.py b/examples/test_raw_httpx_response.py
index e6a4b35..2c63f60 100644
--- a/examples/test_raw_httpx_response.py
+++ b/examples/test_raw_httpx_response.py
@@ -4,7 +4,8 @@
 from openai import APIStatusError
 
 import openai_responses
-from openai_responses import OpenAIMock, Response
+from openai_responses import OpenAIMock
+from openai_responses.ext.httpx import Response
 
 
 @openai_responses.mock()
diff --git a/examples/test_router_usage.py b/examples/test_router_usage.py
index e00e941..9e45b36 100644
--- a/examples/test_router_usage.py
+++ b/examples/test_router_usage.py
@@ -1,11 +1,13 @@
 import json
 
-import httpx
 import openai
 from openai.types.beta.threads.run_submit_tool_outputs_params import ToolOutput
 
 import openai_responses
-from openai_responses import OpenAIMock, Request, Response, Route, StateStore
+from openai_responses import OpenAIMock
+from openai_responses.stores import StateStore
+from openai_responses.ext.httpx import Request, Response, post
+from openai_responses.ext.respx import Route
 from openai_responses.helpers.mergers.runs import merge_run_with_partial
@@ -115,7 +117,7 @@ def test_external_api_mock(openai_mock: OpenAIMock):
     tool_outputs: list[ToolOutput] = []
     for tool in run.required_action.submit_tool_outputs.tool_calls:
         if tool.function.name == "get_current_temperature":
-            res = httpx.post(
+            res = post(
                 url="https://api.myweatherapi.com",
                 json=json.loads(tool.function.arguments),
             )
diff --git a/examples/test_run_steps.py b/examples/test_run_steps.py
index 67f5e07..18afbcd 100644
--- a/examples/test_run_steps.py
+++ b/examples/test_run_steps.py
@@ -4,7 +4,6 @@
 from openai_responses import OpenAIMock
 from openai_responses.helpers.builders.messages import build_message
 from openai_responses.helpers.builders.run_steps import build_run_step
-from openai_responses.helpers.state_store import add_resource_to_state_store
 
 
 @openai_responses.mock()
@@ -60,8 +59,8 @@ def test_list_run_steps(openai_mock: OpenAIMock):
             },
         }
     )
-    add_resource_to_state_store(assistant_message, mock=openai_mock)
-    add_resource_to_state_store(run_step, mock=openai_mock)
+    openai_mock.state.beta.threads.messages.put(assistant_message)
+    openai_mock.state.beta.threads.runs.steps.put(run_step)
 
     steps = client.beta.threads.runs.steps.list(run.id, thread_id=thread.id)
 
diff --git a/examples/test_streaming.py b/examples/test_streaming.py
new file mode 100644
index 0000000..c4d3f1d
--- /dev/null
+++ b/examples/test_streaming.py
@@ -0,0 +1,122 @@
+from typing import Generator
+from typing_extensions import override
+
+import openai
+from openai import AssistantEventHandler
+from openai.types.beta import AssistantStreamEvent
+from openai.types.beta.threads import Run
+
+import openai_responses
+from openai_responses import OpenAIMock
+from openai_responses.stores import StateStore
+from openai_responses.streaming import EventStream, Event
+from openai_responses.helpers.builders.runs import run_from_create_request
+from openai_responses.helpers.builders.messages import build_message
+from openai_responses.ext.httpx import Request, Response
+
+event_count = 0
+
+
+class EventHandler(AssistantEventHandler):
+    @override
+    def on_event(self, event: AssistantStreamEvent) -> None:
+        global event_count
+        if (
+            event.event == "thread.run.created"
+            or event.event == "thread.run.in_progress"
+            or event.event == "thread.message.created"
+            or event.event == "thread.run.completed"
+        ):
+            event_count += 1
+
+
+class CreateRunEventStream(EventStream):
+    def __init__(self, created_run: Run, state_store: StateStore) -> None:
+        self.created_run = created_run
+        self.state_store = state_store
+
+    @override
+    def generate(self) -> Generator[Event, None, None]:
+        self.state_store.beta.threads.runs.put(self.created_run)
+        yield self.event("thread.run.created", self.created_run)
+
+        self.created_run.status = "in_progress"
+        self.state_store.beta.threads.runs.put(self.created_run)
+        yield self.event("thread.run.in_progress", self.created_run)
+
+        assistant_message = build_message(
+            {
+                "assistant_id": self.created_run.assistant_id,
+                "thread_id": self.created_run.thread_id,
+                "run_id": self.created_run.id,
+                "role": "assistant",
+                "status": "completed",
+                "content": [
+                    {
+                        "type": "text",
+                        "text": {"annotations": [], "value": "Hello! How can I help?"},
+                    }
+                ],
+            }
+        )
+        self.state_store.beta.threads.messages.put(assistant_message)
+        yield self.event("thread.message.created", assistant_message)
+
+        self.created_run.status = "completed"
+        self.state_store.beta.threads.runs.put(self.created_run)
+        yield self.event("thread.run.completed", self.created_run)
+
+
+def create_run_stream_response(
+    request: Request,
+    *,
+    thread_id: str,
+    state_store: StateStore,
+) -> Response:
+    # NOTE: creating a run this way does not currently inherit config from the assistant
+    created_run = run_from_create_request(
+        thread_id,
+        request,
+        extra={"model": "gpt-4-turbo", "tools": [{"type": "code_interpreter"}]},
+    )
+    stream = CreateRunEventStream(created_run, state_store)
+    return Response(201, content=stream)
+
+
+@openai_responses.mock()
+def test_handle_stream(openai_mock: OpenAIMock):
+    openai_mock.beta.threads.runs.create.response = create_run_stream_response
+
+    client = openai.Client(api_key="sk-fake123")
+
+    assistant = client.beta.assistants.create(
+        instructions="You are a personal math tutor. When asked a question, write and run Python code to answer the question.",
+        name="Math Tutor",
+        tools=[{"type": "code_interpreter"}],
+        model="gpt-4-turbo",
+    )
+
+    thread = client.beta.threads.create()
+
+    client.beta.threads.messages.create(
+        thread_id=thread.id,
+        role="user",
+        content=[{"type": "text", "text": "Hello!"}],
+    )
+
+    with client.beta.threads.runs.stream(
+        thread_id=thread.id,
+        assistant_id=assistant.id,
+        event_handler=EventHandler(),
+    ) as stream:
+        stream.until_done()
+        run = stream.current_run
+        assert run
+
+    messages = client.beta.threads.messages.list(thread.id)
+
+    global event_count
+    assert event_count == 4
+    assert len(messages.data) == 2
+
+    assert openai_mock.beta.threads.runs.create.calls.call_count == 1
diff --git a/examples/test_streaming_async.py b/examples/test_streaming_async.py
new file mode 100644
index 0000000..a8b88d3
--- /dev/null
+++ b/examples/test_streaming_async.py
@@ -0,0 +1,125 @@
+from typing import Generator
+from typing_extensions import override
+
+import pytest
+
+import openai
+from openai import AsyncAssistantEventHandler
+from openai.types.beta import AssistantStreamEvent
+from openai.types.beta.threads import Run
+
+import openai_responses
+from openai_responses import OpenAIMock
+from openai_responses.stores import StateStore
+from openai_responses.streaming import AsyncEventStream, Event
+from openai_responses.helpers.builders.runs import run_from_create_request
+from openai_responses.helpers.builders.messages import build_message
+from openai_responses.ext.httpx import Request, Response
+
+event_count = 0
+
+
+class EventHandler(AsyncAssistantEventHandler):
+    @override
+    async def on_event(self, event: AssistantStreamEvent) -> None:
+        global event_count
+        if (
+            event.event == "thread.run.created"
+            or event.event == "thread.run.in_progress"
+            or event.event == "thread.message.created"
+            or event.event == "thread.run.completed"
+        ):
+            event_count += 1
+
+
+class CreateRunEventStream(AsyncEventStream):
+    def __init__(self, created_run: Run, state_store: StateStore) -> None:
+        self.created_run = created_run
+        self.state_store = state_store
+
+    @override
+    def generate(self) -> Generator[Event, None, None]:
+        self.state_store.beta.threads.runs.put(self.created_run)
+        yield self.event("thread.run.created", self.created_run)
+
+        self.created_run.status = "in_progress"
+        self.state_store.beta.threads.runs.put(self.created_run)
+        yield self.event("thread.run.in_progress", self.created_run)
+
+        assistant_message = build_message(
+            {
+                "assistant_id": self.created_run.assistant_id,
+                "thread_id": self.created_run.thread_id,
+                "run_id": self.created_run.id,
+                "role": "assistant",
+                "status": "completed",
+                "content": [
+                    {
+                        "type": "text",
+                        "text": {"annotations": [], "value": "Hello! How can I help?"},
How can I help?"}, + } + ], + } + ) + self.state_store.beta.threads.messages.put(assistant_message) + yield self.event("thread.message.created", assistant_message) + + self.created_run.status = "completed" + self.state_store.beta.threads.runs.put(self.created_run) + yield self.event("thread.run.completed", self.created_run) + + +def create_run_stream_response( + request: Request, + *, + thread_id: str, + state_store: StateStore, +) -> Response: + # NOTE: creating run this way does not currently inherit config from assistant + created_run = run_from_create_request( + thread_id, + request, + extra={"model": "gpt-4-turbo", "tools": [{"type": "code_interpreter"}]}, + ) + stream = CreateRunEventStream(created_run, state_store) + return Response(201, content=stream) + + +@pytest.mark.asyncio +@openai_responses.mock() +async def test_handle_stream(openai_mock: OpenAIMock): + openai_mock.beta.threads.runs.create.response = create_run_stream_response + + client = openai.AsyncClient(api_key="sk-fake123") + + assistant = await client.beta.assistants.create( + instructions="You are a personal math tutor. When asked a question, write and run Python code to answer the question.", + name="Math Tutor", + tools=[{"type": "code_interpreter"}], + model="gpt-4-turbo", + ) + + thread = await client.beta.threads.create() + + await client.beta.threads.messages.create( + thread_id=thread.id, + role="user", + content=[{"type": "text", "text": "Hello!"}], + ) + + async with client.beta.threads.runs.stream( + thread_id=thread.id, + assistant_id=assistant.id, + event_handler=EventHandler(), + ) as stream: + await stream.until_done() + run = stream.current_run + assert run + + messages = await client.beta.threads.messages.list(thread.id) + + global event_count + assert event_count == 4 + assert len(messages.data) == 2 + + assert openai_mock.beta.threads.runs.create.calls.call_count == 1 diff --git a/examples/test_transport.py b/examples/test_transport.py new file mode 100644 index 0000000..3a991b1 --- /dev/null +++ b/examples/test_transport.py @@ -0,0 +1,39 @@ +import openai +from openai import DefaultHttpxClient + +from openai_responses import OpenAIMock +from openai_responses.ext.httpx import MockTransport + + +def test_create_chat_completion(): + openai_mock = OpenAIMock() + openai_mock.chat.completions.create.response = { + "choices": [ + { + "index": 0, + "finish_reason": "stop", + "message": { + "content": "Hello! How can I help?", + "role": "assistant", + }, + } + ] + } + + client = openai.Client( + api_key="sk-fake123", + http_client=DefaultHttpxClient( + transport=MockTransport(openai_mock.router.handler) + ), + ) + completion = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Hello!"}, + ], + ) + + assert len(completion.choices) == 1 + assert completion.choices[0].message.content == "Hello! How can I help?" 
+    assert openai_mock.chat.completions.create.calls.call_count == 1
diff --git a/pyproject.toml b/pyproject.toml
index 5704349..3c4fd2d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -49,3 +49,4 @@ strictSetInference = true
 reportMissingTypeStubs = false
 reportUnusedImport = "error"
 reportPrivateUsage = "none"
+reportWildcardImportFromLibrary = "none"
diff --git a/src/openai_responses/__init__.py b/src/openai_responses/__init__.py
index 149a32a..e4bdc2a 100644
--- a/src/openai_responses/__init__.py
+++ b/src/openai_responses/__init__.py
@@ -1,18 +1,9 @@
-from httpx import Request, Response
-from respx import Route
-
 from ._api import mock
 from ._mock import OpenAIMock
-from ._stores import StateStore
 
-__all__ = [
-    # main API
-    "mock",
-    # internal types
-    "OpenAIMock",
-    "StateStore",
-    # external types
-    "Request",
-    "Response",
-    "Route",
-]
+from . import ext
+from . import helpers
+from . import stores
+from . import streaming
+
+__all__ = ["mock", "OpenAIMock", "ext", "helpers", "stores", "streaming"]
diff --git a/src/openai_responses/_api.py b/src/openai_responses/_api.py
index 2e9aad7..4b3ac29 100644
--- a/src/openai_responses/_api.py
+++ b/src/openai_responses/_api.py
@@ -1,7 +1,7 @@
 from typing import Any, Awaitable, Callable, Optional, Union
 
 from ._mock import OpenAIMock
-from ._stores import StateStore
+from .stores import StateStore
 
 WrappedFn = Callable[..., Union[Callable[..., Any], Awaitable[Callable[..., Any]]]]
 
diff --git a/src/openai_responses/_mock.py b/src/openai_responses/_mock.py
index 33b8f45..295db96 100644
--- a/src/openai_responses/_mock.py
+++ b/src/openai_responses/_mock.py
@@ -5,7 +5,7 @@
 import respx
 
 from ._routes import BetaRoutes, ChatRoutes, EmbeddingsRoutes, FileRoutes
-from ._stores import StateStore
+from .stores import StateStore
 
 
 class OpenAIMock:
@@ -33,6 +33,11 @@ def router(self) -> respx.MockRouter:
         """[RESPX](https://lundberg.github.io/respx) router with patched OpenAI routes"""
         return self._router
 
+    @property
+    def state(self) -> StateStore:
+        """State store for API resources"""
+        return self._state
+
     def _start_mock(self):
         def wrapper(fn: Callable[..., Any]):
             is_async = inspect.iscoroutinefunction(fn)
diff --git a/src/openai_responses/_routes/__init__.py b/src/openai_responses/_routes/__init__.py
index 362560e..2a9bb5c 100644
--- a/src/openai_responses/_routes/__init__.py
+++ b/src/openai_responses/_routes/__init__.py
@@ -1,6 +1,6 @@
 import respx
 
-from .._stores import StateStore
+from ..stores import StateStore
 
 from .chat import ChatCompletionsCreateRoute
 from .embeddings import EmbeddingsCreateRoute
diff --git a/src/openai_responses/_routes/_base.py b/src/openai_responses/_routes/_base.py
index 19489da..b78894f 100644
--- a/src/openai_responses/_routes/_base.py
+++ b/src/openai_responses/_routes/_base.py
@@ -9,7 +9,7 @@
 
 from openai import BaseModel
 
-from .._stores import StateStore
+from ..stores import StateStore
 from .._types.generics import M, P
 from .._utils.serde import model_dict
 
@@ -133,7 +133,9 @@ def __init__(
     def _side_effect(self) -> Callable[..., httpx.Response]:
         if callable(self._response):
             argspec = inspect.getfullargspec(self._response)
-            needs_store = "state_store" in argspec.args
+            needs_store = (
+                "state_store" in argspec.args or "state_store" in argspec.kwonlyargs
+            )
             if needs_store:
                 return partial(self._response, state_store=self._state)
             else:
diff --git a/src/openai_responses/_routes/assistants.py b/src/openai_responses/_routes/assistants.py
index 32f0df0..97ef9ce 100644
--- a/src/openai_responses/_routes/assistants.py
+++ b/src/openai_responses/_routes/assistants.py
@@ -11,7 +11,7 @@
 
 from ._base import StatefulRoute
 
-from .._stores import StateStore
+from ..stores import StateStore
 from .._types.partials.assistants import (
     PartialAssistant,
     PartialAssistantList,
diff --git a/src/openai_responses/_routes/files.py b/src/openai_responses/_routes/files.py
index 18898b5..84b093a 100644
--- a/src/openai_responses/_routes/files.py
+++ b/src/openai_responses/_routes/files.py
@@ -11,7 +11,7 @@
 
 from ._base import StatefulRoute
 
-from .._stores import StateStore
+from ..stores import StateStore
 from .._types.partials.files import (
     PartialFileObject,
     PartialFileList,
diff --git a/src/openai_responses/_routes/messages.py b/src/openai_responses/_routes/messages.py
index c9665cc..b48a612 100644
--- a/src/openai_responses/_routes/messages.py
+++ b/src/openai_responses/_routes/messages.py
@@ -11,7 +11,7 @@
 
 from ._base import StatefulRoute
 
-from .._stores import StateStore
+from ..stores import StateStore
 from .._types.partials.messages import (
     PartialMessage,
     PartialMessageList,
@@ -74,13 +74,29 @@ def _build(partial: PartialMessage, request: httpx.Request) -> Message:
             "status": "completed",
         }
         if content.get("content"):
-            defaults["content"].append(
-                {
-                    "type": "text",
-                    "text": {"annotations": [], "value": content.get("content")},
-                }
-            )
+            value = content.get("content")
+            if isinstance(value, str):
+                defaults["content"].append(
+                    {
+                        "type": "text",
+                        "text": {"annotations": [], "value": value},
+                    }
+                )
+            else:
+                for block in value:
+                    if block.get("type") == "text":
+                        defaults["content"].append(
+                            {
+                                "type": "text",
+                                "text": {
+                                    "annotations": [],
+                                    "value": block.get("text"),
+                                },
+                            }
+                        )
+
             del content["content"]
 
         return model_parse(Message, defaults | partial | content)
diff --git a/src/openai_responses/_routes/run_steps.py b/src/openai_responses/_routes/run_steps.py
index cc864b3..4382223 100644
--- a/src/openai_responses/_routes/run_steps.py
+++ b/src/openai_responses/_routes/run_steps.py
@@ -9,7 +9,7 @@
 
 from ._base import StatefulRoute
 
-from .._stores import StateStore
+from ..stores import StateStore
 from .._types.partials.run_steps import PartialRunStep, PartialRunStepList
 
 from .._utils.serde import model_dict
diff --git a/src/openai_responses/_routes/runs.py b/src/openai_responses/_routes/runs.py
index b88e737..4d33332 100644
--- a/src/openai_responses/_routes/runs.py
+++ b/src/openai_responses/_routes/runs.py
@@ -16,7 +16,7 @@
 
 from ..helpers.builders.messages import message_from_create_request
 from ..helpers.builders.threads import thread_from_create_request
 
-from .._stores import StateStore
+from ..stores import StateStore
 from .._types.partials.runs import PartialRun, PartialRunList
 
 from .._utils.copy import model_copy
diff --git a/src/openai_responses/_routes/threads.py b/src/openai_responses/_routes/threads.py
index 42faab2..2fb4371 100644
--- a/src/openai_responses/_routes/threads.py
+++ b/src/openai_responses/_routes/threads.py
@@ -14,7 +14,7 @@
 
 from ..helpers.builders.messages import message_from_create_request
 
-from .._stores import StateStore
+from ..stores import StateStore
 from .._types.partials.threads import PartialThread, PartialThreadDeleted
 
 from .._utils.faker import faker
diff --git a/src/openai_responses/_utils/aio.py b/src/openai_responses/_utils/aio.py
new file mode 100644
index 0000000..c21b6bc
--- /dev/null
+++ b/src/openai_responses/_utils/aio.py
@@ -0,0 +1,14 @@
+import asyncio
+from typing import AsyncGenerator, Generator, TypeVar
+
+T = TypeVar("T")
+
+__all__ = ["make_async_generator"]
["make_async_generator"] + + +async def make_async_generator( + sync_gen: Generator[T, None, None] +) -> AsyncGenerator[T, None]: + for value in sync_gen: + await asyncio.sleep(0) + yield value diff --git a/src/openai_responses/ext/__init__.py b/src/openai_responses/ext/__init__.py new file mode 100644 index 0000000..55e2bbd --- /dev/null +++ b/src/openai_responses/ext/__init__.py @@ -0,0 +1,4 @@ +import httpx +import respx + +__all__ = ["httpx", "respx"] diff --git a/src/openai_responses/ext/httpx.py b/src/openai_responses/ext/httpx.py new file mode 100644 index 0000000..0617822 --- /dev/null +++ b/src/openai_responses/ext/httpx.py @@ -0,0 +1 @@ +from httpx import * # noqa: F403 diff --git a/src/openai_responses/ext/respx.py b/src/openai_responses/ext/respx.py new file mode 100644 index 0000000..138d3f3 --- /dev/null +++ b/src/openai_responses/ext/respx.py @@ -0,0 +1 @@ +from respx import * # noqa: F403 diff --git a/src/openai_responses/helpers/state_store.py b/src/openai_responses/helpers/state_store.py deleted file mode 100644 index db544f6..0000000 --- a/src/openai_responses/helpers/state_store.py +++ /dev/null @@ -1,39 +0,0 @@ -from typing import Optional - -from .._mock import OpenAIMock -from .._stores.state_store import Resource, StateStore - - -def add_resource_to_state_store( - resource: Resource, - *, - mock: Optional[OpenAIMock] = None, - state_store: Optional[StateStore] = None, -): - """Add a resource to the state store being used for a test. If an object with the same resource - ID already exists in the state store then it will be overwritten. - - Args: - resource (Resource): An OpenAI resource - mock (Optional[OpenAIMock], optional): Mock associated with test. Defaults to None. - state_store (Optional[StateStore], optional): State store associated with test. Defaults to None. 
-
-    Raises:
-        ValueError: If neither mock or state store are provided
-        ValueError: If both mock and state store are provided
-    """
-    if not mock and not state_store:
-        raise ValueError(
-            "Either a mock instance or a state store instance must be provided"
-        )
-
-    if mock and state_store:
-        raise ValueError(
-            "Only one of mock instance or state store instance should be provided not both"
-        )
-
-    if mock:
-        mock._state._blind_put(resource)
-
-    if state_store:
-        state_store._blind_put(resource)
diff --git a/src/openai_responses/_stores/__init__.py b/src/openai_responses/stores/__init__.py
similarity index 100%
rename from src/openai_responses/_stores/__init__.py
rename to src/openai_responses/stores/__init__.py
diff --git a/src/openai_responses/_stores/state_store.py b/src/openai_responses/stores/state_store.py
similarity index 100%
rename from src/openai_responses/_stores/state_store.py
rename to src/openai_responses/stores/state_store.py
diff --git a/src/openai_responses/streaming.py b/src/openai_responses/streaming.py
new file mode 100644
index 0000000..195848f
--- /dev/null
+++ b/src/openai_responses/streaming.py
@@ -0,0 +1,169 @@
+import json
+from dataclasses import dataclass
+from typing import AsyncIterator, Generator, Literal, Optional, Tuple, Union, overload
+
+from openai.types import ErrorObject
+from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
+
+from openai.types.beta.thread import Thread
+from openai.types.beta.threads.message import Message
+from openai.types.beta.threads.message_delta import MessageDelta
+from openai.types.beta.threads.run import Run
+from openai.types.beta.threads.runs.run_step import RunStep
+from openai.types.beta.threads.runs.run_step_delta import RunStepDelta
+
+from ._utils.aio import make_async_generator
+from ._utils.serde import model_dict
+
+__all__ = ["Event", "EventStream", "AsyncEventStream"]
+
+EventType = Literal[
+    "thread.created",
+    "thread.run.created",
+    "thread.run.queued",
+    "thread.run.in_progress",
+    "thread.run.requires_action",
+    "thread.run.completed",
+    "thread.run.incomplete",
+    "thread.run.failed",
+    "thread.run.cancelling",
+    "thread.run.cancelled",
+    "thread.run.expired",
+    "thread.run.step.created",
+    "thread.run.step.in_progress",
+    "thread.run.step.delta",
+    "thread.run.step.completed",
+    "thread.run.step.failed",
+    "thread.run.step.cancelled",
+    "thread.run.step.expired",
+    "thread.message.created",
+    "thread.message.in_progress",
+    "thread.message.delta",
+    "thread.message.completed",
+    "thread.message.incomplete",
+    "error",
+]
+EventData = Union[
+    ChatCompletionChunk,
+    Thread,
+    Run,
+    RunStep,
+    RunStepDelta,
+    Message,
+    MessageDelta,
+    ErrorObject,
+]
+
+
+@dataclass(frozen=True)
+class Event:
+    event: Optional[EventType]
+    data: EventData
+
+    def to_sse_event(self) -> Tuple[Optional[bytes], Optional[bytes]]:
+        encoded_event = f"event: {self.event}\n".encode() if self.event else None
+        encoded_data = f"data: {json.dumps(model_dict(self.data))}\n\n".encode()
+        return encoded_event, encoded_data
+
+
+class BaseEventStream:
+    @overload
+    def event(self, event: Literal["thread.created"], data: Thread) -> Event: ...
+
+    @overload
+    def event(
+        self,
+        event: Literal[
+            "thread.run.created",
+            "thread.run.queued",
+            "thread.run.in_progress",
+            "thread.run.requires_action",
+            "thread.run.completed",
+            "thread.run.incomplete",
+            "thread.run.failed",
+            "thread.run.cancelling",
+            "thread.run.cancelled",
+            "thread.run.expired",
+        ],
+        data: Run,
+    ) -> Event: ...
+
+    @overload
+    def event(
+        self,
+        event: Literal[
+            "thread.run.step.created",
+            "thread.run.step.in_progress",
+            "thread.run.step.completed",
+            "thread.run.step.failed",
+            "thread.run.step.cancelled",
+            "thread.run.step.expired",
+        ],
+        data: RunStep,
+    ) -> Event: ...
+
+    @overload
+    def event(
+        self, event: Literal["thread.run.step.delta"], data: RunStepDelta
+    ) -> Event: ...
+
+    @overload
+    def event(
+        self,
+        event: Literal[
+            "thread.message.created",
+            "thread.message.in_progress",
+            "thread.message.completed",
+            "thread.message.incomplete",
+        ],
+        data: Message,
+    ) -> Event: ...
+
+    @overload
+    def event(
+        self, event: Literal["thread.message.delta"], data: MessageDelta
+    ) -> Event: ...
+
+    @overload
+    def event(self, event: Literal["error"], data: ErrorObject) -> Event: ...
+
+    @overload
+    def event(self, event: None, data: ChatCompletionChunk) -> Event: ...
+
+    def event(self, event: Optional[EventType], data: EventData) -> Event:
+        """
+        Create a server-sent event with an optional event type and a data payload.
+        """
+        return Event(event, data)
+
+    def generate(self) -> Generator[Event, None, None]:
+        raise NotImplementedError
+
+
+class EventStream(BaseEventStream):
+    """Event stream protocol for building a mock OpenAI server-sent event stream"""
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        for _event in self.generate():
+            t, d = _event.to_sse_event()
+            if t:
+                yield t
+            if d:
+                yield d
+
+        yield b"event: done\n"
+        yield b"data: [DONE]\n\n"
+
+
+class AsyncEventStream(BaseEventStream):
+    """Async event stream protocol for building a mock OpenAI server-sent event stream"""
+
+    async def __aiter__(self) -> AsyncIterator[bytes]:
+        async for _event in make_async_generator(self.generate()):
+            t, d = _event.to_sse_event()
+            if t:
+                yield t
+            if d:
+                yield d
+        yield b"event: done\n"
+        yield b"data: [DONE]\n\n"
diff --git a/tests/unit/test_state_store.py b/tests/unit/test_state_store.py
index 5bf696a..ca94b27 100644
--- a/tests/unit/test_state_store.py
+++ b/tests/unit/test_state_store.py
@@ -5,7 +5,7 @@
 from openai.types.beta.threads.message import Message
 from openai.types.beta.threads.run import Run
 
-from openai_responses import StateStore
+from openai_responses.stores import StateStore
 
 
 @pytest.fixture
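
Addendum (reviewer note, not part of the patch): streaming.py also adds an `event(None, ChatCompletionChunk)` overload, which none of the example files exercise. The sketch below shows how a chat-completion chunk stream might be mocked with the same `EventStream` machinery the run examples use. It is untested and assumption-laden: the chunk payload values, the 200 status code, and the names `chunk`, `ChatChunkEventStream`, `chat_stream_response`, and `test_chat_stream` are illustrative only; the grounded parts are the `EventStream.generate` protocol, the `ext.httpx` imports, and assigning a callable to `.response` (all introduced above).

from typing import Generator, Optional

import openai

import openai_responses
from openai_responses import OpenAIMock
from openai_responses.ext.httpx import Request, Response
from openai_responses.streaming import EventStream, Event

from openai.types.chat.chat_completion_chunk import ChatCompletionChunk


def chunk(delta: dict, finish_reason: Optional[str] = None) -> ChatCompletionChunk:
    # Helper to build an illustrative chunk; all field values are placeholders.
    return ChatCompletionChunk.model_validate(
        {
            "id": "chatcmpl-fake",
            "object": "chat.completion.chunk",
            "created": 0,
            "model": "gpt-3.5-turbo",
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }
    )


class ChatChunkEventStream(EventStream):
    def generate(self) -> Generator[Event, None, None]:
        # Chat chunks are sent as bare `data:` lines with no SSE event type,
        # hence the `None` event (matching the `event: None` overload above).
        yield self.event(None, chunk({"role": "assistant", "content": "Hello!"}))
        yield self.event(None, chunk({}, finish_reason="stop"))


def chat_stream_response(request: Request) -> Response:
    # Assumption: the chat route accepts an iterable-of-bytes body the same
    # way the runs route does in examples/test_streaming.py; 200 is assumed
    # since chat completions are not a "created" resource.
    return Response(200, content=ChatChunkEventStream())


@openai_responses.mock()
def test_chat_stream(openai_mock: OpenAIMock):
    openai_mock.chat.completions.create.response = chat_stream_response

    client = openai.Client(api_key="sk-fake123")
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    )
    deltas = [c.choices[0].delta.content for c in stream]
    assert deltas[0] == "Hello!"

If the chat route does not accept a streamed body this way, the run-stream examples in examples/test_streaming.py remain the canonical pattern.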