diff --git a/assets/src/generated/graphql.ts b/assets/src/generated/graphql.ts index b7d400d27d..41fa386b8f 100644 --- a/assets/src/generated/graphql.ts +++ b/assets/src/generated/graphql.ts @@ -178,6 +178,7 @@ export type AgentMigrationAttributes = { export type AiDelta = { __typename?: 'AiDelta'; content: Scalars['String']['output']; + seq: Scalars['Int']['output']; }; /** A representation of a LLM-derived insight */ diff --git a/go/client/models_gen.go b/go/client/models_gen.go index 2a75a98d2a..9197aaf94a 100644 --- a/go/client/models_gen.go +++ b/go/client/models_gen.go @@ -128,6 +128,7 @@ type AgentMigrationAttributes struct { } type AiDelta struct { + Seq int64 `json:"seq"` Content string `json:"content"` } diff --git a/lib/console/ai/stream.ex b/lib/console/ai/stream.ex index 795eab2074..c0ecc6ff64 100644 --- a/lib/console/ai/stream.ex +++ b/lib/console/ai/stream.ex @@ -9,10 +9,10 @@ defmodule Console.AI.Stream do def stream(), do: Process.get(@stream) - def publish(%__MODULE__{topic: topic}, chunk) when is_binary(topic) do + def publish(%__MODULE__{topic: topic}, delta) when is_binary(topic) do Absinthe.Subscription.publish( ConsoleWeb.Endpoint, - %{content: chunk}, + delta, [ai_stream: topic] ) end diff --git a/lib/console/ai/stream/exec.ex b/lib/console/ai/stream/exec.ex index b458765075..9a0763afaa 100644 --- a/lib/console/ai/stream/exec.ex +++ b/lib/console/ai/stream/exec.ex @@ -8,16 +8,18 @@ defmodule Console.AI.Stream.Exec do defp handle_openai(_), do: :pass defp exec(fun, %AIStream{} = stream, reducer) when is_function(reducer, 1) do - Enum.reduce_while(build_stream(fun), [], fn - %AIStream.SSE.Event{data: data}, acc -> + build_stream(fun) + |> Stream.with_index() + |> Enum.reduce_while([], fn + {%AIStream.SSE.Event{data: data}, ind}, acc -> case reducer.(data) do c when is_binary(c) -> - AIStream.publish(stream, c) + AIStream.publish(stream, %{content: c, seq: ind}) {:cont, [c | acc]} _ -> {:cont, acc} end - {:error, error}, _ -> {:halt, {:error, "ai service error: #{inspect(error)}"}} + {{:error, error}, _}, _ -> {:halt, {:error, "ai service error: #{inspect(error)}"}} _, acc -> {:cont, acc} end) diff --git a/lib/console/graphql/ai.ex b/lib/console/graphql/ai.ex index 85e9c5bf95..fd9df88d12 100644 --- a/lib/console/graphql/ai.ex +++ b/lib/console/graphql/ai.ex @@ -116,6 +116,7 @@ defmodule Console.GraphQl.AI do end object :ai_delta do + field :seq, non_null(:integer) field :content, non_null(:string) end diff --git a/schema/schema.graphql b/schema/schema.graphql index 0142fc8110..7fba135823 100644 --- a/schema/schema.graphql +++ b/schema/schema.graphql @@ -6217,6 +6217,7 @@ type ClusterInsightComponent { } type AiDelta { + seq: Int! content: String! } diff --git a/test/console_web/channels/graphql/ai_subscription_test.exs b/test/console_web/channels/graphql/ai_subscription_test.exs index 0a19acf26f..ebba32a436 100644 --- a/test/console_web/channels/graphql/ai_subscription_test.exs +++ b/test/console_web/channels/graphql/ai_subscription_test.exs @@ -17,7 +17,7 @@ defmodule ConsoleWeb.GraphQl.AISubscriptionTest do assert_reply(ref, :ok, %{subscriptionId: _}) stream = %Stream{topic: Stream.topic(:thread, thread.id, user)} - Stream.publish(stream, "something") + Stream.publish(stream, %{content: "something", seq: 1}) assert_push("subscription:data", %{result: %{data: %{"aiStream" => %{"content" => "something"}}}}) end end