Merged
31 changes: 29 additions & 2 deletions service/app/core/chat/langchain.py
@@ -26,6 +26,7 @@
GeneratedFileHandler,
StreamContext,
StreamingEventHandler,
ThinkingEventHandler,
TokenStreamProcessor,
ToolEventHandler,
)
@@ -301,7 +302,7 @@ async def _handle_updates_mode(data: Any, ctx: StreamContext) -> AsyncGenerator[


async def _handle_messages_mode(data: Any, ctx: StreamContext) -> AsyncGenerator[StreamingEvent, None]:
"""Handle 'messages' mode events (token streaming)."""
"""Handle 'messages' mode events (token streaming and thinking content)."""
if not isinstance(data, tuple):
return

@@ -331,7 +332,27 @@ async def _handle_messages_mode(data: Any, ctx: StreamContext) -> AsyncGenerator
if node and node not in ("model", "agent"):
return

# Extract and emit token
# Check for thinking content first (from reasoning models like Claude, DeepSeek R1, Gemini 3)
thinking_content = ThinkingEventHandler.extract_thinking_content(message_chunk)

if thinking_content:
# Start thinking if not already
if not ctx.is_thinking:
logger.debug("Emitting thinking_start for stream_id=%s", ctx.stream_id)
ctx.is_thinking = True
yield ThinkingEventHandler.create_thinking_start(ctx.stream_id)

ctx.thinking_buffer.append(thinking_content)
yield ThinkingEventHandler.create_thinking_chunk(ctx.stream_id, thinking_content)
return

# If we were thinking but now have regular content, end thinking first
if ctx.is_thinking:
logger.debug("Emitting thinking_end for stream_id=%s", ctx.stream_id)
ctx.is_thinking = False
yield ThinkingEventHandler.create_thinking_end(ctx.stream_id)

# Extract and emit token for regular streaming
token_text = TokenStreamProcessor.extract_token_text(message_chunk)
if not token_text:
return
@@ -347,6 +368,12 @@ async def _handle_messages_mode(data: Any, ctx: StreamContext) -> AsyncGenerator

async def _finalize_streaming(ctx: StreamContext) -> AsyncGenerator[StreamingEvent, None]:
"""Finalize the streaming session."""
# If still thinking when finalizing, emit thinking_end
if ctx.is_thinking:
logger.debug("Emitting thinking_end (in finalize) for stream_id=%s", ctx.stream_id)
ctx.is_thinking = False
yield ThinkingEventHandler.create_thinking_end(ctx.stream_id)

if ctx.is_streaming:
logger.debug(
"Emitting streaming_end for stream_id=%s (total tokens: %d)",
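Taken together, the langchain.py changes turn the token stream into a small state machine: thinking chunks open and feed a thinking phase, the first regular token closes it, and finalization closes it if the model never produces regular content. A minimal sketch of the event order a client should expect from a reasoning model (event names match this PR; the stream id and content strings are illustrative):

```python
# Illustrative slice of the event sequence for one reasoning-model response
# (streaming_start/streaming_end and other bookkeeping events elided).
# Payload shapes follow the Thinking*Data TypedDicts added below; the
# stream id and content strings are made up.
expected_events = [
    {"type": "thinking_start", "data": {"id": "stream-1"}},
    {"type": "thinking_chunk", "data": {"id": "stream-1", "content": "Compare the two options first..."}},
    # thinking_end fires on the first regular token, or in _finalize_streaming
    # if the model never emits regular content.
    {"type": "thinking_end", "data": {"id": "stream-1"}},
    {"type": "streaming_chunk", "data": {"id": "stream-1", "content": "Option A wins because..."}},
]
```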
111 changes: 101 additions & 10 deletions service/app/core/chat/stream_handlers.py
@@ -25,6 +25,9 @@
StreamingEndData,
StreamingEvent,
StreamingStartData,
ThinkingChunkData,
ThinkingEndData,
ThinkingStartData,
TokenUsageData,
ToolCallRequestData,
ToolCallResponseData,
@@ -55,6 +58,9 @@ class StreamContext:
total_input_tokens: int = 0
total_output_tokens: int = 0
total_tokens: int = 0
# Thinking/reasoning content state
is_thinking: bool = False
thinking_buffer: list[str] = field(default_factory=list)
Contributor review comment:

suggestion: Remove or use thinking_buffer to avoid unused state and potential confusion.

thinking_buffer is defined on StreamContext but never read in the current flow, which only uses ctx.is_thinking and streams chunks directly. If you plan to aggregate reasoning later, consider explicitly consuming thinking_buffer (e.g., in _finalize_streaming) or remove it for now to keep StreamContext minimal and avoid implying behavior that doesn’t exist yet.
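For the aggregation route, a hypothetical sketch of what consuming the buffer in _finalize_streaming could look like (the persistence hook is assumed and is not part of this PR; names otherwise as in the diff):

```python
# Hypothetical follow-up to this suggestion: drain thinking_buffer during
# finalization so the state is actually consumed. Not part of this PR.
async def _finalize_streaming(ctx: StreamContext) -> AsyncGenerator[StreamingEvent, None]:
    if ctx.is_thinking:
        ctx.is_thinking = False
        yield ThinkingEventHandler.create_thinking_end(ctx.stream_id)

    # Aggregate the buffered reasoning for persistence on the Message row
    # (see the thinking_content column added in service/app/models/message.py).
    thinking_content = "".join(ctx.thinking_buffer) or None
    ctx.thinking_buffer.clear()
    # ... hand thinking_content to whatever saves the assistant message ...
```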



class ToolEventHandler:
@@ -79,7 +85,7 @@ def create_tool_request_event(tool_call: dict[str, Any]) -> StreamingEvent:
"status": ToolCallStatus.EXECUTING,
"timestamp": asyncio.get_event_loop().time(),
}
return {"type": ChatEventType.TOOL_CALL_REQUEST, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.TOOL_CALL_REQUEST, "data": data}

@staticmethod
def create_tool_response_event(
@@ -101,7 +107,7 @@ def create_tool_response_event(
"status": status,
"result": result,
}
return {"type": ChatEventType.TOOL_CALL_RESPONSE, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.TOOL_CALL_RESPONSE, "data": data}


class StreamingEventHandler:
@@ -111,13 +117,13 @@ class StreamingEventHandler:
def create_streaming_start(stream_id: str) -> StreamingEvent:
"""Create streaming start event."""
data: StreamingStartData = {"id": stream_id}
return {"type": ChatEventType.STREAMING_START, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.STREAMING_START, "data": data}

@staticmethod
def create_streaming_chunk(stream_id: str, content: str) -> StreamingEvent:
"""Create streaming chunk event."""
data: StreamingChunkData = {"id": stream_id, "content": content}
return {"type": ChatEventType.STREAMING_CHUNK, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.STREAMING_CHUNK, "data": data}

@staticmethod
def create_streaming_end(stream_id: str) -> StreamingEvent:
@@ -126,7 +132,7 @@ def create_streaming_end(stream_id: str) -> StreamingEvent:
"id": stream_id,
"created_at": asyncio.get_event_loop().time(),
}
return {"type": ChatEventType.STREAMING_END, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.STREAMING_END, "data": data}

@staticmethod
def create_token_usage_event(input_tokens: int, output_tokens: int, total_tokens: int) -> StreamingEvent:
@@ -136,17 +142,102 @@ def create_token_usage_event(input_tokens: int, output_tokens: int, total_tokens
"output_tokens": output_tokens,
"total_tokens": total_tokens,
}
return {"type": ChatEventType.TOKEN_USAGE, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.TOKEN_USAGE, "data": data}

@staticmethod
def create_processing_event(status: str = ProcessingStatus.PREPARING_REQUEST) -> StreamingEvent:
"""Create processing status event."""
return {"type": ChatEventType.PROCESSING, "data": {"status": status}} # type: ignore[return-value]
return {"type": ChatEventType.PROCESSING, "data": {"status": status}}

@staticmethod
def create_error_event(error: str) -> StreamingEvent:
"""Create error event."""
return {"type": ChatEventType.ERROR, "data": {"error": error}} # type: ignore[return-value]
return {"type": ChatEventType.ERROR, "data": {"error": error}}


class ThinkingEventHandler:
"""Handle thinking/reasoning content streaming events."""

@staticmethod
def create_thinking_start(stream_id: str) -> StreamingEvent:
"""Create thinking start event."""
data: ThinkingStartData = {"id": stream_id}
return {"type": ChatEventType.THINKING_START, "data": data}

@staticmethod
def create_thinking_chunk(stream_id: str, content: str) -> StreamingEvent:
"""Create thinking chunk event."""
data: ThinkingChunkData = {"id": stream_id, "content": content}
return {"type": ChatEventType.THINKING_CHUNK, "data": data}

@staticmethod
def create_thinking_end(stream_id: str) -> StreamingEvent:
"""Create thinking end event."""
data: ThinkingEndData = {"id": stream_id}
return {"type": ChatEventType.THINKING_END, "data": data}

@staticmethod
def extract_thinking_content(message_chunk: Any) -> str | None:
"""
Extract thinking/reasoning content from message chunk.

Checks various provider-specific locations:
- Anthropic Claude: content blocks with type="thinking"
- DeepSeek R1: additional_kwargs.reasoning_content
- Gemini 3: content blocks with type="thought" or response_metadata.reasoning
- Generic: response_metadata.reasoning_content or thinking

Args:
message_chunk: Message chunk from LLM streaming

Returns:
Extracted thinking content or None
"""
# Check for DeepSeek/OpenAI style reasoning_content in additional_kwargs
if hasattr(message_chunk, "additional_kwargs"):
additional_kwargs = message_chunk.additional_kwargs
if isinstance(additional_kwargs, dict):
reasoning = additional_kwargs.get("reasoning_content")
if reasoning:
logger.debug("Found thinking in additional_kwargs.reasoning_content")
return reasoning

# Check for thinking/thought blocks in content (Anthropic, Gemini 3)
if hasattr(message_chunk, "content"):
content = message_chunk.content
if isinstance(content, list):
for block in content:
if isinstance(block, dict):
block_type = block.get("type", "")
# Anthropic Claude uses "thinking" type
if block_type == "thinking":
thinking_text = block.get("thinking", "")
if thinking_text:
logger.debug("Found thinking in content block type='thinking'")
return thinking_text
# Gemini 3 uses "thought" type
elif block_type == "thought":
thought_text = block.get("thought", "") or block.get("text", "")
if thought_text:
logger.debug("Found thinking in content block type='thought'")
return thought_text

# Check response_metadata for thinking content
if hasattr(message_chunk, "response_metadata"):
metadata = message_chunk.response_metadata
if isinstance(metadata, dict):
# Gemini 3 uses "reasoning" key
thinking = (
metadata.get("thinking")
or metadata.get("reasoning_content")
or metadata.get("reasoning")
or metadata.get("thoughts")
)
if thinking:
logger.debug("Found thinking in response_metadata: %s", list(metadata.keys()))
return thinking

return None
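
A quick check of the extraction paths, using plain stand-ins for LangChain message chunks (SimpleNamespace here just satisfies the hasattr checks; real chunks are AIMessageChunk instances):

```python
from types import SimpleNamespace

# DeepSeek/OpenAI-style: reasoning arrives in additional_kwargs.
deepseek_chunk = SimpleNamespace(
    additional_kwargs={"reasoning_content": "Check the units first."},
    content="",
    response_metadata={},
)
assert ThinkingEventHandler.extract_thinking_content(deepseek_chunk) == "Check the units first."

# Anthropic-style: a thinking block inside a list-valued content.
anthropic_chunk = SimpleNamespace(
    additional_kwargs={},
    content=[{"type": "thinking", "thinking": "Weigh both branches."}],
    response_metadata={},
)
assert ThinkingEventHandler.extract_thinking_content(anthropic_chunk) == "Weigh both branches."
```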


class CitationExtractor:
@@ -264,7 +355,7 @@ def _deduplicate_citations(citations: list[CitationData]) -> list[CitationData]:
def create_citations_event(citations: list[CitationData]) -> StreamingEvent:
"""Create search citations event."""
data: SearchCitationsData = {"citations": citations}
return {"type": ChatEventType.SEARCH_CITATIONS, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.SEARCH_CITATIONS, "data": data}


class GeneratedFileHandler:
@@ -397,7 +488,7 @@ async def process_generated_content(
def create_generated_files_event(files: list[GeneratedFileInfo]) -> StreamingEvent:
"""Create generated files event."""
data: GeneratedFilesData = {"files": files}
return {"type": ChatEventType.GENERATED_FILES, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.GENERATED_FILES, "data": data}


class TokenStreamProcessor:
1 change: 0 additions & 1 deletion service/app/core/providers/factory.py
@@ -104,7 +104,6 @@ def _create_google(self, model: str, credentials: LLMCredentials, runtime_kwargs
# Extract google_search_enabled from runtime_kwargs
google_search_enabled = runtime_kwargs.pop("google_search_enabled", False)

# Create the base model
llm = ChatGoogleGenerativeAI(
model=model,
google_api_key=credentials["api_key"],
3 changes: 3 additions & 0 deletions service/app/models/message.py
@@ -16,6 +16,8 @@ class MessageBase(SQLModel):
role: str
content: str
topic_id: UUID = Field(index=True)
# Thinking/reasoning content from models like Claude, DeepSeek R1, Gemini 3
thinking_content: str | None = None


class Message(MessageBase, table=True):
@@ -63,3 +65,4 @@
class MessageUpdate(SQLModel):
role: str | None = None
content: str | None = None
thinking_content: str | None = None
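
With the new optional column on MessageBase, both Message rows and MessageUpdate payloads can carry the aggregated reasoning alongside the visible answer; for instance (values illustrative):

```python
# Illustrative update payload: attach aggregated reasoning to a stored message.
update = MessageUpdate(
    content="Option A wins because...",
    thinking_content="Compare the two options first...",
)
```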
1 change: 1 addition & 0 deletions service/app/repos/message.py
@@ -395,6 +395,7 @@ async def get_messages_with_files_and_citations(
created_at=message.created_at,
attachments=file_reads_with_urls,
citations=citations,
thinking_content=message.thinking_content,
)
messages_with_files_and_citations.append(message_with_files_and_citations)

49 changes: 49 additions & 0 deletions service/app/schemas/chat_event_types.py
@@ -150,6 +150,25 @@ class InsufficientBalanceData(TypedDict):
action_required: str


class ThinkingStartData(TypedDict):
"""Data payload for THINKING_START event."""

id: str


class ThinkingChunkData(TypedDict):
"""Data payload for THINKING_CHUNK event."""

id: str
content: str


class ThinkingEndData(TypedDict):
"""Data payload for THINKING_END event."""

id: str


# =============================================================================
# Full Event Structures (type + data)
# =============================================================================
@@ -253,6 +272,27 @@ class InsufficientBalanceEvent(TypedDict):
data: InsufficientBalanceData


class ThinkingStartEvent(TypedDict):
"""Full event structure for thinking start."""

type: Literal[ChatEventType.THINKING_START]
data: ThinkingStartData


class ThinkingChunkEvent(TypedDict):
"""Full event structure for thinking chunk."""

type: Literal[ChatEventType.THINKING_CHUNK]
data: ThinkingChunkData


class ThinkingEndEvent(TypedDict):
"""Full event structure for thinking end."""

type: Literal[ChatEventType.THINKING_END]
data: ThinkingEndData


# =============================================================================
# Union type for generic event handling
# =============================================================================
@@ -273,6 +313,9 @@
| MessageSavedEvent
| MessageEvent
| InsufficientBalanceEvent
| ThinkingStartEvent
| ThinkingChunkEvent
| ThinkingEndEvent
)


@@ -294,6 +337,9 @@
"MessageSavedData",
"MessageData",
"InsufficientBalanceData",
"ThinkingStartData",
"ThinkingChunkData",
"ThinkingEndData",
# Event types
"StreamingStartEvent",
"StreamingChunkEvent",
@@ -309,6 +355,9 @@
"MessageSavedEvent",
"MessageEvent",
"InsufficientBalanceEvent",
"ThinkingStartEvent",
"ThinkingChunkEvent",
"ThinkingEndEvent",
# Union
"StreamingEvent",
]
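
Since StreamingEvent is a union discriminated on the literal type field, consumers can branch on the new members without casts. A minimal dispatch sketch (the print targets are placeholders):

```python
def handle_event(event: StreamingEvent) -> None:
    # Type checkers narrow the TypedDict union via the literal `type` field,
    # so the new thinking members need no special casing beyond a branch.
    if event["type"] == ChatEventType.THINKING_START:
        print(f"[thinking started] stream={event['data']['id']}")
    elif event["type"] == ChatEventType.THINKING_CHUNK:
        print(f"[thinking] {event['data']['content']}")
    elif event["type"] == ChatEventType.THINKING_END:
        print(f"[thinking ended] stream={event['data']['id']}")
```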
5 changes: 5 additions & 0 deletions service/app/schemas/chat_events.py
@@ -47,6 +47,11 @@ class ChatEventType(StrEnum):
# Balance/billing events
INSUFFICIENT_BALANCE = "insufficient_balance"

# Thinking/reasoning content (for models like Claude, DeepSeek R1, OpenAI o1)
THINKING_START = "thinking_start"
THINKING_CHUNK = "thinking_chunk"
THINKING_END = "thinking_end"


class ChatClientEventType(StrEnum):
"""Client -> Server event types (messages coming from the frontend)."""