916 changes: 812 additions & 104 deletions AGENTS.md

Large diffs are not rendered by default.

41 changes: 0 additions & 41 deletions ROADMAP.md

This file was deleted.

30 changes: 0 additions & 30 deletions TODO.md

This file was deleted.

10 changes: 10 additions & 0 deletions service/app/configs/llm.py
@@ -80,6 +80,8 @@ class LLMConfig(BaseModel):
openai: LLMProviderConfig = Field(default_factory=LLMProviderConfig, description="OpenAI config")
google: LLMProviderConfig = Field(default_factory=LLMProviderConfig, description="Google GenAI config")
googlevertex: LLMProviderConfig = Field(default_factory=LLMProviderConfig, description="Google Vertex config")
gpugeek: LLMProviderConfig = Field(default_factory=LLMProviderConfig, description="GPUGeek config")
qwen: LLMProviderConfig = Field(default_factory=LLMProviderConfig, description="Qwen config")

# Legacy single-provider fields
provider: ProviderType | None = Field(default=None, description="(Legacy) Provider type")
@@ -111,6 +113,10 @@ def normalize(s: str) -> str:
return ProviderType.AZURE_OPENAI.value
if s == "googlevertex":
return ProviderType.GOOGLE_VERTEX.value
if s == "gpugeek":
return ProviderType.GPUGEEK.value
if s == "qwen":
return ProviderType.QWEN.value
return s

return [ProviderType(normalize(item)) for item in items]
@@ -140,6 +146,10 @@ def get_provider_config(self, provider: ProviderType) -> LLMProviderConfig:
return self.google
case ProviderType.GOOGLE_VERTEX:
return self.googlevertex
case ProviderType.GPUGEEK:
return self.gpugeek
case ProviderType.QWEN:
return self.qwen

def iter_enabled(self) -> list[tuple[ProviderType, LLMProviderConfig]]:
"""Return enabled provider configs.
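A minimal sketch (not part of this PR) of how the new `gpugeek` and `qwen` entries could be resolved through the methods touched above; the import path is an assumption, and the defaults come from the `default_factory=LLMProviderConfig` fields shown in the diff:

```python
# Hypothetical usage sketch; import locations are assumptions, not confirmed by this diff.
from app.configs.llm import LLMConfig, ProviderType

config = LLMConfig()  # every provider field starts from default_factory=LLMProviderConfig

# The extended match statement now routes the new enum members to their configs.
qwen_cfg = config.get_provider_config(ProviderType.QWEN)
gpugeek_cfg = config.get_provider_config(ProviderType.GPUGEEK)

# Legacy string values are normalized to enum values before lookup,
# mirroring the normalize() branches added above:
#   "gpugeek" -> ProviderType.GPUGEEK.value, "qwen" -> ProviderType.QWEN.value

# iter_enabled() returns only the (provider, config) pairs that are enabled,
# so an unconfigured GPUGeek or Qwen entry is simply skipped downstream.
for provider, provider_cfg in config.iter_enabled():
    print(provider, provider_cfg)
```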
31 changes: 29 additions & 2 deletions service/app/core/chat/langchain.py
@@ -26,6 +26,7 @@
GeneratedFileHandler,
StreamContext,
StreamingEventHandler,
ThinkingEventHandler,
TokenStreamProcessor,
ToolEventHandler,
)
@@ -301,7 +302,7 @@ async def _handle_updates_mode(data: Any, ctx: StreamContext) -> AsyncGenerator[


async def _handle_messages_mode(data: Any, ctx: StreamContext) -> AsyncGenerator[StreamingEvent, None]:
"""Handle 'messages' mode events (token streaming)."""
"""Handle 'messages' mode events (token streaming and thinking content)."""
if not isinstance(data, tuple):
return

@@ -331,7 +332,27 @@ async def _handle_messages_mode(data: Any, ctx: StreamContext) -> AsyncGenerator
if node and node not in ("model", "agent"):
return

# Extract and emit token
# Check for thinking content first (from reasoning models like Claude, DeepSeek R1, Gemini 3)
thinking_content = ThinkingEventHandler.extract_thinking_content(message_chunk)

if thinking_content:
# Start thinking if not already
if not ctx.is_thinking:
logger.debug("Emitting thinking_start for stream_id=%s", ctx.stream_id)
ctx.is_thinking = True
yield ThinkingEventHandler.create_thinking_start(ctx.stream_id)

ctx.thinking_buffer.append(thinking_content)
yield ThinkingEventHandler.create_thinking_chunk(ctx.stream_id, thinking_content)
return

# If we were thinking but now have regular content, end thinking first
if ctx.is_thinking:
logger.debug("Emitting thinking_end for stream_id=%s", ctx.stream_id)
ctx.is_thinking = False
yield ThinkingEventHandler.create_thinking_end(ctx.stream_id)

# Extract and emit token for regular streaming
token_text = TokenStreamProcessor.extract_token_text(message_chunk)
if not token_text:
return
@@ -347,6 +368,12 @@ async def _handle_messages_mode(data: Any, ctx: StreamContext) -> AsyncGenerator

async def _finalize_streaming(ctx: StreamContext) -> AsyncGenerator[StreamingEvent, None]:
"""Finalize the streaming session."""
# If still thinking when finalizing, emit thinking_end
if ctx.is_thinking:
logger.debug("Emitting thinking_end (in finalize) for stream_id=%s", ctx.stream_id)
ctx.is_thinking = False
yield ThinkingEventHandler.create_thinking_end(ctx.stream_id)

if ctx.is_streaming:
logger.debug(
"Emitting streaming_end for stream_id=%s (total tokens: %d)",
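To make the new event ordering concrete: thinking events are emitted before any regular tokens, and a `thinking_end` is guaranteed either when regular content starts or in `_finalize_streaming`. A hypothetical consumer sketch follows; the `stream_events` iterator and the `ChatEventType` import path are assumptions, only the event type names come from this diff:

```python
# Hypothetical consumer sketch; not part of this PR.
from app.schemas.chat import ChatEventType  # assumed import location


async def consume(stream_events):
    """Collect reasoning text and answer text from a chat event stream."""
    thinking_parts: list[str] = []
    answer_parts: list[str] = []
    async for event in stream_events:
        event_type = event["type"]
        if event_type == ChatEventType.THINKING_CHUNK:
            thinking_parts.append(event["data"]["content"])
        elif event_type == ChatEventType.THINKING_END:
            pass  # emitted before the first regular token, or during finalize
        elif event_type == ChatEventType.STREAMING_CHUNK:
            answer_parts.append(event["data"]["content"])
    return "".join(thinking_parts), "".join(answer_parts)
```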
111 changes: 101 additions & 10 deletions service/app/core/chat/stream_handlers.py
@@ -25,6 +25,9 @@
StreamingEndData,
StreamingEvent,
StreamingStartData,
ThinkingChunkData,
ThinkingEndData,
ThinkingStartData,
TokenUsageData,
ToolCallRequestData,
ToolCallResponseData,
@@ -55,6 +58,9 @@ class StreamContext:
total_input_tokens: int = 0
total_output_tokens: int = 0
total_tokens: int = 0
# Thinking/reasoning content state
is_thinking: bool = False
thinking_buffer: list[str] = field(default_factory=list)


class ToolEventHandler:
@@ -79,7 +85,7 @@ def create_tool_request_event(tool_call: dict[str, Any]) -> StreamingEvent:
"status": ToolCallStatus.EXECUTING,
"timestamp": asyncio.get_event_loop().time(),
}
return {"type": ChatEventType.TOOL_CALL_REQUEST, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.TOOL_CALL_REQUEST, "data": data}

@staticmethod
def create_tool_response_event(
@@ -101,7 +107,7 @@ def create_tool_response_event(
"status": status,
"result": result,
}
return {"type": ChatEventType.TOOL_CALL_RESPONSE, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.TOOL_CALL_RESPONSE, "data": data}


class StreamingEventHandler:
@@ -111,13 +117,13 @@ class StreamingEventHandler:
def create_streaming_start(stream_id: str) -> StreamingEvent:
"""Create streaming start event."""
data: StreamingStartData = {"id": stream_id}
return {"type": ChatEventType.STREAMING_START, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.STREAMING_START, "data": data}

@staticmethod
def create_streaming_chunk(stream_id: str, content: str) -> StreamingEvent:
"""Create streaming chunk event."""
data: StreamingChunkData = {"id": stream_id, "content": content}
return {"type": ChatEventType.STREAMING_CHUNK, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.STREAMING_CHUNK, "data": data}

@staticmethod
def create_streaming_end(stream_id: str) -> StreamingEvent:
@@ -126,7 +132,7 @@ def create_streaming_end(stream_id: str) -> StreamingEvent:
"id": stream_id,
"created_at": asyncio.get_event_loop().time(),
}
return {"type": ChatEventType.STREAMING_END, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.STREAMING_END, "data": data}

@staticmethod
def create_token_usage_event(input_tokens: int, output_tokens: int, total_tokens: int) -> StreamingEvent:
@@ -136,17 +142,102 @@ def create_token_usage_event(input_tokens: int, output_tokens: int, total_tokens
"output_tokens": output_tokens,
"total_tokens": total_tokens,
}
return {"type": ChatEventType.TOKEN_USAGE, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.TOKEN_USAGE, "data": data}

@staticmethod
def create_processing_event(status: str = ProcessingStatus.PREPARING_REQUEST) -> StreamingEvent:
"""Create processing status event."""
return {"type": ChatEventType.PROCESSING, "data": {"status": status}} # type: ignore[return-value]
return {"type": ChatEventType.PROCESSING, "data": {"status": status}}

@staticmethod
def create_error_event(error: str) -> StreamingEvent:
"""Create error event."""
return {"type": ChatEventType.ERROR, "data": {"error": error}} # type: ignore[return-value]
return {"type": ChatEventType.ERROR, "data": {"error": error}}


class ThinkingEventHandler:
"""Handle thinking/reasoning content streaming events."""

@staticmethod
def create_thinking_start(stream_id: str) -> StreamingEvent:
"""Create thinking start event."""
data: ThinkingStartData = {"id": stream_id}
return {"type": ChatEventType.THINKING_START, "data": data}

@staticmethod
def create_thinking_chunk(stream_id: str, content: str) -> StreamingEvent:
"""Create thinking chunk event."""
data: ThinkingChunkData = {"id": stream_id, "content": content}
return {"type": ChatEventType.THINKING_CHUNK, "data": data}

@staticmethod
def create_thinking_end(stream_id: str) -> StreamingEvent:
"""Create thinking end event."""
data: ThinkingEndData = {"id": stream_id}
return {"type": ChatEventType.THINKING_END, "data": data}

@staticmethod
def extract_thinking_content(message_chunk: Any) -> str | None:
"""
Extract thinking/reasoning content from message chunk.

Checks various provider-specific locations:
- Anthropic Claude: content blocks with type="thinking"
- DeepSeek R1: additional_kwargs.reasoning_content
- Gemini 3: content blocks with type="thought" or response_metadata.reasoning
- Generic: response_metadata.reasoning_content or thinking

Args:
message_chunk: Message chunk from LLM streaming

Returns:
Extracted thinking content or None
"""
# Check for DeepSeek/OpenAI style reasoning_content in additional_kwargs
if hasattr(message_chunk, "additional_kwargs"):
additional_kwargs = message_chunk.additional_kwargs
if isinstance(additional_kwargs, dict):
reasoning = additional_kwargs.get("reasoning_content")
if reasoning:
logger.debug("Found thinking in additional_kwargs.reasoning_content")
return reasoning

# Check for thinking/thought blocks in content (Anthropic, Gemini 3)
if hasattr(message_chunk, "content"):
content = message_chunk.content
if isinstance(content, list):
for block in content:
if isinstance(block, dict):
block_type = block.get("type", "")
# Anthropic Claude uses "thinking" type
if block_type == "thinking":
thinking_text = block.get("thinking", "")
if thinking_text:
logger.debug("Found thinking in content block type='thinking'")
return thinking_text
# Gemini 3 uses "thought" type
elif block_type == "thought":
thought_text = block.get("thought", "") or block.get("text", "")
if thought_text:
logger.debug("Found thinking in content block type='thought'")
return thought_text

# Check response_metadata for thinking content
if hasattr(message_chunk, "response_metadata"):
metadata = message_chunk.response_metadata
if isinstance(metadata, dict):
# Gemini 3 uses "reasoning" key
thinking = (
metadata.get("thinking")
or metadata.get("reasoning_content")
or metadata.get("reasoning")
or metadata.get("thoughts")
)
if thinking:
logger.debug("Found thinking in response_metadata: %s", list(metadata.keys()))
return thinking

return None


class CitationExtractor:
@@ -264,7 +355,7 @@ def _deduplicate_citations(citations: list[CitationData]) -> list[CitationData]:
def create_citations_event(citations: list[CitationData]) -> StreamingEvent:
"""Create search citations event."""
data: SearchCitationsData = {"citations": citations}
return {"type": ChatEventType.SEARCH_CITATIONS, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.SEARCH_CITATIONS, "data": data}


class GeneratedFileHandler:
@@ -397,7 +488,7 @@ async def process_generated_content(
def create_generated_files_event(files: list[GeneratedFileInfo]) -> StreamingEvent:
"""Create generated files event."""
data: GeneratedFilesData = {"files": files}
return {"type": ChatEventType.GENERATED_FILES, "data": data} # type: ignore[return-value]
return {"type": ChatEventType.GENERATED_FILES, "data": data}


class TokenStreamProcessor:
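A quick sketch of the three chunk shapes that `extract_thinking_content` checks for, using a stand-in dataclass rather than a real LangChain chunk; the import path for `ThinkingEventHandler` is an assumption:

```python
# Hypothetical test sketch; MockChunk is a stand-in, not a real LangChain type.
from dataclasses import dataclass, field
from typing import Any

from app.core.chat.stream_handlers import ThinkingEventHandler  # assumed import location


@dataclass
class MockChunk:
    content: Any = ""
    additional_kwargs: dict[str, Any] = field(default_factory=dict)
    response_metadata: dict[str, Any] = field(default_factory=dict)


# DeepSeek/OpenAI style: reasoning_content in additional_kwargs
deepseek = MockChunk(additional_kwargs={"reasoning_content": "step 1: ..."})
# Anthropic style: a content block with type="thinking"
claude = MockChunk(content=[{"type": "thinking", "thinking": "considering..."}])
# Gemini style: a "reasoning" key in response_metadata
gemini = MockChunk(response_metadata={"reasoning": "weighing options..."})

for chunk in (deepseek, claude, gemini):
    print(ThinkingEventHandler.extract_thinking_content(chunk))
```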
4 changes: 4 additions & 0 deletions service/app/core/chat/topic_generator.py
@@ -29,6 +29,10 @@ def _select_title_generation_model(
return "gemini-2.5-flash"
if provider_type == ProviderType.AZURE_OPENAI:
return "gpt-5-mini"
if provider_type == ProviderType.GPUGEEK:
return "Vendor2/Gemini-2.5-Flash"
if provider_type == ProviderType.QWEN:
return "qwen3-0.6b"
return session_model or default_model


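For completeness, the intent of the mapping above in sketch form; the full signature of `_select_title_generation_model` is not visible in this diff, so the keyword arguments here are assumptions:

```python
# Hypothetical calls; argument names beyond provider_type are assumptions.
_select_title_generation_model(ProviderType.GPUGEEK, session_model=None, default_model="gpt-5-mini")
# -> "Vendor2/Gemini-2.5-Flash"
_select_title_generation_model(ProviderType.QWEN, session_model=None, default_model="gpt-5-mini")
# -> "qwen3-0.6b"
```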