
fix: fix the first time calling knowledge tool error #194

Merged

Mile-Away merged 1 commit into test from fix/first-time-knowledge-tool-call-error on Jan 22, 2026

Conversation

@xinquiry (Collaborator) commented on Jan 22, 2026

Changes

  • New feature
  • Bug fix
  • Enhancement / refactor
  • Other (please describe)

Briefly describe the main changes in this PR.

Related Issue

Link the related issue, if any: #issue-number

Checklist

Checked by default; if an item does not hold, please review it.

  • Tested and passing locally
  • Related documentation added/updated
  • Test cases added
  • Code style checked by the pre-commit hooks

Additional Notes

Add any special notes or caveats here.


Summary by Sourcery

Introduce a dedicated task database session context for knowledge tool operations and streamline check-in status/points handling.

New Features:

  • Add a task-specific async database session context manager for Celery worker and tool execution contexts.

Bug Fixes:

  • Ensure knowledge tool file operations use a task-safe database session to avoid first-time execution errors.

Enhancements:

  • Simplify check-in points calculation and add a typed dictionary for check-in status responses.
  • Tighten type annotations in literature client tests for author metadata.

Tests:

  • Adjust literature client tests to use more precise typing for author structures.
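
Concretely, the new context manager is meant to be used like this (a minimal usage sketch based on the helper's docstring; the import path and the query are assumptions, not code from the PR):

```python
from sqlalchemy import text

# Import path assumed from the PR's file layout (service/app/infra/database/).
from app.infra.database import get_task_db_session


async def count_knowledge_files() -> int:
    # Acquire a task-safe session; engines are cached per worker process.
    async with get_task_db_session() as db:
        result = await db.execute(text("SELECT COUNT(*) FROM files"))  # illustrative query
        return int(result.scalar_one())
```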

sourcery-ai bot (Contributor) commented on Jan 22, 2026


Reviewer's Guide

Introduces a dedicated async DB session context for task/worker contexts and uses it in knowledge tool operations, refactors check-in points calculation and typing, and adds a small typing improvement in a literature test.

Sequence diagram for knowledge tool operations using get_task_db_session

```mermaid
sequenceDiagram
    actor Worker
    participant KnowledgeToolOperations
    participant DatabaseConnection
    participant AsyncSession
    participant FileRepository

    Worker->>KnowledgeToolOperations: list_files(user_id, knowledge_set_id)
    activate KnowledgeToolOperations

    KnowledgeToolOperations->>DatabaseConnection: get_task_db_session()
    activate DatabaseConnection
    DatabaseConnection->>DatabaseConnection: pid = os.getpid()
    alt engine_not_cached_for_pid
        DatabaseConnection->>DatabaseConnection: create_async_engine(ASYNC_DATABASE_URL)
        DatabaseConnection->>DatabaseConnection: async_sessionmaker(bind=task_engine)
        DatabaseConnection->>DatabaseConnection: _worker_engines[pid] = sessionmaker
    end
    DatabaseConnection-->>KnowledgeToolOperations: AsyncSession context manager
    deactivate DatabaseConnection

    KnowledgeToolOperations->>AsyncSession: open session (async with)
    activate AsyncSession
    KnowledgeToolOperations->>FileRepository: FileRepository(db_session)
    activate FileRepository
    FileRepository-->>KnowledgeToolOperations: files
    deactivate FileRepository

    AsyncSession-->>KnowledgeToolOperations: close session (context exit)
    deactivate AsyncSession

    KnowledgeToolOperations-->>Worker: result dict
    deactivate KnowledgeToolOperations
```

Class diagram for CheckInService and CheckInStatus

```mermaid
classDiagram
    class CheckInStatus {
        bool checked_in_today
        int consecutive_days
        int next_points
        int total_check_ins
    }

    class CheckInService {
        +calculate_points(consecutive_days int) int
        +check_in(user_id str) tuple
        +get_check_in_status(user_id str) CheckInStatus
    }
```

File-Level Changes

Change Details Files
Add a reusable async DB session context manager for Celery/worker contexts and expose it via the database package.
  • Introduce a per-process worker engine cache keyed by PID to avoid cross-process async engine issues.
  • Implement get_task_db_session async context manager that lazily creates and reuses async_sessionmaker instances per process.
  • Export get_task_db_session from the infra.database package and include it in `__all__`.
service/app/infra/database/connection.py
service/app/infra/database/__init__.py
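
Pieced together from the code excerpts quoted later in this review, the helper looks roughly like the sketch below; the connection URL constant and the exact sessionmaker options are assumptions:

```python
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

ASYNC_DATABASE_URL = "postgresql+asyncpg://app:app@localhost/app"  # placeholder

# One sessionmaker per worker process, keyed by PID, so forked Celery
# workers never reuse an async engine created in a parent process.
_worker_engines: dict[int, async_sessionmaker[AsyncSession]] = {}


@asynccontextmanager
async def get_task_db_session() -> AsyncGenerator[AsyncSession, None]:
    pid = os.getpid()
    if pid not in _worker_engines:
        task_engine = create_async_engine(ASYNC_DATABASE_URL, echo=False, future=True)
        _worker_engines[pid] = async_sessionmaker(
            bind=task_engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )
    async with _worker_engines[pid]() as session:
        yield session
```
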
Use the new task DB session in knowledge tool file operations.
  • Replace direct AsyncSessionLocal usage with get_task_db_session in list_files, read_file, write_file, and search_files operations to ensure safe DB usage in tool execution contexts.
service/app/tools/builtin/knowledge/operations.py
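
Per the sequence diagram above, the operations then take this shape (a hedged sketch; the repository import path, method name, and return payload are hypothetical):

```python
from app.infra.database import get_task_db_session  # import path assumed
from app.repositories.files import FileRepository  # import path hypothetical


async def list_files(user_id: str, knowledge_set_id: str) -> dict:
    # Previously this opened AsyncSessionLocal directly, which failed on the
    # first tool call from a worker process; get_task_db_session is task-safe.
    async with get_task_db_session() as db_session:
        repo = FileRepository(db_session)
        files = await repo.list_for_knowledge_set(user_id, knowledge_set_id)  # hypothetical method
    return {"files": files}
```
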
Improve check-in service typing and simplify point calculation logic.
  • Introduce a CheckInStatus TypedDict and use it as the return type for get_check_in_status for stricter typing.
  • Refactor calculate_points to a concise formula using clamped consecutive_days instead of multiple conditionals.
  • Remove a commented-out, message-specific variant of the ALREADY_CHECKED_IN_TODAY error.
service/app/core/checkin.py
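
The class diagram above fixes the shape of CheckInStatus and calculate_points; the diff itself does not show the point values, so the constants in this sketch are placeholders:

```python
from typing import TypedDict


class CheckInStatus(TypedDict):
    checked_in_today: bool
    consecutive_days: int
    next_points: int
    total_check_ins: int


BASE_POINTS = 1        # placeholder value
MAX_STREAK_DAYS = 7    # placeholder clamp


def calculate_points(consecutive_days: int) -> int:
    # One clamped formula instead of a chain of conditionals.
    return BASE_POINTS * max(1, min(consecutive_days, MAX_STREAK_DAYS))
```
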
Tighten typing in literature base client tests.
  • Annotate the authors list with a precise list[dict[str, str | None]] type in the complete literature work test.
service/tests/unit/test_literature/test_base_client.py
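
In the test this amounts to an annotation like the following (the author entries are illustrative):

```python
# Author dicts may carry None for optional fields such as an ORCID.
authors: list[dict[str, str | None]] = [
    {"name": "Ada Lovelace", "orcid": None},
    {"name": "Alan Turing", "orcid": "0000-0002-1825-0097"},  # illustrative ORCID
]
```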

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

sourcery-ai bot (Contributor) left a comment


Hey - I've found 3 issues and left some high-level feedback:

  • The global _worker_engines dict keyed by PID in get_task_db_session is not concurrency-safe, so concurrent first-time calls in the same process could create multiple engines and leave some undisposed; consider protecting initialization (e.g., with a lock) or using setdefault plus a single engine creation path.
  • In get_task_db_session, engines stored in _worker_engines are never disposed, which can lead to resource leakage over the lifetime of long-running workers; you may want a cleanup hook (e.g., on worker shutdown) or an explicit disposal path.
  • The large block of commented-out code in get_task_db_session makes the function harder to read; consider removing it or moving the alternative implementation details into a docstring or design note.
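
A lock-guarded initialization along the lines of the first bullet could look like this sketch (it assumes a single event loop per worker, and reuses `_worker_engines` and `ASYNC_DATABASE_URL` from the module under review; the helper name is hypothetical):

```python
import asyncio
import os

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# An asyncio.Lock only serializes callers on one event loop, which matches
# the one-loop-per-worker assumption above.
_engine_init_lock = asyncio.Lock()


async def _get_sessionmaker() -> async_sessionmaker[AsyncSession]:
    pid = os.getpid()
    if pid not in _worker_engines:
        async with _engine_init_lock:
            # Double-check under the lock so concurrent first calls create
            # exactly one engine for this process.
            if pid not in _worker_engines:
                engine = create_async_engine(ASYNC_DATABASE_URL, echo=False, future=True)
                _worker_engines[pid] = async_sessionmaker(
                    bind=engine, class_=AsyncSession, expire_on_commit=False
                )
    return _worker_engines[pid]
```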

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +150 to +158

```python
    """
    Get a database session suitable for Celery Worker / tool execution contexts.

    Each call creates a new engine bound to the current event loop to avoid cross-process issues.

    Usage:
        async with get_task_db_session() as db:
            result = await db.execute(...)
    """
```

suggestion: The docstring no longer matches the implementation now that engines are reused per process.

Since the implementation now caches engines per worker process via _worker_engines, the docstring should describe that behavior (e.g., engines are cached per PID and sessions are created from those engines) instead of saying each call creates a new engine.

Suggested change:

```diff
     """
     Get a database session suitable for Celery Worker / tool execution contexts.

-    Each call creates a new engine bound to the current event loop to avoid cross-process issues.
+    Engines are cached per worker process (PID) via `_worker_engines` to avoid cross-process
+    issues, and each call creates a new `AsyncSession` from the process-local sessionmaker.

     Usage:
         async with get_task_db_session() as db:
             result = await db.execute(...)
     """
```

Comment on lines +175 to +177

```python
    if pid not in _worker_engines:
        task_engine = create_async_engine(ASYNC_DATABASE_URL, echo=False, future=True)
        _worker_engines[pid] = async_sessionmaker(
```

suggestion: Engine lifecycle for worker sessions is implicit; consider making disposal/shutdown behavior explicit.

Because _worker_engines keeps one engine per PID for the process lifetime, this is fine for long‑lived workers but couples behavior to that assumption. If you have (or plan) a worker shutdown hook, consider wiring an explicit dispose() there, or wrapping engine creation/cleanup in a helper so it’s easy to adapt if workers become short‑lived or need manual teardown.
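
For instance, a worker-shutdown hook could dispose the cached engine explicitly (a sketch only; it assumes the raw engine is also cached per PID alongside the sessionmaker, which the current code does not do):

```python
import asyncio
import os

from celery.signals import worker_process_shutdown
from sqlalchemy.ext.asyncio import AsyncEngine

# Hypothetical companion cache holding the raw engines per PID.
_worker_raw_engines: dict[int, AsyncEngine] = {}


@worker_process_shutdown.connect
def _dispose_task_engine(**kwargs):
    _worker_engines.pop(os.getpid(), None)
    engine = _worker_raw_engines.pop(os.getpid(), None)
    if engine is not None:
        # AsyncEngine.dispose() is a coroutine; run it to completion here.
        asyncio.run(engine.dispose())
```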

Comment on service/app/infra/database/connection.py line 145:

```python
        yield session


_worker_engines: dict[int, async_sessionmaker[AsyncSession]] = {}
```

issue (complexity): Consider replacing the PID→sessionmaker dict and commented-out code with a lazily-initialized per-process sessionmaker helper to simplify get_task_db_session while preserving per-process safety.

You can keep the per-process safety without the PID→sessionmaker dict or large commented block by collapsing this into a single lazily-initialized sessionmaker that is refreshed when the PID changes.

1. Replace _worker_engines with per-process singleton

```python
_task_sessionmaker: async_sessionmaker[AsyncSession] | None = None
_task_session_pid: int | None = None


def _get_task_sessionmaker() -> async_sessionmaker[AsyncSession]:
    global _task_sessionmaker, _task_session_pid

    pid = os.getpid()
    if _task_sessionmaker is None or _task_session_pid != pid:
        # New process (or first call): create a fresh engine + sessionmaker
        task_engine = create_async_engine(ASYNC_DATABASE_URL, echo=False, future=True)
        _task_sessionmaker = async_sessionmaker(
            bind=task_engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )
        _task_session_pid = pid

    return _task_sessionmaker
```

Then simplify the contextmanager:

```python
@asynccontextmanager
async def get_task_db_session() -> AsyncGenerator[AsyncSession, None]:
    SessionLocal = _get_task_sessionmaker()
    async with SessionLocal() as session:
        yield session
```

This preserves:

  • Per-process isolation (PID change ⇒ new engine/sessionmaker)
  • Engine reuse within a worker process

while avoiding the dict keyed by PID and extra indirection.

2. Remove the large commented-out alternative

Instead of the full commented version, keep at most a short rationale:

```python
# Note: we create the engine lazily per-process (based on PID) to avoid
# sharing engines across forked workers.
```

All prior versions are still visible in git history, so the full commented implementation isn’t needed here.

Mile-Away merged commit fc4f6f4 into test on Jan 22, 2026 (2 checks passed)
Mile-Away deleted the fix/first-time-knowledge-tool-call-error branch on January 22, 2026 at 12:19
Mile-Away added a commit that referenced this pull request Jan 22, 2026
* Feature/literature mcp (#192)

* feat: literature-MCP complete functionality

* refactor: improve boolean parsing and logging in literature search functions

* feat: enhance literature search functionality with improved query validation and detailed results formatting

* refactor: rename oa_url to access_url in LiteratureWork model and related tests

* feat: remove test-build workflow and update README for development setup

* feat: tool cost system and PPTX image handling fixes (#193)

* fix: prompt, factory

* feat: enhanced ppt generation with image slides mode

- Add image_slides mode for PPTX with full-bleed AI-generated images
- Add ImageBlock.image_id field for referencing generated images
- Add ImageSlideSpec for image-only slides
- Add ImageFetcher service for fetching images from various sources
- Reorganize knowledge module from single file to module structure
- Move document utilities from app/mcp/ to app/tools/utils/documents/
- Resolve image_ids to storage URLs in async layer (operations.py)
- Fix type errors and move tests to proper location

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: implement the tool cost

---------

Co-authored-by: Claude <noreply@anthropic.com>

* fix: fix the first time calling knowledge tool error (#194)

* fix: fix the wrong cache for second call of agent tools (#195)

* feat: several improvements (#196)

* fix: jump to latest topic when click agent

* feat: allow more than one image for generate image

* feat: allow user directly edit mcp in the chat-toolbar

* feat: improve the frontend perf

* feat: multiple UI improvements and fixes (#198)

* fix: jump to latest topic when click agent

* feat: allow more than one image for generate image

* feat: allow user directly edit mcp in the chat-toolbar

* feat: improve the frontend perf

* fix: restore previous active topic when clicking agent

Instead of always jumping to the latest topic, now tracks and restores
the previously active topic for each agent when switching between them.

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add context menu to FocusedView agents and download button to lightbox

- Add right-click context menu (edit/delete) to compact AgentListItem variant
- Render context menu via portal to escape overflow:hidden containers
- Add edit/delete handlers to FocusedView with AgentSettingsModal and ConfirmationModal
- Add download button to image lightbox with smart filename detection

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add web_fetch tool bundled with web_search

- Add web_fetch tool using Trafilatura for content extraction
- Bundle web_fetch with web_search in frontend toolConfig
- Group WEB_SEARCH_TOOLS for unified toggle behavior
- Only load web_fetch when web_search is available (SearXNG enabled)
- Update tool capabilities mapping for web_fetch

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

---------

Co-authored-by: Meng Junxing <junxingmeng@gmail.com>
Co-authored-by: Harvey <q-query@outlook.com>
Co-authored-by: Claude <noreply@anthropic.com>
Mile-Away added a commit that referenced this pull request Jan 25, 2026
* Feature/literature mcp (#192)

* feat: literature-MCP complete functionality

* refactor: improve boolean parsing and logging in literature search functions

* feat: enhance literature search functionality with improved query validation and detailed results formatting

* refactor: rename oa_url to access_url in LiteratureWork model and related tests

* feat: remove test-build workflow and update README for development setup

* feat: tool cost system and PPTX image handling fixes (#193)

* fix: prompt, factory

* feat: enhanced ppt generation with image slides mode

- Add image_slides mode for PPTX with full-bleed AI-generated images
- Add ImageBlock.image_id field for referencing generated images
- Add ImageSlideSpec for image-only slides
- Add ImageFetcher service for fetching images from various sources
- Reorganize knowledge module from single file to module structure
- Move document utilities from app/mcp/ to app/tools/utils/documents/
- Resolve image_ids to storage URLs in async layer (operations.py)
- Fix type errors and move tests to proper location

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: implement the tool cost

---------

Co-authored-by: Claude <noreply@anthropic.com>

* fix: fix the first time calling knowledge tool error (#194)

* fix: fix the wrong cache for second call of agent tools (#195)

* feat: several improvements (#196)

* fix: jump to latest topic when click agent

* feat: allow more than one image for generate image

* feat: allow user directly edit mcp in the chat-toolbar

* feat: improve the frontend perf

* feat: multiple UI improvements and fixes (#198)

* fix: jump to latest topic when click agent

* feat: allow more than one image for generate image

* feat: allow user directly edit mcp in the chat-toolbar

* feat: improve the frontend perf

* fix: restore previous active topic when clicking agent

Instead of always jumping to the latest topic, now tracks and restores
the previously active topic for each agent when switching between them.

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add context menu to FocusedView agents and download button to lightbox

- Add right-click context menu (edit/delete) to compact AgentListItem variant
- Render context menu via portal to escape overflow:hidden containers
- Add edit/delete handlers to FocusedView with AgentSettingsModal and ConfirmationModal
- Add download button to image lightbox with smart filename detection

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add web_fetch tool bundled with web_search

- Add web_fetch tool using Trafilatura for content extraction
- Bundle web_fetch with web_search in frontend toolConfig
- Group WEB_SEARCH_TOOLS for unified toggle behavior
- Only load web_fetch when web_search is available (SearXNG enabled)
- Update tool capabilities mapping for web_fetch

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

* feat: fix the fork issue and implement the locked fork

---------

Co-authored-by: Meng Junxing <junxingmeng@gmail.com>
Co-authored-by: Harvey <q-query@outlook.com>
Co-authored-by: Claude <noreply@anthropic.com>
Mile-Away added a commit that referenced this pull request Jan 26, 2026
* Feature/better agent community (#200)

* Feature/literature mcp (#192)

* feat: literature-MCP complete functionality

* refactor: improve boolean parsing and logging in literature search functions

* feat: enhance literature search functionality with improved query validation and detailed results formatting

* refactor: rename oa_url to access_url in LiteratureWork model and related tests

* feat: remove test-build workflow and update README for development setup

* feat: tool cost system and PPTX image handling fixes (#193)

* fix: prompt, factory

* feat: enhanced ppt generation with image slides mode

- Add image_slides mode for PPTX with full-bleed AI-generated images
- Add ImageBlock.image_id field for referencing generated images
- Add ImageSlideSpec for image-only slides
- Add ImageFetcher service for fetching images from various sources
- Reorganize knowledge module from single file to module structure
- Move document utilities from app/mcp/ to app/tools/utils/documents/
- Resolve image_ids to storage URLs in async layer (operations.py)
- Fix type errors and move tests to proper location

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: implement the tool cost

---------

Co-authored-by: Claude <noreply@anthropic.com>

* fix: fix the first time calling knowledge tool error (#194)

* fix: fix the wrong cache for second call of agent tools (#195)

* feat: several improvements (#196)

* fix: jump to latest topic when click agent

* feat: allow more than one image for generate image

* feat: allow user directly edit mcp in the chat-toolbar

* feat: improve the frontend perf

* feat: multiple UI improvements and fixes (#198)

* fix: jump to latest topic when click agent

* feat: allow more than one image for generate image

* feat: allow user directly edit mcp in the chat-toolbar

* feat: improve the frontend perf

* fix: restore previous active topic when clicking agent

Instead of always jumping to the latest topic, now tracks and restores
the previously active topic for each agent when switching between them.

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add context menu to FocusedView agents and download button to lightbox

- Add right-click context menu (edit/delete) to compact AgentListItem variant
- Render context menu via portal to escape overflow:hidden containers
- Add edit/delete handlers to FocusedView with AgentSettingsModal and ConfirmationModal
- Add download button to image lightbox with smart filename detection

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add web_fetch tool bundled with web_search

- Add web_fetch tool using Trafilatura for content extraction
- Bundle web_fetch with web_search in frontend toolConfig
- Group WEB_SEARCH_TOOLS for unified toggle behavior
- Only load web_fetch when web_search is available (SearXNG enabled)
- Update tool capabilities mapping for web_fetch

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

* feat: fix the fork issue and implement the locked fork

---------

Co-authored-by: Meng Junxing <junxingmeng@gmail.com>
Co-authored-by: Harvey <q-query@outlook.com>
Co-authored-by: Claude <noreply@anthropic.com>

* fix: prevent forked agents from being republished to marketplace (#201)

* fix: prevent forked agents from being republished to marketplace

Forked agents were able to be republished, which could expose the original
agent's configuration. This fix adds validation at both API and UI levels:

- Backend: Add validation in publish endpoint to reject agents with
  original_source_id set (HTTP 400)
- Frontend: Hide publish button for forked agents in AgentSettingsModal
  and WorkflowEditor
- Types: Add original_source_id and source_version fields to Agent interface

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: address code review feedback for fork detection

- Extract `isForked` helper variable to avoid duplication
- Use explicit nullish check (`!= null`) to match backend `is not None` semantic
- Replace implicit empty div spacer with dynamic justify-* class in WorkflowEditor

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add justfile for better command

* feat: improve AGENTS.md and fix backend fix

---------

Co-authored-by: Claude <noreply@anthropic.com>

---------

Co-authored-by: xinquiry(SII) <100398322+xinquiry@users.noreply.github.com>
Co-authored-by: Meng Junxing <junxingmeng@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>