-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add sequence number for ai stream objects #1597
Conversation
needed to handle out of order
WalkthroughThe changes introduce enhancements to the data models related to AI interactions by adding a sequence field to the Changes
Possibly related PRs
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (8)
lib/console/ai/stream.ex (1)
12-17
: Consider adding type specs and validationSince this function is critical for message ordering, consider:
- Adding a type spec to document the expected delta structure
- Adding runtime validation for the required fields
+ @type delta :: %{required(:content) :: String.t(), required(:seq) :: integer()} + @spec publish(%__MODULE__{topic: String.t()}, delta()) :: :ok def publish(%__MODULE__{topic: topic}, delta) when is_binary(topic) do + unless is_map(delta) and is_integer(delta[:seq]) and is_binary(delta[:content]) do + raise ArgumentError, "delta must contain :content (string) and :seq (integer)" + end Absinthe.Subscription.publish( ConsoleWeb.Endpoint, delta, [ai_stream: topic] ) endtest/console_web/channels/graphql/ai_subscription_test.exs (2)
20-21
: LGTM, but test coverage could be improved.While the change correctly updates the
Stream.publish
call to include the sequence number, the test doesn't verify that this field is properly transmitted. Consider updating the GraphQL subscription query to include theseq
field and verify it in the assertion.- subscription AI($threadId: ID) { - aiStream(threadId: $threadId) { content } - } + subscription AI($threadId: ID) { + aiStream(threadId: $threadId) { + content + seq + } + } -assert_push("subscription:data", %{result: %{data: %{"aiStream" => %{"content" => "something"}}}}) +assert_push("subscription:data", %{result: %{data: %{"aiStream" => %{"content" => "something", "seq" => 1}}}})
Line range hint
1-24
: Add test cases for out-of-order message handling.Given that the PR's main objective is to handle out-of-order processing, it would be valuable to add test cases that verify this functionality. Consider adding tests that:
- Send messages with non-sequential sequence numbers
- Verify correct ordering on the receiving end
- Test edge cases like duplicate sequence numbers or gaps
Here's a suggested test case structure:
test "handles out-of-order messages correctly" do user = insert(:user) thread = insert(:chat_thread) {:ok, socket} = establish_socket(user) ref = push_doc(socket, """ subscription AI($threadId: ID) { aiStream(threadId: $threadId) { content seq } } """, variables: %{"threadId" => thread.id}) assert_reply(ref, :ok, %{subscriptionId: _}) stream = %Stream{topic: Stream.topic(:thread, thread.id, user)} # Send messages out of order Stream.publish(stream, %{content: "second", seq: 2}) Stream.publish(stream, %{content: "first", seq: 1}) Stream.publish(stream, %{content: "third", seq: 3}) # Verify they are received in correct order assert_push("subscription:data", %{result: %{data: %{"aiStream" => %{"content" => "first", "seq" => 1}}}}) assert_push("subscription:data", %{result: %{data: %{"aiStream" => %{"content" => "second", "seq" => 2}}}}) assert_push("subscription:data", %{result: %{data: %{"aiStream" => %{"content" => "third", "seq" => 3}}}}) endlib/console/ai/stream/exec.ex (3)
14-21
: Consider adding type specs for better documentationThe implementation correctly includes sequence numbers in the published messages. Consider adding a type spec for better documentation and compile-time checks:
@type event_acc :: list(String.t()) @spec exec(fun(), AIStream.t(), (map() -> String.t() | :pass)) :: {:ok, String.t()} | {:error, String.t()}
22-22
: Consider adding error loggingThe error handling correctly matches the new indexed structure. Consider adding error logging for better debugging capabilities:
- {{:error, error}, _}, _ -> {:halt, {:error, "ai service error: #{inspect(error)}"}} + {{:error, error}, _}, _ -> + Logger.error("AI service error: #{inspect(error)}") + {:halt, {:error, "ai service error: #{inspect(error)}"}}
11-22
: Well-designed solution for handling message orderingThe implementation elegantly solves the out-of-order processing problem by:
- Using Elixir's built-in
Stream.with_index()
for efficient sequence number generation- Maintaining the stream's lazy evaluation properties
- Seamlessly integrating sequence numbers into the existing message structure
This approach ensures message ordering can be tracked without compromising the system's performance or complexity.
lib/console/graphql/ai.ex (1)
119-119
: LGTM! The sequence field addition looks good.The non-null integer field for sequence numbers is appropriate for handling message ordering in AI streams.
Since this sequence number is crucial for handling out-of-order processing:
- Ensure the client-side code buffers and reorders messages based on this sequence number
- Consider implementing a window-based approach for handling severely delayed messages (e.g., dropping messages that are too old)
- Think about error handling when sequence gaps are detected
go/client/models_gen.go (1)
Line range hint
1-1
: Note: This is a generated fileAs indicated by the filename suffix
_gen.go
and the header comment, this is a generated file. Any changes should be made to the generator rather than manually editing this file.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
assets/src/generated/graphql.ts
is excluded by!**/generated/**
📒 Files selected for processing (6)
go/client/models_gen.go
(1 hunks)lib/console/ai/stream.ex
(1 hunks)lib/console/ai/stream/exec.ex
(1 hunks)lib/console/graphql/ai.ex
(1 hunks)schema/schema.graphql
(1 hunks)test/console_web/channels/graphql/ai_subscription_test.exs
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- schema/schema.graphql
🔇 Additional comments (5)
lib/console/ai/stream.ex (1)
12-12
: LGTM: Improved parameter naming
The rename from chunk
to delta
better reflects the nature of the data being handled, especially now that it includes sequence information for ordering.
lib/console/ai/stream/exec.ex (1)
11-13
: LGTM: Stream indexing implementation is correct
The addition of Stream.with_index()
provides a clean way to generate sequence numbers while maintaining the stream's lazy evaluation properties.
lib/console/graphql/ai.ex (1)
Line range hint 258-271
: Verify subscription handling with sequence numbers.
The subscription configuration looks good, but we should verify that the Stream module correctly handles sequence numbers when publishing messages.
go/client/models_gen.go (2)
131-134
: LGTM! Well-structured AiDelta type
The AiDelta struct is well-designed with appropriate field types and JSON tags. The int64 type for Seq will handle large sequence numbers effectively.
Line range hint 1002-1010
: LGTM! Consistent sequence field addition
The Seq field addition to the Chat struct maintains consistency with AiDelta and follows proper Go struct field conventions.
needed to handle out of order
Test Plan
n/a
Checklist
Summary by CodeRabbit
New Features
seq
field to theai_delta
object in the GraphQL schema for better tracking of AI-generated messages.Bug Fixes
exec
function to ensure robust message publishing.Tests