diff --git a/service/app/agents/mcp_tools.py b/service/app/agents/mcp_tools.py index 8c9f4b05..6d053086 100644 --- a/service/app/agents/mcp_tools.py +++ b/service/app/agents/mcp_tools.py @@ -164,60 +164,20 @@ async def execute_tool_call( f"Checking tool '{tool_name}' for injection. Schema props: {list(input_schema.get('properties', {}).keys())}" ) - schema_props = input_schema.get("properties", {}) - - # Inject knowledge_set_id when tool schema supports it. - if "knowledge_set_id" in schema_props: - logger.info(f"Tool '{tool_name}' supports knowledge_set_id injection.") - effective_ks_id = None - # Prefer session-level override if available. - if session_id: - try: - from app.repos.session import SessionRepository - - session_repo = SessionRepository(db) - session_obj = await session_repo.get_session_by_id(session_id) - if session_obj and getattr(session_obj, "knowledge_set_id", None): - effective_ks_id = session_obj.knowledge_set_id - logger.info(f"Using session knowledge_set_id override: {effective_ks_id}") - except Exception as e: - logger.warning(f"Failed to load session knowledge_set_id override: {e}") - - if effective_ks_id is None and agent: - effective_ks_id = getattr(agent, "knowledge_set_id", None) - logger.info(f"Agent Knowledge Set ID: {effective_ks_id}") - - if effective_ks_id: - args_dict["knowledge_set_id"] = str(effective_ks_id) - logger.info(f"Injected knowledge_set_id: {effective_ks_id} into args") + if "knowledge_set_id" in input_schema.get("properties", {}): + logger.info(f"Tool '{tool_name}' requires knowledge_set_id.") + if agent: + ks_id = getattr(agent, "knowledge_set_id", None) + logger.info(f"Agent found. Knowledge Set ID: {ks_id}") + if ks_id: + args_dict["knowledge_set_id"] = str(ks_id) + logger.info(f"Injected knowledge_set_id: {ks_id} into args") + else: + logger.warning(f"Agent {agent.id} has NO knowledge_set_id bound!") else: - logger.warning("No knowledge_set_id available for injection") - - # Inject user_id when tool schema supports it. - if "user_id" in schema_props: - logger.info(f"Tool '{tool_name}' supports user_id injection.") - effective_user_id = None - if session_id: - try: - from app.repos.session import SessionRepository - - session_repo = SessionRepository(db) - session_obj = await session_repo.get_session_by_id(session_id) - if session_obj and getattr(session_obj, "user_id", None): - effective_user_id = session_obj.user_id - logger.info(f"Using session user_id: {effective_user_id}") - except Exception as e: - logger.warning(f"Failed to load session user_id: {e}") - - if effective_user_id is None and agent: - effective_user_id = getattr(agent, "user_id", None) - logger.info(f"Agent user_id: {effective_user_id}") - - if effective_user_id: - args_dict["user_id"] = str(effective_user_id) - logger.info("Injected user_id into args") - else: - logger.warning("No user_id available for injection") + logger.warning("No agent context available for injection!") + else: + logger.info(f"Tool '{tool_name}' does NOT require knowledge_set_id.") try: result = await call_mcp_tool(refreshed_server, tool_name, args_dict) diff --git a/service/app/mcp/literature.py b/service/app/mcp/literature.py deleted file mode 100644 index df854cfd..00000000 --- a/service/app/mcp/literature.py +++ /dev/null @@ -1,885 +0,0 @@ -"""MCP Server for Literature / Bibliography fetching. - -This module is intentionally minimal at first: it provides tool entry points and -will be progressively wired to providers (OpenAlex first) under -`app.utils.literature`. 
-""" - -from __future__ import annotations - -import json -import logging -from collections.abc import Iterable, Mapping -from typing import Any, cast -from uuid import UUID - - -from fastmcp import FastMCP -from fastmcp.server.auth import JWTVerifier, TokenVerifier -from fastmcp.server.dependencies import get_access_token - -from app.utils.literature.aggregator import fetch_works as agg_fetch_works -from app.utils.literature.models import LiteratureQuery -from app.middleware.auth import AuthProvider -from app.middleware.auth.token_verifier.bohr_app_token_verifier import BohrAppTokenVerifier -from app.utils.literature.exporter import ( - derive_xlsx_filename, - build_xlsx_bytes, - persist_xlsx, -) -from app.utils.literature.providers.openalex import ( - search_sources as openalex_search_sources, - search_authors as openalex_search_authors, -) - - -def _coerce_int(value: Any) -> int | None: - if value is None: - return None - if isinstance(value, bool): - return None - if isinstance(value, int): - return value - if isinstance(value, float) and value.is_integer(): - return int(value) - if isinstance(value, str): - s = value.strip() - if s.isdigit() or (s.startswith("-") and s[1:].isdigit()): - try: - return int(s) - except Exception: - return None - return None - - -def _coerce_str_list(value: Any) -> list[str] | None: - if value is None: - return None - - def normalize_one(s: str) -> list[str]: - ss = s.strip() - if not ss: - return [] - # Unwrap JSON stringified list: '["openalex"]' - if ss.startswith("[") and ss.endswith("]"): - try: - obj = json.loads(ss) - except Exception: - obj = None - if isinstance(obj, list): - out2: list[str] = [] - for item in obj: - if isinstance(item, str) and item.strip(): - out2.append(item.strip()) - return out2 - # Accept both comma-separated and single key. - parts = [p.strip() for p in ss.split(",")] - return [p for p in parts if p] - - if isinstance(value, list): - out: list[str] = [] - for v in value: - if isinstance(v, str): - out.extend(normalize_one(v)) - return out or None - if isinstance(value, str): - out = normalize_one(value) - return out or None - return None - - -def _coerce_dict(value: Any) -> dict[str, Any] | None: - if value is None: - return None - if isinstance(value, dict): - return value - if isinstance(value, str): - s = value.strip() - if not s: - return None - # Allow JSON dict passed as a string. - try: - obj = json.loads(s) - return obj if isinstance(obj, dict) else None - except Exception: - return None - return None - - -def _coerce_bool(value: Any) -> bool | None: - if value is None: - return None - if isinstance(value, bool): - return value - if isinstance(value, int) and value in (0, 1): - return bool(value) - if isinstance(value, str): - s = value.strip().lower() - if s in {"true", "1", "yes", "y", "on"}: - return True - if s in {"false", "0", "no", "n", "off"}: - return False - return None - - -def _build_retry_response( - *, - message: str, - suggested_args: dict[str, Any], - warnings: list[str], - extra_meta: dict[str, Any] | None = None, - errors: list[dict[str, Any]] | None = None, - stats: dict[str, Any] | None = None, -) -> dict[str, Any]: - # Shape matches LiteratureResult-like envelope (extra fields allowed). - meta: dict[str, Any] = { - "needs_retry": True, - "warnings": warnings, - "suggested_call": {"tool": "fetch_works", "args": suggested_args}, - } - if extra_meta: - meta.update(extra_meta) - - # Strong guidance for LLM tool-calling loops. - # - On `needs_retry`, only retry once with the suggested args. 
- # - On success responses, the tool will explicitly say not to re-call. - call_attempt = meta.get("call_attempt") - max_call_attempts = meta.get("max_call_attempts") - try: - attempt_i = int(call_attempt) if call_attempt is not None else None - except Exception: - attempt_i = None - try: - max_i = int(max_call_attempts) if max_call_attempts is not None else None - except Exception: - max_i = None - max_note = f" (max {max_i})" if max_i else "" - summary_note = ( - "本次请求未成功完成/需要重试:请只按 meta.suggested_call 进行下一次调用(不要反复尝试不同参数)。" - "如果仍失败,可继续重试直到成功,但请限制次数;只有用户明确提出再次检索/换条件检索时才进行新的查询。" - + (f" 当前尝试次数: {attempt_i}{max_note}." if attempt_i else "") - + " (EN: Retry needed. Re-call ONLY once using meta.suggested_call; avoid repeated/variant calls. If still failing, retry up to the limit; only start a new search if the user explicitly requests it.)" - ) - - return { - "success": False, - "results": [], - "errors": errors - if errors is not None - else [ - { - "provider": "mcp", - "message": message, - "error_code": "invalid_params", - "retryable": True, - } - ], - "stats": stats or {}, - "meta": meta, - "summary": {"note": summary_note}, - } - - -def _extract_knowledge_set_id(extra: dict[str, Any] | None, claims: dict[str, Any]) -> UUID | None: - candidates: list[Any] = [] - if extra: - candidates.extend( - [ - extra.get("knowledge_set_id"), - extra.get("knowledgeSetId"), - extra.get("knowledge_setId"), - ] - ) - candidates.extend( - [ - claims.get("knowledge_set_id"), - claims.get("knowledgeSetId"), - claims.get("knowledge_setId"), - ] - ) - for c in candidates: - if not c: - continue - try: - return UUID(str(c)) - except Exception: - continue - return None - - -def _coerce_uuid(value: Any) -> UUID | None: - if value is None: - return None - try: - return UUID(str(value)) - except Exception: - return None - - -def _coerce_user_id(value: Any) -> str | None: - if value is None: - return None - if isinstance(value, str): - s = value.strip() - return s or None - # Some callers may pass UUID-like values. - try: - s2 = str(value).strip() - return s2 or None - except Exception: - return None - - -logger = logging.getLogger(__name__) - - -# NOTE: OpenAlex politely requests a contact email via the `mailto` query param. -# This default must NOT be exposed to the LLM/tool schema, so it is applied only -# when the caller does not provide one. -_DEFAULT_OPENALEX_MAILTO = "mengjunxing@shu.edu.cn" - - -__mcp_metadata__ = { - "source": "official", - "description": "Fetch and clean literature metadata from multiple sources (OpenAlex first)", - "banner": None, -} - - -literature_mcp: FastMCP = FastMCP(name="Literature 📚", version="0.1.0") - - -def _build_preview_results( - results: Iterable[Mapping[str, Any]] | None, - *, - max_items: int = 5, - strip_fields: set[str] | None = None, -) -> list[dict[str, Any]]: - """Build a small, display-friendly preview list for LLM responses. - - We intentionally strip large/debug fields (e.g. `raw`, `referenced_works`) to - avoid context explosion in multi-call tool loops. 
- """ - if max_items <= 0: - return [] - if not results: - return [] - strip_fields = strip_fields or {"raw", "referenced_works"} - preview: list[dict[str, Any]] = [] - for idx, w in enumerate(results): - if idx >= max_items: - break - item = {k: v for k, v in w.items() if k not in strip_fields} - preview.append(item) - return preview - - -# --- Authentication Configuration (matches other MCP servers in this directory) --- -auth: TokenVerifier - -match AuthProvider.get_provider_name(): - case "bohrium": - auth = JWTVerifier(public_key=AuthProvider.public_key) - case "casdoor": - auth = JWTVerifier(jwks_uri=AuthProvider.jwks_uri) - case "bohr_app": - auth = BohrAppTokenVerifier( - api_url=AuthProvider.issuer, - x_app_key="xyzen-uuid1760783737", - ) - case _: - raise ValueError(f"Unsupported authentication provider: {AuthProvider.get_provider_name()}") - - -@literature_mcp.tool -async def fetch_works( - query: str | None = None, - doi: str | None = None, - title: str | None = None, - author: str | None = None, - author_id: Any = None, - year_from: Any = None, - year_to: Any = None, - limit: Any = 20, - sort_by_cited_by_count: Any = False, - journal: Any = None, - journal_source_id: Any = None, - knowledge_set_id: Any = None, - user_id: Any = None, - mailto: str | None = None, - providers: Any = None, - provider_params: Any = None, - cleaner_params: Any = None, - call_attempt: Any = 1, - max_call_attempts: Any = 3, -) -> dict[str, Any]: - """Fetch literature works metadata (papers/books/etc.) and return cleaned results. - - This tool is designed for LLM use. It supports strict/precise calls and also a - "normalize-then-retry" workflow when inputs are not in the expected format. - - **Pipeline** - - Build a unified `LiteratureQuery` - - Query one or more providers (OpenAlex first) - - Clean results before returning (DOI normalization + de-duplication by default) - - **Core parameters** - - `query`: Free-text search. - - `doi`: DOI to match. Accepts forms like `10.xxxx/...`, `doi:10.xxxx/...`, or a DOI URL. - - `title` / `author`: Optional structured hints. - - `year_from` / `year_to`: Optional year range filter. - - `limit`: Max works to fetch (1..500). Used for request size and optional post-truncation. - - **Sorting** - - `sort_by_cited_by_count`: If true, sorts by citations (descending) and truncates results to `limit`. - Also sets OpenAlex `sort=cited_by_count:desc` unless you already set a provider sort. - - **Exact journal targeting (recommended for “Nature/Science/IEEE TMI” requests)** - OpenAlex cannot reliably filter by journal *name* directly. Use the two-step ID pattern: - - Preferred: `journal_source_id`: OpenAlex Source IDs like `S2764455111`. - The tool will filter works using `primary_location.source.id:Sxxxx` (OR supported via `|`). - - Convenience: `journal`: One or more journal names. If provided *without* `journal_source_id`, - the tool performs a lookup against `/sources` and returns a **retryable** response that includes - candidate Source IDs in `meta.journal_candidates`, plus a suggested second call. - - **Exact author targeting (recommended for “papers by ” requests)** - OpenAlex cannot reliably filter by author *name* directly. Use the two-step ID pattern: - - Preferred: `author_id`: OpenAlex Author IDs like `A1234567890`. - The tool will filter works using `authorships.author.id:Axxxx` (OR supported via `|`). 
- - Convenience: `author`: If provided *without* `author_id`, the tool performs a lookup - against `/authors` and returns candidate Author IDs in `meta.author_candidates`, plus - a suggested second call. - - **Providers** - - `providers`: Provider keys to use. - - Standard format: lower-case provider key strings, e.g. `providers=["openalex"]`. - - Current supported values: `openalex`. - - If you pass an unsupported key, the tool will fail with an error that includes the supported provider list. - - If omitted, defaults to `openalex`. - - **OpenAlex polite pool** - - `mailto`: Optional contact email used for OpenAlex polite requests. If omitted, a - server-side default is applied (not exposed via tool defaults). - - **Knowledge Base persistence (XLSX)** - - `knowledge_set_id`: Optional UUID of the Knowledge Set to save the full results XLSX into. - Recommended: pass this explicitly (the Xyzen agent runtime can inject it). - If omitted, the server will try to infer it from token claims, which may be unavailable. - - `user_id`: Optional user id for access control. Recommended to pass explicitly (agent runtime can inject it). - If omitted, the server will try to infer it from the request token, which may not represent the end user. - - **provider_params** - Provider-specific raw params. Two supported shapes: - 1) Direct OpenAlex params (auto-wrapped): - `{ "filter": "publication_year:2024", "sort": "cited_by_count:desc" }` - 2) Explicit provider map (keys must match `providers`): - `{ "openalex": { "filter": "...", "cursor": "*" } }` - - OpenAlex extras supported via `provider_params.openalex`: - - `filter`, `search`, `sort`, `cursor`, `sample`, `seed`, `per-page`, `select`, etc. - - `include_referenced_works`: boolean. If true, includes `referenced_works` IDs in each record (can be large). - - `max_referenced_works`: int 0..200. Truncates `referenced_works` list to control payload size. - - **cleaner_params** - Cleaner configuration. If `cleaner_params["callable"]` is set, it should be a dotted-path callable - like `package.module:function` and will override the default cleaner. Any other keys are forwarded - as kwargs. - - **Normalize-then-retry behavior** - If any of these require coercion/clamping/dropping (e.g. `limit` is a string, years invalid, - `provider_params` is JSON string), the tool will NOT execute the search. Instead it returns: - - `meta.needs_retry = true` - - `meta.warnings = [...]` - - `meta.suggested_call = { tool: "fetch_works", args: { ...normalized... } }` - - **Return shape** - A JSON dict compatible with `LiteratureResult`, typically including: - - `success`: bool - - `results`: list of WorkRecord objects (includes `doi`, `title`, `authors`, `year`, `journal`, - `work_type`, `cited_by_count`, `referenced_works_count`, plus optional `referenced_works` and `raw`) - - `errors`: list of provider/cleaner errors (if any) - - `stats` / `meta`: provider timing/counts and execution metadata - """ - # --- Input coercion/validation gate --- - # Policy: if we need to coerce/clamp/drop anything, we DO NOT execute search. - # Instead, we return a retryable response that tells the LLM exactly how to re-call. 
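-    # Illustrative behaviour of this gate (examples, not executed): clean
-    # coercions pass silently, failed ones trigger a retry instead of a search:
-    #   limit="25"               -> 25 (no warning; search proceeds)
-    #   providers='["openalex"]' -> ["openalex"] (JSON-stringified list unwrapped)
-    #   limit="lots"             -> warning "limit is not an int; using default 20"
-    #                               and a needs_retry envelope, no search executed.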
- warnings: list[str] = [] - - call_attempt_i = _coerce_int(call_attempt) or 1 - max_call_attempts_i = _coerce_int(max_call_attempts) or 3 - if call_attempt_i < 1: - call_attempt_i = 1 - if max_call_attempts_i < 1: - max_call_attempts_i = 1 - if max_call_attempts_i > 10: - max_call_attempts_i = 10 - if call_attempt_i > max_call_attempts_i: - call_attempt_i = max_call_attempts_i - - limit_i = _coerce_int(limit) - if limit_i is None: - warnings.append("limit is not an int; using default 20") - limit_i = 20 - if limit_i < 1 or limit_i > 500: - clamped = min(max(limit_i, 1), 500) - warnings.append(f"limit out of range (1..500); clamped to {clamped}") - limit_i = clamped - - year_from_i = _coerce_int(year_from) - year_to_i = _coerce_int(year_to) - if year_from is not None and year_from_i is None: - warnings.append("year_from is not an int; dropping") - if year_to is not None and year_to_i is None: - warnings.append("year_to is not an int; dropping") - if year_from_i is not None and year_from_i < 0: - warnings.append("year_from < 0; dropping") - year_from_i = None - if year_to_i is not None and year_to_i < 0: - warnings.append("year_to < 0; dropping") - year_to_i = None - if year_from_i is not None and year_to_i is not None and year_from_i > year_to_i: - warnings.append("year_from > year_to; swapping") - year_from_i, year_to_i = year_to_i, year_from_i - - sort_by_cited_by_count_b = _coerce_bool(sort_by_cited_by_count) - if sort_by_cited_by_count is not None and sort_by_cited_by_count_b is None: - warnings.append("sort_by_cited_by_count is not a bool; dropping") - sort_by_cited_by_count_b = False - - providers_l = _coerce_str_list(providers) - if providers is not None and providers_l is None: - warnings.append("providers is not a list[str]; dropping") - - provider_params_d = _coerce_dict(provider_params) - if provider_params is not None and provider_params_d is None: - warnings.append("provider_params is not a dict (or JSON dict string); dropping") - - cleaner_params_d = _coerce_dict(cleaner_params) - if cleaner_params is not None and cleaner_params_d is None: - warnings.append("cleaner_params is not a dict (or JSON dict string); dropping") - - journal_names = _coerce_str_list(journal) - if journal is not None and journal_names is None: - warnings.append("journal is not a string/list[str]; dropping") - - journal_source_ids = _coerce_str_list(journal_source_id) - if journal_source_id is not None and journal_source_ids is None: - warnings.append("journal_source_id is not a string/list[str]; dropping") - - author_ids = _coerce_str_list(author_id) - if author_id is not None and author_ids is None: - warnings.append("author_id is not a string/list[str]; dropping") - - knowledge_set_uuid = _coerce_uuid(knowledge_set_id) - if knowledge_set_id is not None and knowledge_set_uuid is None: - warnings.append("knowledge_set_id is not a valid UUID; dropping") - - user_id_s = _coerce_user_id(user_id) - if user_id is not None and user_id_s is None: - warnings.append("user_id is not a valid string; dropping") - - # If any coercion/clamp/drop happened, ask LLM to retry with normalized args. - # (This keeps the next call precise and reproducible.) 
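-    # Sketch of the envelope built below (JSON-ish, fields abridged, values
-    # illustrative):
-    #   {"success": false, "results": [],
-    #    "meta": {"needs_retry": true,
-    #             "warnings": ["year_from > year_to; swapping"],
-    #             "suggested_call": {"tool": "fetch_works",
-    #                                "args": {"year_from": 2019, "year_to": 2021,
-    #                                         "limit": 20}}}}
-    # Explicit nulls are pruned from the args so the suggestion stays compact.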
- if warnings: - suggested_args: dict[str, Any] = { - "query": query, - "doi": doi, - "title": title, - "author": author, - "author_id": author_ids, - "year_from": year_from_i, - "year_to": year_to_i, - "limit": limit_i, - "sort_by_cited_by_count": sort_by_cited_by_count_b, - "journal": journal_names, - "journal_source_id": journal_source_ids, - "knowledge_set_id": str(knowledge_set_uuid) if knowledge_set_uuid else None, - "user_id": user_id_s, - "mailto": mailto, - "providers": providers_l, - "provider_params": provider_params_d, - "cleaner_params": cleaner_params_d, - "call_attempt": call_attempt_i, - "max_call_attempts": max_call_attempts_i, - } - # Remove explicit nulls to keep the suggestion compact. - suggested_args = {k: v for k, v in suggested_args.items() if v is not None} - return _build_retry_response( - message="Some parameters were not in the expected format/range. Please re-call fetch_works with the suggested normalized args.", - suggested_args=suggested_args, - warnings=warnings, - extra_meta={"call_attempt": call_attempt_i, "max_call_attempts": max_call_attempts_i}, - ) - - # Normalize provider params: - # - If caller passes OpenAlex raw params directly, wrap into {"openalex": {...}} - # - If caller passes {"openalex": {...}}, keep as-is. - normalized_provider_params: dict[str, dict[str, Any]] = {} - if provider_params_d: - if isinstance(provider_params_d.get("openalex"), dict): - normalized_provider_params = {"openalex": provider_params_d["openalex"]} - else: - normalized_provider_params = {"openalex": provider_params_d} - - # Ensure OpenAlex gets a `mailto` value (caller-provided wins). - openalex_params = normalized_provider_params.setdefault("openalex", {}) - if mailto: - openalex_params["mailto"] = mailto - elif "mailto" not in openalex_params: - openalex_params["mailto"] = _DEFAULT_OPENALEX_MAILTO - - # Journal / Author targeting: - # - Preferred: explicit IDs (journal_source_id, author_id) - # - Convenience: name lookup (two-step) - # - Fuzzy fallback: ONLY when call_attempt == max_call_attempts - mailto_effective: str | None = mailto - if mailto_effective is None: - mv = openalex_params.get("mailto") - mailto_effective = mv if isinstance(mv, str) else None - - precision_meta: dict[str, Any] = {} - precision_warnings: list[str] = [] - is_last_attempt = call_attempt_i >= max_call_attempts_i - - resolved_journal_ids: list[str] = [] - journal_candidates_by_name: dict[str, Any] = {} - if journal_names and not journal_source_ids: - for jn in journal_names: - cands = await openalex_search_sources(jn, mailto=mailto_effective, per_page=10) - journal_candidates_by_name[jn] = cands - exact = [ - c - for c in cands - if isinstance(c.get("display_name"), str) and c["display_name"].strip().lower() == jn.strip().lower() - ] - if len(exact) == 1 and isinstance(exact[0].get("id"), str): - resolved_journal_ids.append(exact[0]["id"]) - elif len(cands) == 1 and isinstance(cands[0].get("id"), str): - resolved_journal_ids.append(cands[0]["id"]) - precision_meta["journal_candidates"] = journal_candidates_by_name - precision_warnings.append("journal name lookup performed; use journal_source_id for exact filtering") - - resolved_author_ids: list[str] = [] - author_candidates_by_name: dict[str, Any] = {} - if author and not author_ids: - cands = await openalex_search_authors(author, mailto=mailto_effective, per_page=10) - author_candidates_by_name[author] = cands - exact = [ - c - for c in cands - if isinstance(c.get("display_name"), str) and c["display_name"].strip().lower() == 
author.strip().lower() - ] - if len(exact) == 1 and isinstance(exact[0].get("id"), str): - resolved_author_ids.append(exact[0]["id"]) - elif len(cands) == 1 and isinstance(cands[0].get("id"), str): - resolved_author_ids.append(cands[0]["id"]) - precision_meta["author_candidates"] = author_candidates_by_name - precision_warnings.append("author name lookup performed; use author_id for exact filtering") - - # Decide whether we must stop and ask for a precise retry. - journal_ambiguous = bool( - journal_names - and not journal_source_id - and ((not resolved_journal_ids) or (resolved_journal_ids and len(resolved_journal_ids) != len(journal_names))) - ) - author_ambiguous = bool(author and not author_id and not resolved_author_ids) - - # If not last attempt, do not run a fuzzy works search. - # Return candidates and ask caller to retry with explicit IDs. - if (journal_ambiguous or author_ambiguous) and not is_last_attempt: - suggested_args: dict[str, Any] = { - "query": query, - "doi": doi, - "title": title, - "author": author, - "author_id": resolved_author_ids or None, - "year_from": year_from_i, - "year_to": year_to_i, - "limit": limit_i, - "sort_by_cited_by_count": sort_by_cited_by_count_b, - "journal": journal_names, - "journal_source_id": resolved_journal_ids or None, - "knowledge_set_id": str(knowledge_set_uuid) if knowledge_set_uuid else None, - "user_id": user_id_s, - "mailto": mailto, - "providers": providers_l, - "provider_params": provider_params_d, - "cleaner_params": cleaner_params_d, - "call_attempt": min(call_attempt_i + 1, max_call_attempts_i), - "max_call_attempts": max_call_attempts_i, - } - suggested_args = {k: v for k, v in suggested_args.items() if v is not None} - extra_meta: dict[str, Any] = dict(precision_meta) - extra_meta.update({"call_attempt": call_attempt_i, "max_call_attempts": max_call_attempts_i}) - return _build_retry_response( - message=( - "Journal/author names are ambiguous in OpenAlex. Please retry with explicit journal_source_id and/or author_id." - ), - suggested_args=suggested_args, - warnings=precision_warnings or ["name lookup performed; retry with explicit IDs for exact filtering"], - extra_meta=extra_meta, - ) - - # If we resolved IDs confidently from names, apply them as if caller provided them. - if resolved_journal_ids and not journal_source_ids: - journal_source_ids = resolved_journal_ids - if resolved_author_ids and not author_ids: - author_ids = resolved_author_ids - - # If journal is ambiguous (no IDs), we still try one fuzzy works call (best-effort) - # ONLY on the last attempt. - if is_last_attempt and journal_names and not journal_source_ids: - base_parts = [p for p in [query, title, author] if isinstance(p, str) and p.strip()] - base_search = openalex_params.get("search") if isinstance(openalex_params.get("search"), str) else None - if not base_search: - base_search = " ".join(base_parts) - journal_hint = " ".join(journal_names) - combined = (base_search + " " + journal_hint).strip() if base_search else journal_hint - if combined: - openalex_params["search"] = combined - - # Keep precision metadata for UI/LLM even on fuzzy fallback. 
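-    # Illustrative two-step flow (the Source ID below is hypothetical): a first
-    # call with journal=["Nature"] returns meta.journal_candidates like
-    #   {"Nature": [{"id": "S123456789", "display_name": "Nature"}, ...]}
-    # and the suggested second call passes journal_source_id=["S123456789"],
-    # which is applied below as the exact filter
-    #   primary_location.source.id:S123456789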
- if precision_warnings: - precision_meta.setdefault("call_attempt", call_attempt_i) - precision_meta.setdefault("max_call_attempts", max_call_attempts_i) - - if journal_source_ids: - journal_filter = "primary_location.source.id:" + "|".join(journal_source_ids) - if isinstance(openalex_params.get("filter"), str) and openalex_params["filter"].strip(): - openalex_params["filter"] = f"{openalex_params['filter']},{journal_filter}" - else: - openalex_params["filter"] = journal_filter - - if author_ids: - author_filter = "authorships.author.id:" + "|".join(author_ids) - if isinstance(openalex_params.get("filter"), str) and openalex_params["filter"].strip(): - openalex_params["filter"] = f"{openalex_params['filter']},{author_filter}" - else: - openalex_params["filter"] = author_filter - - # Optional: ask provider to sort by citations. - if sort_by_cited_by_count_b: - openalex_params.setdefault("sort", "cited_by_count:desc") - - cleaner_callable: str | None = None - if cleaner_params_d and isinstance(cleaner_params_d.get("callable"), str): - cleaner_callable = cleaner_params_d.get("callable") - - lq = LiteratureQuery( - query=query, - doi=doi, - title=title, - author=author, - year_from=year_from_i, - year_to=year_to_i, - limit=limit_i, - providers=providers_l, - provider_params=normalized_provider_params, - ) - - result = await agg_fetch_works( - lq, - cleaner_callable=cleaner_callable, - cleaner_params=cleaner_params_d, - ) - - # If provider fetch failed (no usable results + errors), allow limited tool-level retries. - # Note: providers already do internal HTTP retries; this is to cap LLM-level repeated calls. - if not result.success: - full_fail_payload: dict[str, Any] = result.model_dump(mode="json") - next_attempt = call_attempt_i + 1 - suggested_args: dict[str, Any] = { - "query": query, - "doi": doi, - "title": title, - "author": author, - "year_from": year_from_i, - "year_to": year_to_i, - "limit": limit_i, - "sort_by_cited_by_count": sort_by_cited_by_count_b, - "journal": journal_names, - "journal_source_id": journal_source_ids, - "knowledge_set_id": str(knowledge_set_uuid) if knowledge_set_uuid else None, - "user_id": user_id_s, - "mailto": mailto, - "providers": providers_l, - "provider_params": provider_params_d, - "cleaner_params": cleaner_params_d, - "call_attempt": min(next_attempt, max_call_attempts_i), - "max_call_attempts": max_call_attempts_i, - } - suggested_args = {k: v for k, v in suggested_args.items() if v is not None} - errors_any = full_fail_payload.get("errors") - stats_any = full_fail_payload.get("stats") - errors_list: list[dict[str, Any]] = ( - [cast(dict[str, Any], e) for e in errors_any] if isinstance(errors_any, list) else [] - ) - stats_dict: dict[str, Any] = cast(dict[str, Any], stats_any) if isinstance(stats_any, dict) else {} - if call_attempt_i >= max_call_attempts_i: - # Hard stop: let the caller decide next action. - return { - "success": False, - "results": [], - "errors": errors_list, - "stats": stats_dict, - "meta": { - "needs_retry": False, - "call_attempt": call_attempt_i, - "max_call_attempts": max_call_attempts_i, - "return_policy": "Retry limit reached; do not automatically re-call.", - }, - "summary": { - "note": ( - "文献源请求失败,且已达到最大重试次数;请不要继续自动调用MCP。" - "只有当用户明确要求再次检索/更换条件,或你需要人工调整参数/等待后再试时,才发起新的调用。" - " (EN: Fetch failed and retry limit reached; do not auto re-call. Only re-call if the user explicitly requests a new search or after changing conditions.)" - ) - }, - } - - return _build_retry_response( - message="Provider fetch failed. 
Please retry with the same parameters (or wait briefly) using the suggested call.", - suggested_args=suggested_args, - warnings=["provider fetch failed; limited retry is allowed"], - extra_meta={"call_attempt": call_attempt_i, "max_call_attempts": max_call_attempts_i}, - errors=errors_list, - stats=stats_dict, - ) - - # Optional: post-sort and truncate to limit. - if sort_by_cited_by_count_b and result.results: - result.results.sort(key=lambda w: (w.cited_by_count is not None, w.cited_by_count or 0), reverse=True) - if len(result.results) > limit_i: - result.results = result.results[:limit_i] - result.meta = result.meta or {} - result.meta["sorted_by"] = "cited_by_count:desc" - result.meta["truncated_to"] = limit_i - - # --- Always-on persistence to Knowledge Base as XLSX --- - persist_meta: dict[str, Any] = {"saved": False} - - # Build the *full* payload for persistence/export before we truncate anything. - full_payload: dict[str, Any] = result.model_dump(mode="json") - try: - access_token = get_access_token() - if access_token: - user_info = AuthProvider.parse_user_info(access_token.claims) - effective_user_id = user_id_s or user_info.id - ksid = knowledge_set_uuid or _extract_knowledge_set_id(user_info.extra, access_token.claims) - if ksid: - filename = derive_xlsx_filename(topic=query or title) - xlsx_bytes = build_xlsx_bytes(full_payload, query, title) - # Persist locally (storage + DB + link) - write_res = await persist_xlsx(effective_user_id, ksid, filename, xlsx_bytes) - persist_meta = { - "saved": bool(write_res.get("success")), - "filename": filename, - "knowledge_set_id": str(ksid), - "error": write_res.get("error"), - } - else: - persist_meta = { - "saved": False, - "error": "knowledge_set_id missing. Pass knowledge_set_id explicitly (preferred) or include it in token claims.", - } - else: - persist_meta = {"saved": False, "error": "access_token missing"} - except Exception as e: - logger.error(f"Persist literature XLSX failed: {e}") - persist_meta = {"saved": False, "error": str(e)} - - # --- Truncate what we return to the LLM (avoid context explosion) --- - # We keep the standard LiteratureResult envelope fields, but only return a small - # preview list for in-chat display. - preview_limit = 5 - results_any = full_payload.get("results") - results_raw_list: list[Any] = results_any if isinstance(results_any, list) else [] - results_list: list[dict[str, Any]] = [cast(dict[str, Any], w) for w in results_raw_list if isinstance(w, dict)] - preview_results = _build_preview_results(results_list, max_items=preview_limit) - - out = dict(full_payload) - out["results"] = preview_results - out.setdefault("meta", {}) - if precision_warnings: - out["meta"].setdefault("warnings", []) - if isinstance(out["meta"].get("warnings"), list): - out["meta"]["warnings"].extend(precision_warnings) - if precision_meta: - out["meta"].setdefault("precision", {}) - if isinstance(out["meta"].get("precision"), dict): - out["meta"]["precision"].update(precision_meta) - out["meta"]["persistence"] = persist_meta - out["meta"].setdefault("preview", {}) - out["meta"]["preview"].update( - { - "returned_results": len(preview_results), - "total_results": len(results_list), - "limit": limit_i, - "fields_stripped": ["raw", "referenced_works"], - "return_policy": "Only a small preview is returned to the LLM; full cleaned results are persisted to XLSX when possible.", - } - ) - # A short human-readable summary for LLM/UI. 
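-    # Example of the summary assembled below (values illustrative):
-    #   {"query": "perovskite solar cells", "providers": "openalex",
-    #    "total_results": 120, "returned_results": 5, "xlsx_saved": true,
-    #    "xlsx_filename": "perovskite-solar-cells-20250101-120000.xlsx",
-    #    "note": "..."}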
- providers_used = None - try: - meta_any = out.get("meta") - if isinstance(meta_any, dict): - providers_used = meta_any.get("providers") or meta_any.get("requested_providers") - except Exception: - providers_used = None - if isinstance(providers_used, list): - providers_list_any = cast(list[Any], providers_used) - providers_list_str = [p for p in providers_list_any if isinstance(p, str) and p] - providers_str = ",".join(providers_list_str) - else: - providers_str = "" - saved_ok = bool(persist_meta.get("saved")) - ksid_s = persist_meta.get("knowledge_set_id") - filename_s = persist_meta.get("filename") - - precision_hint = "" - try: - suggested_call_any = None - meta_any = out.get("meta") - if isinstance(meta_any, dict): - precision_any = meta_any.get("precision") - if isinstance(precision_any, dict): - suggested_call_any = precision_any.get("suggested_call") - if suggested_call_any: - precision_hint = ( - " 若需精确限定期刊/作者,请参考 meta.precision.suggested_call 再调用一次(建议只重试一次)。" - " (EN: For exact journal/author filtering, re-call once using meta.precision.suggested_call.)" - ) - except Exception: - precision_hint = "" - - if saved_ok and filename_s and ksid_s: - note = ( - f"已成功获取并清洗文献数据:对话中仅返回前{preview_limit}条用于展示;完整结果已保存为XLSX({filename_s},knowledge_set_id={ksid_s})。" - "请不要为了“确认”或“补全列表”而重复调用本MCP;只有当用户明确提出需要再次检索/换条件检索文献时,才再次调用。 " - + precision_hint - + f"(EN: Data fetched and cleaned. Only first {preview_limit} items are returned for chat display; full results saved to XLSX. Do NOT re-call this MCP unless the user explicitly requests a new/modified literature search.)" - ) - else: - note = ( - f"已成功获取并清洗文献数据:对话中仅返回前{preview_limit}条用于展示,以避免上下文爆炸。" - "请不要为了“确认”或“补全列表”而重复调用本MCP;只有当用户明确提出需要再次检索/换条件检索文献时,才再次调用。 " - + precision_hint - + f"(EN: Data fetched and cleaned. Only first {preview_limit} items are returned to avoid context explosion. Do NOT re-call unless the user explicitly requests a new/modified literature search.)" - ) - - out["summary"] = { - "query": query, - "doi": doi, - "title": title, - "author": author, - "year_from": year_from_i, - "year_to": year_to_i, - "providers": providers_str, - "total_results": len(results_list), - "returned_results": len(preview_results), - "xlsx_saved": saved_ok, - "xlsx_filename": filename_s, - "note": note, - } - return out diff --git a/service/app/utils/literature/__init__.py b/service/app/utils/literature/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/service/app/utils/literature/aggregator.py b/service/app/utils/literature/aggregator.py deleted file mode 100644 index 12bdf3a7..00000000 --- a/service/app/utils/literature/aggregator.py +++ /dev/null @@ -1,192 +0,0 @@ -from __future__ import annotations - -import asyncio -import json -import logging -import random -import time -from typing import Any - -import httpx - -from app.utils.literature.cleaners.base import resolve_cleaner_callable, run_cleaner -from app.utils.literature.models import LiteratureQuery, LiteratureResult, ProviderError, ProviderStats, WorkRecord -from app.utils.literature.providers.openalex import OpenAlexProvider - -logger = logging.getLogger(__name__) - - -DEFAULT_CLEANER_CALLABLE = "app.utils.literature.cleaners.doi:clean_works" - - -def _normalize_provider_keys(value: Any) -> list[str]: - """Normalize provider keys coming from LLM/tool calls. 
- - We accept: - - ["openalex"] - - ["[\"openalex\"]"] (JSON stringified list in a list) - - "openalex" / "OpenAlex" - - "openalex,crossref" (comma-separated) - """ - out: list[str] = [] - seen: set[str] = set() - - def add_one(s: str) -> None: - ss = s.strip() - if not ss: - return - - # If LLM passed a JSON list string (e.g. '["openalex"]'), unwrap it. - if (ss.startswith("[") and ss.endswith("]")) or (ss.startswith("{") and ss.endswith("}")): - try: - obj = json.loads(ss) - except Exception: - obj = None - if isinstance(obj, list): - for item in obj: - if isinstance(item, str): - add_one(item) - return - - # Accept comma-separated strings. - if "," in ss: - for part in ss.split(","): - add_one(part) - return - - key = ss.lower() - if key not in seen: - seen.add(key) - out.append(key) - - if value is None: - return [] - if isinstance(value, str): - add_one(value) - return out - if isinstance(value, (list, tuple, set)): - for v in value: - if isinstance(v, str): - add_one(v) - return out - return [] - - -def _default_providers() -> dict[str, Any]: - # NOTE: Provider instances are lightweight; we can recreate per request. - return { - "openalex": OpenAlexProvider(), - } - - -def _is_retryable_http_error(exc: Exception) -> tuple[bool, int | None]: - if isinstance(exc, httpx.HTTPStatusError): - status = exc.response.status_code - return status in {408, 429, 500, 502, 503, 504}, status - if isinstance(exc, httpx.RequestError): - return True, None - return False, None - - -async def fetch_works( - query: LiteratureQuery, - *, - max_concurrency: int = 5, - retries: int = 3, - base_backoff_s: float = 0.5, - cleaner_callable: str | None = None, - cleaner_params: dict[str, Any] | None = None, -) -> LiteratureResult: - """Fetch works from one or more providers and run cleaner before returning. - - - Concurrency is bounded across providers. - - Provider failures degrade gracefully into `errors`. - - If cleaner fails, returns uncleaned works with an error entry. - """ - - provider_registry = _default_providers() - requested_providers_raw = query.providers - requested_providers = _normalize_provider_keys(requested_providers_raw) - if not requested_providers: - requested_providers = ["openalex"] - - chosen = [p for p in requested_providers if p in provider_registry] - meta_warnings: list[str] = [] - if not chosen: - supported = sorted(provider_registry.keys()) - # Degrade gracefully: fall back to the default/supported providers instead - # of hard-failing the whole tool call. 
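-        # E.g. providers=["crossref"] (unsupported today) leaves chosen empty,
-        # so we proceed with ["openalex"] and record warnings such as:
-        #   "No supported providers matched the request; falling back to: openalex"
-        #   "Supported providers: openalex"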
- fallback = ["openalex"] if "openalex" in provider_registry else supported - chosen = fallback - meta_warnings.append("No supported providers matched the request; falling back to: " + ", ".join(fallback)) - meta_warnings.append("Supported providers: " + ", ".join(supported)) - - semaphore = asyncio.Semaphore(max(1, max_concurrency)) - errors: list[ProviderError] = [] - stats: dict[str, ProviderStats] = {} - all_works: list[WorkRecord] = [] - - async def run_one(provider_name: str) -> None: - provider = provider_registry[provider_name] - async with semaphore: - attempt = 0 - started = time.perf_counter() - while True: - try: - resp = await provider.search_works(query) - all_works.extend(resp.works) - took_ms = int((time.perf_counter() - started) * 1000) - stats[provider_name] = ProviderStats( - provider=provider_name, - requests=attempt + 1, - fetched=len(resp.works), - took_ms=took_ms, - ) - return - except Exception as exc: - retryable, status_code = _is_retryable_http_error(exc) - attempt += 1 - if attempt > retries or not retryable: - took_ms = int((time.perf_counter() - started) * 1000) - stats[provider_name] = ProviderStats( - provider=provider_name, - requests=attempt, - fetched=0, - took_ms=took_ms, - ) - errors.append( - ProviderError( - provider=provider_name, - message=str(exc), - status_code=status_code, - retryable=retryable, - ) - ) - return - - delay = base_backoff_s * (2 ** (attempt - 1)) - delay = delay + random.uniform(0, delay * 0.25) - await asyncio.sleep(delay) - - await asyncio.gather(*(run_one(p) for p in chosen)) - - # Always run cleaner by default (as requested). Callers can override. - effective_cleaner_callable = cleaner_callable or DEFAULT_CLEANER_CALLABLE - cleaner = resolve_cleaner_callable(effective_cleaner_callable) - cleaned_works, cleaner_error = await run_cleaner(all_works, cleaner=cleaner, params=cleaner_params) - if cleaner_error: - errors.append(ProviderError(provider="cleaner", message=cleaner_error, retryable=False)) - - return LiteratureResult( - success=len(cleaned_works) > 0 or len(errors) == 0, - results=cleaned_works, - errors=errors, - stats=stats, - meta={ - "providers": chosen, - "requested_providers": requested_providers, - "requested_providers_raw": requested_providers_raw, - "warnings": meta_warnings, - "cleaner_callable": effective_cleaner_callable, - }, - ) diff --git a/service/app/utils/literature/cleaners/__init__.py b/service/app/utils/literature/cleaners/__init__.py deleted file mode 100644 index 5932925d..00000000 --- a/service/app/utils/literature/cleaners/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from app.utils.literature.cleaners.doi import clean_works, normalize_doi - -__all__ = ["clean_works", "normalize_doi"] diff --git a/service/app/utils/literature/cleaners/base.py b/service/app/utils/literature/cleaners/base.py deleted file mode 100644 index ffed0e09..00000000 --- a/service/app/utils/literature/cleaners/base.py +++ /dev/null @@ -1,90 +0,0 @@ -from __future__ import annotations - -import importlib -import inspect -from typing import Any, Callable - -from app.utils.literature.models import WorkRecord - - -CleanerCallable = Callable[..., Any] - - -def resolve_cleaner_callable(callable_path: str) -> CleanerCallable: - """Resolve a dotted-path callable. 
- - Supported formats: - - "package.module:function" - - "package.module.function" (last segment treated as attribute) - """ - - path = callable_path.strip() - if not path: - raise ValueError("cleaner callable path is empty") - - if ":" in path: - module_name, attr = path.split(":", 1) - else: - module_name, _, attr = path.rpartition(".") - if not module_name: - raise ValueError("cleaner callable must include a module path") - - module = importlib.import_module(module_name) - fn = getattr(module, attr, None) - if fn is None or not callable(fn): - raise ValueError(f"cleaner callable not found or not callable: {callable_path}") - return fn - - -async def run_cleaner( - works: list[WorkRecord], - *, - cleaner: CleanerCallable | None, - params: dict[str, Any] | None = None, -) -> tuple[list[WorkRecord], str | None]: - """Run cleaner over works. - - Cleaner can be sync or async. It may accept either: - - list[WorkRecord] - - list[dict] (model_dump) - - Cleaner may return either: - - list[WorkRecord] - - list[dict] - """ - - if cleaner is None: - return works, None - - kwargs = params or {} - try: - # Prefer passing WorkRecord objects; if cleaner rejects, user can adjust. - if inspect.iscoroutinefunction(cleaner): - out = await cleaner(works, **kwargs) - else: - out = cleaner(works, **kwargs) - - if out is None: - return works, None - - if isinstance(out, list): - if not out: - return [], None - if isinstance(out[0], WorkRecord): - return out, None - if isinstance(out[0], dict): - return [WorkRecord.model_validate(x) for x in out], None - - # Fallback: allow returning a dict like {"results": [...]} - if isinstance(out, dict) and isinstance(out.get("results"), list): - results = out["results"] - if not results: - return [], None - if isinstance(results[0], WorkRecord): - return results, None - if isinstance(results[0], dict): - return [WorkRecord.model_validate(x) for x in results], None - - return works, "Cleaner returned unsupported output type" - except Exception as exc: - return works, f"Cleaner failed: {exc}" diff --git a/service/app/utils/literature/cleaners/doi.py b/service/app/utils/literature/cleaners/doi.py deleted file mode 100644 index cbb9b40e..00000000 --- a/service/app/utils/literature/cleaners/doi.py +++ /dev/null @@ -1,161 +0,0 @@ -from __future__ import annotations - -import re -from typing import Any - -from app.utils.literature.models import WorkRecord - - -_DOI_IN_URL_RE = re.compile(r"doi\.org/(10\.\d{4,9}/\S+)", re.IGNORECASE) -_DOI_RE = re.compile(r"^(10\.\d{4,9}/\S+)$", re.IGNORECASE) - - -def normalize_doi(value: str | None) -> str | None: - """Normalize DOI into canonical lowercase '10.xxxx/...' form. - - Accepts values like: - - '10.1234/ABC' - - 'doi:10.1234/ABC' - - 'https://doi.org/10.1234/ABC' - - 'http://doi.org/10.1234/ABC' - """ - - if not value: - return None - - v = value.strip() - if not v: - return None - - # URL forms - m = _DOI_IN_URL_RE.search(v) - if m: - return m.group(1).strip().lower() - - # doi: prefix - if v.lower().startswith("doi:"): - v = v[4:].strip() - - # raw form - m2 = _DOI_RE.match(v) - if m2: - return m2.group(1).strip().lower() - - # Best-effort: sometimes DOI comes with trailing punctuation. - v2 = v.rstrip(".);,]") - m3 = _DOI_RE.match(v2) - if m3: - return m3.group(1).strip().lower() - - return None - - -def _merge_records(primary: WorkRecord, incoming: WorkRecord) -> WorkRecord: - """Merge two records with the same DOI. - - Policy: keep primary, fill missing fields from incoming, prefer richer author list. 
- Keep the original `raw` but annotate merged provenance into it (safe, since raw is dict). - """ - - data = primary.model_dump() - - def take(field: str) -> None: - if data.get(field) in (None, "", []): - val = getattr(incoming, field) - if val not in (None, "", []): - data[field] = val - - for f in ("title", "venue", "url", "pdf_url", "source_id", "year"): - take(f) - - for f in ("journal", "work_type"): - take(f) - - # Prefer the larger counts when available. - try: - p_cited = primary.cited_by_count - i_cited = incoming.cited_by_count - if isinstance(p_cited, int) or isinstance(i_cited, int): - data["cited_by_count"] = max([x for x in (p_cited, i_cited) if isinstance(x, int)], default=None) - except Exception: - pass - - try: - p_rc = primary.referenced_works_count - i_rc = incoming.referenced_works_count - if isinstance(p_rc, int) or isinstance(i_rc, int): - data["referenced_works_count"] = max([x for x in (p_rc, i_rc) if isinstance(x, int)], default=None) - except Exception: - pass - - # Merge referenced work IDs conservatively (cap to avoid huge payloads). - try: - refs: list[str] = [] - for src in (primary.referenced_works or [], incoming.referenced_works or []): - for r in src: - if isinstance(r, str) and r and r not in refs: - refs.append(r) - if len(refs) >= 200: - break - if len(refs) >= 200: - break - if refs: - data["referenced_works"] = refs - except Exception: - pass - - # Prefer longer authors list. - if len(incoming.authors) > len(primary.authors): - data["authors"] = [a.model_dump() for a in incoming.authors] - - # Preserve raw and add merge info. - raw: dict[str, Any] = {} - if isinstance(primary.raw, dict): - raw.update(primary.raw) - merged_from = raw.get("_merged_from") - if not isinstance(merged_from, list): - merged_from = [] - merged_from.append({"source": incoming.source, "source_id": incoming.source_id}) - raw["_merged_from"] = merged_from - data["raw"] = raw - - return WorkRecord.model_validate(data) - - -def clean_works( - works: list[WorkRecord], - *, - drop_without_doi: bool = False, -) -> list[WorkRecord]: - """Normalize DOI, deduplicate by DOI, and return cleaned records. - - This is intended to be used as the default cleaner. - - Args: - works: WorkRecord list from any providers. - drop_without_doi: If True, drop records that have no parsable DOI. - - Returns: - Deduplicated list. Records with same DOI are merged. - """ - - dedup: dict[str, WorkRecord] = {} - kept_without_doi: list[WorkRecord] = [] - - for w in works: - doi_norm = normalize_doi(w.doi) - if doi_norm is None: - if not drop_without_doi: - kept_without_doi.append(w) - continue - - w2 = w.model_copy(update={"doi": doi_norm}) - if doi_norm not in dedup: - dedup[doi_norm] = w2 - else: - dedup[doi_norm] = _merge_records(dedup[doi_norm], w2) - - # Stable-ish ordering: DOI-sorted + append non-DOI works. 
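-    # E.g. "10.1234/ABC", "doi:10.1234/abc" and "https://doi.org/10.1234/ABC"
-    # (values illustrative) all normalize to "10.1234/abc", so such records
-    # collapse into one merged entry; non-DOI works are appended unchanged.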
- cleaned = [dedup[k] for k in sorted(dedup.keys())] - cleaned.extend(kept_without_doi) - return cleaned diff --git a/service/app/utils/literature/cleaners/subprocess.py b/service/app/utils/literature/cleaners/subprocess.py deleted file mode 100644 index e69de29b..00000000 diff --git a/service/app/utils/literature/exporter.py b/service/app/utils/literature/exporter.py deleted file mode 100644 index 37d41ed5..00000000 --- a/service/app/utils/literature/exporter.py +++ /dev/null @@ -1,336 +0,0 @@ -from __future__ import annotations - -import io -import logging -import mimetypes -from datetime import datetime, timezone -from typing import Any, cast -from uuid import UUID - -from app.core.storage import FileCategory, FileScope, generate_storage_key, get_storage_service -from app.infra.database import AsyncSessionLocal -from app.models.file import FileCreate -from app.repos.file import FileRepository -from app.repos.knowledge_set import KnowledgeSetRepository - - -logger = logging.getLogger(__name__) - - -def sanitize_topic(topic: str | None) -> str: - s = (topic or "literature").strip().lower() - out: list[str] = [] - for ch in s: - if ch.isalnum(): - out.append(ch) - elif ch in {" ", "_", "-", "."}: - out.append("-") - sanitized = "".join(out).strip("-") - return sanitized or "literature" - - -def derive_xlsx_filename(topic: str | None) -> str: - ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") - return f"{sanitize_topic(topic)}-{ts}.xlsx" - - -def _extract_first_author_affiliation(raw: dict[str, Any] | None) -> str: - if not isinstance(raw, dict): - return "" - # OpenAlex style - try: - auths = raw.get("authorships") - if isinstance(auths, list) and auths: - insts = auths[0].get("institutions") - if isinstance(insts, list) and insts: - name = insts[0].get("display_name") or insts[0].get("name") - if isinstance(name, str): - return name - except Exception: - pass - # Crossref style - try: - authors = raw.get("author") - if isinstance(authors, list) and authors: - affs = authors[0].get("affiliation") - if isinstance(affs, list) and affs: - name = affs[0].get("name") - if isinstance(name, str): - return name - except Exception: - pass - # Semantic Scholar style - try: - authors = raw.get("authors") - if isinstance(authors, list) and authors: - affs = authors[0].get("affiliations") - if isinstance(affs, list) and affs: - name = affs[0].get("name") - if isinstance(name, str): - return name - except Exception: - pass - return "" - - -def _extract_reference_fields(raw: dict[str, Any] | None) -> dict[str, str]: - if not isinstance(raw, dict): - return {"journal": "", "publisher": "", "volume": "", "issue": "", "pages": "", "published_date": ""} - journal = "" - publisher = "" - volume = "" - issue = "" - pages = "" - published_date = "" - # OpenAlex - try: - hv = raw.get("host_venue") or {} - if isinstance(hv, dict): - jn = hv.get("display_name") or hv.get("name") - if isinstance(jn, str): - journal = jn - pub = hv.get("publisher") - if isinstance(pub, str): - publisher = pub - biblio = raw.get("biblio") or {} - if isinstance(biblio, dict): - vol = biblio.get("volume") - iss = biblio.get("issue") - fp = biblio.get("first_page") - lp = biblio.get("last_page") - if isinstance(vol, (str, int)): - volume = str(vol) - if isinstance(iss, (str, int)): - issue = str(iss) - fp_s = str(fp) if isinstance(fp, (str, int)) else "" - lp_s = str(lp) if isinstance(lp, (str, int)) else "" - if fp_s or lp_s: - pages = f"{fp_s}-{lp_s}".strip("-") - pd = raw.get("publication_date") or raw.get("published_date") 
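-        # OpenAlex `publication_date` is already an ISO `YYYY-MM-DD` string, so
-        # it is used verbatim; the Crossref branch below instead rebuilds a date
-        # from `issued.date-parts`.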
- if isinstance(pd, str): - published_date = pd - except Exception: - pass - # Crossref - try: - cr = raw.get("container-title") - if isinstance(cr, list) and cr: - journal = journal or (cr[0] if isinstance(cr[0], str) else "") - pub = raw.get("publisher") - if isinstance(pub, str): - publisher = publisher or pub - vol = raw.get("volume") - iss = raw.get("issue") - pg = raw.get("page") - if isinstance(vol, (str, int)): - volume = volume or str(vol) - if isinstance(iss, (str, int)): - issue = issue or str(iss) - if isinstance(pg, str): - pages = pages or pg - # date-parts - issued = raw.get("issued") or {} - parts_any = issued.get("date-parts") - if isinstance(parts_any, list) and parts_any and isinstance(parts_any[0], list): - first = cast(list[Any], parts_any[0]) - y = first[0] if len(first) > 0 else None - m = first[1] if len(first) > 1 else None - d = first[2] if len(first) > 2 else None - published_date = published_date or "-".join([str(x) for x in [y, m, d] if x is not None]) - except Exception: - pass - return { - "journal": journal or "", - "publisher": publisher or "", - "volume": volume or "", - "issue": issue or "", - "pages": pages or "", - "published_date": published_date or "", - } - - -def build_xlsx_bytes(payload: dict[str, Any], query: str | None, title_hint: str | None) -> bytes: - """Build an XLSX file (bytes) for literature export. - - We intentionally use XLSX to avoid CSV encoding issues for non-English characters. - """ - - try: - import openpyxl - except ImportError: - raise ImportError("openpyxl is required for XLSX export. Please install 'openpyxl'.") - - wb = openpyxl.Workbook() - ws_summary = wb.active - if ws_summary is None: - ws_summary = wb.create_sheet(title="Summary") - else: - ws_summary.title = "Summary" - - meta_any = payload.get("meta") - meta = cast(dict[str, Any], meta_any) if isinstance(meta_any, dict) else {} - stats_any = payload.get("stats") - stats = cast(dict[str, Any], stats_any) if isinstance(stats_any, dict) else {} - created_at = payload.get("created_at") - results = cast(list[dict[str, Any]], payload.get("results") or []) - - # Summary - ws_summary.append(["Query", query or ""]) - ws_summary.append(["TitleHint", title_hint or ""]) - ws_summary.append( - [ - "Providers", - ",".join(meta.get("providers", meta.get("requested_providers", [])) or []), - ] - ) - ws_summary.append(["Fetched", len(results)]) - ws_summary.append(["CreatedAt", str(created_at or datetime.now(timezone.utc).isoformat())]) - - if stats: - ws_summary.append([]) - ws_summary.append(["Provider", "requests", "fetched", "took_ms"]) - for pname, pstats in stats.items(): - if isinstance(pstats, dict): - ws_summary.append( - [ - str(pname), - pstats.get("requests"), - pstats.get("fetched"), - pstats.get("took_ms"), - ] - ) - - # Works - ws_works = wb.create_sheet(title="Works") - ws_works.append( - [ - "source", - "source_id", - "doi", - "title", - "authors", - "year", - "venue", - "url", - "pdf_url", - "first_author", - "first_author_affiliation", - "journal", - "publisher", - "volume", - "issue", - "pages", - "published_date", - ] - ) - - for w in results: - authors = cast(list[dict[str, Any]], w.get("authors") or []) - names: list[str] = [] - fa_name = "" - for a in authors: - nm = a.get("name") - if isinstance(nm, str) and nm: - names.append(nm) - if names: - fa_name = names[0] - authors_str = "; ".join(names) - raw = cast(dict[str, Any] | None, w.get("raw")) - first_affil = _extract_first_author_affiliation(raw) - ref = _extract_reference_fields(raw) - ws_works.append( - [ 
- w.get("source") or "", - w.get("source_id") or "", - w.get("doi") or "", - w.get("title") or "", - authors_str, - w.get("year") or "", - w.get("venue") or "", - w.get("url") or "", - w.get("pdf_url") or "", - fa_name, - first_affil, - ref.get("journal") or "", - ref.get("publisher") or "", - ref.get("volume") or "", - ref.get("issue") or "", - ref.get("pages") or "", - ref.get("published_date") or "", - ] - ) - - buffer = io.BytesIO() - wb.save(buffer) - return buffer.getvalue() - - -async def persist_xlsx(user_id: str, knowledge_set_id: UUID, filename: str, content: bytes) -> dict[str, Any]: - try: - filename = filename.strip("/").split("/")[-1] - - async with AsyncSessionLocal() as db: - file_repo = FileRepository(db) - ks_repo = KnowledgeSetRepository(db) - storage = get_storage_service() - - # Validate access and get file IDs (match knowledge.py behavior) - try: - await ks_repo.validate_access(user_id, knowledge_set_id) - file_ids = await ks_repo.get_files_in_knowledge_set(knowledge_set_id) - except ValueError as e: - return {"error": f"Access denied: {e}", "success": False} - - # Determine content type (match knowledge.py fallback style) - content_type, _ = mimetypes.guess_type(filename) - if not content_type: - if filename.endswith(".docx"): - content_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document" - elif filename.endswith(".xlsx"): - content_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" - elif filename.endswith(".pptx"): - content_type = "application/vnd.openxmlformats-officedocument.presentationml.presentation" - elif filename.endswith(".pdf"): - content_type = "application/pdf" - else: - content_type = "text/plain" - - new_key = generate_storage_key(user_id, filename, FileScope.PRIVATE) - data = io.BytesIO(content) - file_size_bytes = len(content) - await storage.upload_file(data, new_key, content_type=content_type) - - # Check if file exists in knowledge set by name - existing_file = None - for file_id in file_ids: - file = await file_repo.get_file_by_id(file_id) - if file and file.original_filename == filename and not file.is_deleted: - existing_file = file - break - - if existing_file: - existing_file.storage_key = new_key - existing_file.file_size = file_size_bytes - existing_file.content_type = content_type - existing_file.updated_at = datetime.now(timezone.utc) - db.add(existing_file) - await db.commit() - return {"success": True, "message": f"Updated file: {filename}"} - else: - new_file = FileCreate( - user_id=user_id, - folder_id=None, - original_filename=filename, - storage_key=new_key, - file_size=file_size_bytes, - content_type=content_type, - scope=FileScope.PRIVATE, - category=FileCategory.DOCUMENT, - ) - created = await file_repo.create_file(new_file) - await ks_repo.link_file_to_knowledge_set(created.id, knowledge_set_id) - await db.commit() - return {"success": True, "message": f"Created file: {filename}"} - except Exception as e: - logger.exception("persist_xlsx failed") - return {"error": f"Internal error: {e!s}", "success": False} diff --git a/service/app/utils/literature/models.py b/service/app/utils/literature/models.py deleted file mode 100644 index 9e7c27f6..00000000 --- a/service/app/utils/literature/models.py +++ /dev/null @@ -1,118 +0,0 @@ -from __future__ import annotations - -from datetime import datetime -from typing import Any, Literal - -from pydantic import BaseModel, ConfigDict, Field - - -SourceName = Literal[ - "openalex", - "crossref", - "semanticscholar", - "pubmed", - "unknown", 
-] - - -class LiteratureQuery(BaseModel): - """Provider-agnostic literature query. - - Design goals: - - Keep a stable, generic shape for the MCP tool surface. - - Allow provider-specific parameter passthrough via `provider_params`. - """ - - model_config = ConfigDict(extra="forbid") - - query: str | None = None - doi: str | None = None - title: str | None = None - author: str | None = None - - year_from: int | None = Field(default=None, ge=0) - year_to: int | None = Field(default=None, ge=0) - - limit: int = Field(default=20, ge=1, le=500) - providers: list[str] | None = None - - # Provider-specific passthrough parameters. - # Example: {"openalex": {"filter": "type:journal-article", "select": "id,doi,title"}} - provider_params: dict[str, dict[str, Any]] = Field(default_factory=dict) - - -class WorkAuthor(BaseModel): - """A minimal author representation.""" - - model_config = ConfigDict(extra="forbid") - - name: str - orcid: str | None = None - source_id: str | None = None - - -class WorkRecord(BaseModel): - """Normalized literature metadata record. - - Providers must map their native payloads into this shape so the aggregator - and cleaner can work consistently. - """ - - model_config = ConfigDict(extra="forbid") - - source: SourceName = "unknown" - source_id: str | None = None - - doi: str | None = None - title: str | None = None - - authors: list[WorkAuthor] = Field(default_factory=list) - year: int | None = None - venue: str | None = None - journal: str | None = None - work_type: str | None = None - cited_by_count: int | None = Field(default=None, ge=0) - referenced_works_count: int | None = Field(default=None, ge=0) - # Potentially large; providers should only populate when explicitly requested. - referenced_works: list[str] | None = None - - url: str | None = None - pdf_url: str | None = None - - # Keep the original provider response to support debugging and future - # feature expansion. - raw: dict[str, Any] | None = None - - -class ProviderError(BaseModel): - model_config = ConfigDict(extra="allow") - - provider: str - message: str - status_code: int | None = None - error_code: str | None = None - retryable: bool | None = None - - -class ProviderStats(BaseModel): - model_config = ConfigDict(extra="forbid") - - provider: str - requests: int = 0 - fetched: int = 0 - took_ms: int | None = None - - -class LiteratureResult(BaseModel): - """Standard tool response envelope.""" - - model_config = ConfigDict(extra="allow") - - success: bool = True - results: list[WorkRecord] = Field(default_factory=list) - errors: list[ProviderError] = Field(default_factory=list) - stats: dict[str, ProviderStats] = Field(default_factory=dict) - - # Free-form metadata for debugging/traceability. 
-    meta: dict[str, Any] = Field(default_factory=dict)
-    created_at: datetime = Field(default_factory=datetime.utcnow)
diff --git a/service/app/utils/literature/providers/__init__.py b/service/app/utils/literature/providers/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/service/app/utils/literature/providers/base.py b/service/app/utils/literature/providers/base.py
deleted file mode 100644
index 9ae83e15..00000000
--- a/service/app/utils/literature/providers/base.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from __future__ import annotations
-
-from dataclasses import dataclass
-from typing import Any, Protocol
-
-from app.utils.literature.models import LiteratureQuery, WorkRecord
-
-
-@dataclass(frozen=True, slots=True)
-class ProviderResponse:
-    works: list[WorkRecord]
-    raw: dict[str, Any] | None = None
-
-
-class LiteratureProvider(Protocol):
-    """A literature metadata source.
-
-    Providers should be pure data fetchers: map source payloads into WorkRecord.
-    Cleaning (DOI normalization, dedup) is handled elsewhere.
-    """
-
-    name: str
-
-    async def search_works(self, query: LiteratureQuery) -> ProviderResponse: ...
diff --git a/service/app/utils/literature/providers/openalex.py b/service/app/utils/literature/providers/openalex.py
deleted file mode 100644
index 231d6a54..00000000
--- a/service/app/utils/literature/providers/openalex.py
+++ /dev/null
@@ -1,405 +0,0 @@
-from __future__ import annotations
-
-import re
-import time
-from typing import Any, cast
-
-import httpx
-
-from app.utils.literature.models import LiteratureQuery, WorkAuthor, WorkRecord
-from app.utils.literature.providers.base import ProviderResponse
-
-
-def _short_openalex_id(value: str | None) -> str | None:
-    if not value or not isinstance(value, str):
-        return None
-    s = value.strip()
-    if not s:
-        return None
-    if s.startswith("http://") or s.startswith("https://"):
-        return s.rstrip("/").split("/")[-1]
-    return s
-
-
-async def search_sources(
-    name: str,
-    *,
-    base_url: str = "https://api.openalex.org",
-    timeout_s: float = 20.0,
-    mailto: str | None = None,
-    per_page: int = 10,
-) -> list[dict[str, Any]]:
-    """Look up OpenAlex Sources by name.
-
-    Returns a compact list of candidate sources with IDs that can be used in works filters,
-    e.g. primary_location.source.id:Sxxxx.
-    """
-
-    q = (name or "").strip()
-    if not q:
-        return []
-
-    params: dict[str, Any] = {
-        "search": q,
-        "per-page": max(1, min(int(per_page), 50)),
-        "select": "id,display_name,issn_l,issn,host_organization,type,works_count,cited_by_count",
-    }
-    if mailto:
-        params["mailto"] = mailto
-
-    url = base_url.rstrip("/") + "/sources"
-    async with httpx.AsyncClient(timeout=timeout_s) as client:
-        resp = await client.get(url, params=params)
-        resp.raise_for_status()
-        data = resp.json()
-
-    out: list[dict[str, Any]] = []
-    for item in data.get("results") or []:
-        if not isinstance(item, dict):
-            continue
-        sid = _short_openalex_id(item.get("id") if isinstance(item.get("id"), str) else None)
-        if not sid:
-            continue
-        out.append(
-            {
-                "id": sid,
-                "display_name": item.get("display_name"),
-                "type": item.get("type"),
-                "issn_l": item.get("issn_l"),
-                "issn": item.get("issn"),
-                "host_organization": _short_openalex_id(
-                    item.get("host_organization") if isinstance(item.get("host_organization"), str) else None
-                ),
-                "works_count": item.get("works_count"),
-                "cited_by_count": item.get("cited_by_count"),
-            }
-        )
-    return out
-
-
-async def search_authors(
-    name: str,
-    *,
-    base_url: str = "https://api.openalex.org",
-    timeout_s: float = 20.0,
-    mailto: str | None = None,
-    per_page: int = 10,
-) -> list[dict[str, Any]]:
-    """Look up OpenAlex Authors by name.
-
-    Returns a compact list of candidate authors with IDs that can be used in works filters,
-    e.g. authorships.author.id:Axxxx.
-    """
-
-    q = (name or "").strip()
-    if not q:
-        return []
-
-    params: dict[str, Any] = {
-        "search": q,
-        "per-page": max(1, min(int(per_page), 50)),
-        "select": "id,display_name,orcid,works_count,cited_by_count,last_known_institution",
-    }
-    if mailto:
-        params["mailto"] = mailto
-
-    url = base_url.rstrip("/") + "/authors"
-    async with httpx.AsyncClient(timeout=timeout_s) as client:
-        resp = await client.get(url, params=params)
-        resp.raise_for_status()
-        data = resp.json()
-
-    out: list[dict[str, Any]] = []
-    for item in data.get("results") or []:
-        if not isinstance(item, dict):
-            continue
-        aid = _short_openalex_id(item.get("id") if isinstance(item.get("id"), str) else None)
-        if not aid:
-            continue
-        inst = item.get("last_known_institution")
-        inst_id = None
-        inst_name = None
-        if isinstance(inst, dict):
-            inst_id = _short_openalex_id(inst.get("id") if isinstance(inst.get("id"), str) else None)
-            inst_name = inst.get("display_name") if isinstance(inst.get("display_name"), str) else None
-        out.append(
-            {
-                "id": aid,
-                "display_name": item.get("display_name"),
-                "orcid": item.get("orcid"),
-                "works_count": item.get("works_count"),
-                "cited_by_count": item.get("cited_by_count"),
-                "last_known_institution": {"id": inst_id, "display_name": inst_name} if inst_id or inst_name else None,
-            }
-        )
-    return out
-
-
-class OpenAlexProvider:
-    name = "openalex"
-
-    def __init__(
-        self,
-        *,
-        base_url: str = "https://api.openalex.org",
-        timeout_s: float = 20.0,
-        mailto: str | None = None,
-        user_agent: str | None = None,
-    ) -> None:
-        self._base_url = base_url.rstrip("/")
-        self._timeout_s = timeout_s
-        self._mailto = mailto
-        self._user_agent = user_agent
-
-    async def search_works(self, query: LiteratureQuery) -> ProviderResponse:
-        provider_params: dict[str, Any] = (query.provider_params.get("openalex") or {}).copy()
-
-        include_referenced_works = bool(provider_params.pop("include_referenced_works", False))
-        max_referenced_works = provider_params.pop("max_referenced_works", None)
-        max_refs_i: int | None = None
-        if isinstance(max_referenced_works, int):
-            max_refs_i = max_referenced_works
-        elif isinstance(max_referenced_works, str) and max_referenced_works.strip().isdigit():
-            try:
-                max_refs_i = int(max_referenced_works.strip())
-            except Exception:
-                max_refs_i = None
-        if max_refs_i is not None:
-            max_refs_i = max(0, min(max_refs_i, 200))
-        per_page = min(max(int(provider_params.pop("per-page", query.limit)), 1), 200)
-
-        params: dict[str, Any] = {
-            "per-page": per_page,
-            # Default select keeps payload small while still mapping into WorkRecord.
-            "select": provider_params.pop(
-                "select",
-                "id,doi,title,display_name,publication_year,authorships,primary_location,best_oa_location,type,cited_by_count,referenced_works_count",
-            ),
-        }
-
-        if include_referenced_works:
-            # Potentially large; only include when explicitly requested.
-            params["select"] = str(params.get("select") or "") + ",referenced_works"
-
-        mailto = provider_params.pop("mailto", self._mailto)
-        if mailto:
-            params["mailto"] = mailto
-
-        # Build a conservative query: prefer exact DOI filter; otherwise use search.
-        filter_parts: list[str] = []
-        if query.doi:
-            filter_parts.append(f"doi:{_normalize_doi_for_openalex_filter(query.doi)}")
-
-        if query.year_from is not None or query.year_to is not None:
-            year_from = query.year_from
-            year_to = query.year_to
-            if year_from is not None and year_to is not None:
-                filter_parts.append(f"publication_year:{year_from}-{year_to}")
-            elif year_from is not None:
-                filter_parts.append(f"publication_year:>={year_from}")
-            elif year_to is not None:
-                filter_parts.append(f"publication_year:<={year_to}")
-
-        # If the user provided an explicit OpenAlex filter, respect it.
-        if "filter" in provider_params:
-            params["filter"] = provider_params.pop("filter")
-        elif filter_parts:
-            params["filter"] = ",".join(filter_parts)
-
-        if "search" in provider_params:
-            params["search"] = provider_params.pop("search")
-        else:
-            derived_search = _build_openalex_search(query)
-            if derived_search:
-                params["search"] = derived_search
-
-        # Let caller override/extend everything else (sort, cursor, sample, seed, api_key, etc.).
-        params.update(provider_params)
-
-        headers: dict[str, str] = {}
-        if self._user_agent:
-            headers["User-Agent"] = self._user_agent
-
-        url = f"{self._base_url}/works"
-        started = time.perf_counter()
-        async with httpx.AsyncClient(timeout=self._timeout_s, headers=headers) as client:
-            try:
-                resp = await client.get(url, params=params)
-                resp.raise_for_status()
-            except httpx.HTTPStatusError as exc:
-                summary = _summarize_http_error(exc)
-                # Re-raise with a richer message so callers (and the LLM) see why OpenAlex rejected the request.
-                raise httpx.HTTPStatusError(summary, request=exc.request, response=exc.response) from exc
-            except httpx.RequestError as exc:
-                # Surface network/runtime errors with provider context.
-                raise httpx.RequestError(f"OpenAlex request error: {exc}", request=exc.request) from exc
-
-        data = resp.json()
-        _ = (time.perf_counter() - started) * 1000
-
-        results: list[WorkRecord] = []
-        for item in data.get("results", []) or []:
-            if isinstance(item, dict):
-                results.append(_map_work(cast(dict[str, Any], item), max_referenced_works=max_refs_i))
-
-        raw = cast(dict[str, Any], data) if isinstance(data, dict) else None
-        return ProviderResponse(works=results, raw=raw)
-
-
-_DOI_RE = re.compile(r"^10\.\d{4,9}/\S+$", re.IGNORECASE)
-
-
-def _normalize_doi_for_openalex_filter(doi: str) -> str:
-    """OpenAlex 'doi' filter expects the DOI URL form.
- - Docs examples: filter=doi:https://doi.org/10.xxxx/yyy - """ - - doi = doi.strip() - if not doi: - return doi - if doi.lower().startswith("https://doi.org/"): - return doi - if doi.lower().startswith("http://doi.org/"): - return "https://doi.org/" + doi[len("http://doi.org/") :] - if doi.lower().startswith("doi:"): - doi = doi[4:].strip() - if _DOI_RE.match(doi): - return "https://doi.org/" + doi - # Fallback: pass through; cleaner will handle later. - return doi - - -def _build_openalex_search(query: LiteratureQuery) -> str | None: - parts: list[str] = [] - if query.query: - parts.append(query.query) - # Title and author names are not guaranteed to be searchable as related entities, - # but using fulltext search is a reasonable best-effort fallback. - if query.title: - parts.append(query.title) - if query.author: - parts.append(query.author) - s = " ".join(p.strip() for p in parts if p and p.strip()) - return s or None - - -def _map_work(item: dict[str, Any], *, max_referenced_works: int | None = None) -> WorkRecord: - authors: list[WorkAuthor] = [] - for authorship in item.get("authorships", []) or []: - if not isinstance(authorship, dict): - continue - author_obj_unknown = authorship.get("author") - if not isinstance(author_obj_unknown, dict): - continue - author_obj = cast(dict[str, Any], author_obj_unknown) - name = author_obj.get("display_name") - if not isinstance(name, str) or not name: - continue - authors.append( - WorkAuthor( - name=name, - orcid=cast(str, author_obj.get("orcid")) if isinstance(author_obj.get("orcid"), str) else None, - source_id=cast(str, author_obj.get("id")) if isinstance(author_obj.get("id"), str) else None, - ) - ) - - year = item.get("publication_year") - if not isinstance(year, int): - year = None - - primary_location = item.get("primary_location") if isinstance(item.get("primary_location"), dict) else None - best_oa_location = item.get("best_oa_location") if isinstance(item.get("best_oa_location"), dict) else None - - venue: str | None = None - if primary_location and isinstance(primary_location.get("source"), dict): - venue_val = primary_location["source"].get("display_name") - if isinstance(venue_val, str) and venue_val: - venue = venue_val - - journal: str | None = None - # OpenAlex now prefers primary_location/locations instead of host_venue. 
- for loc in (primary_location, best_oa_location): - if loc and isinstance(loc.get("source"), dict): - jv = loc["source"].get("display_name") or loc["source"].get("name") - if isinstance(jv, str) and jv: - journal = jv - break - journal = journal or venue - - url: str | None = None - for loc in (primary_location, best_oa_location): - if loc and isinstance(loc.get("landing_page_url"), str) and loc.get("landing_page_url"): - url = loc["landing_page_url"] - break - - pdf_url: str | None = None - for loc in (best_oa_location, primary_location): - if loc and isinstance(loc.get("pdf_url"), str) and loc.get("pdf_url"): - pdf_url = loc["pdf_url"] - break - - title = item.get("title") if isinstance(item.get("title"), str) else None - if not title: - title = item.get("display_name") if isinstance(item.get("display_name"), str) else None - - work_type = item.get("type") if isinstance(item.get("type"), str) else None - - cited_by_count = item.get("cited_by_count") - if not isinstance(cited_by_count, int): - cited_by_count = None - - referenced_works_count = item.get("referenced_works_count") - if not isinstance(referenced_works_count, int): - referenced_works_count = None - - referenced_works: list[str] | None = None - refs_any = item.get("referenced_works") - if isinstance(refs_any, list): - refs: list[str] = [r for r in refs_any if isinstance(r, str) and r] - if max_referenced_works is not None: - refs = refs[:max_referenced_works] - referenced_works = refs - - return WorkRecord( - source="openalex", - source_id=item.get("id") if isinstance(item.get("id"), str) else None, - doi=item.get("doi") if isinstance(item.get("doi"), str) else None, - title=title, - authors=authors, - year=year, - venue=venue, - journal=journal, - work_type=work_type, - cited_by_count=cited_by_count, - referenced_works_count=referenced_works_count, - referenced_works=referenced_works, - url=url, - pdf_url=pdf_url, - raw=item, - ) - - -def _summarize_http_error(exc: httpx.HTTPStatusError) -> str: - status = exc.response.status_code - reason = exc.response.reason_phrase - url = str(exc.request.url) - - detail: str | None = None - try: - payload = exc.response.json() - if isinstance(payload, dict): - for key in ("error", "detail", "message", "description"): - val = payload.get(key) - if isinstance(val, str) and val.strip(): - detail = val.strip() - break - except Exception: - pass - - if not detail: - text = (exc.response.text or "").strip().replace("\n", " ") - if text: - detail = text[:400] - - return f"OpenAlex HTTP {status} {reason}: {detail or 'No error detail'} (url={url})"
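
For reviewers auditing this removal: the core of the deleted OpenAlexProvider.search_works is a single GET against the OpenAlex /works endpoint. A minimal standalone sketch of an equivalent request, assuming only httpx (the client the deleted code used); fetch_openalex_works and the mailto value are illustrative placeholders, while the parameter names and select list come from the diff above:

import asyncio
from typing import Any

import httpx


async def fetch_openalex_works(search: str, year_from: int, year_to: int) -> list[dict[str, Any]]:
    # Same request shape the deleted provider built: fulltext search plus a
    # publication_year range filter, with a trimmed `select` to keep the
    # payload small.
    params: dict[str, Any] = {
        "search": search,
        "filter": f"publication_year:{year_from}-{year_to}",
        "per-page": 20,
        "select": "id,doi,title,publication_year,authorships,primary_location",
        "mailto": "dev@example.org",  # placeholder polite-pool contact, not a real address
    }
    async with httpx.AsyncClient(timeout=20.0) as client:
        resp = await client.get("https://api.openalex.org/works", params=params)
        resp.raise_for_status()
        return resp.json().get("results", [])


if __name__ == "__main__":
    works = asyncio.run(fetch_openalex_works("perovskite solar cells", 2019, 2024))
    print(len(works), "works fetched")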
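
The DOI handling is likewise small: OpenAlex's doi: filter wants the https://doi.org/... URL form, and the deleted _normalize_doi_for_openalex_filter implemented a handful of rewrite rules. A self-contained sketch of the same mapping (normalize_doi is an illustrative name; the regex is copied from the deleted _DOI_RE):

import re

# Regex copied from the deleted module's _DOI_RE.
_DOI_RE = re.compile(r"^10\.\d{4,9}/\S+$", re.IGNORECASE)


def normalize_doi(doi: str) -> str:
    # Same rewrite rules as the deleted helper: keep URL-form DOIs, upgrade
    # http:// to https://, strip a leading "doi:" label, and wrap bare DOIs
    # into the URL form OpenAlex expects.
    doi = doi.strip()
    if not doi:
        return doi
    if doi.lower().startswith("https://doi.org/"):
        return doi
    if doi.lower().startswith("http://doi.org/"):
        return "https://doi.org/" + doi[len("http://doi.org/"):]
    if doi.lower().startswith("doi:"):
        doi = doi[4:].strip()
    if _DOI_RE.match(doi):
        return "https://doi.org/" + doi
    return doi  # unknown shapes pass through for the downstream cleaner


assert normalize_doi("10.1234/abcd") == "https://doi.org/10.1234/abcd"
assert normalize_doi("doi:10.1234/abcd") == "https://doi.org/10.1234/abcd"
assert normalize_doi("http://doi.org/10.1234/abcd") == "https://doi.org/10.1234/abcd"
assert normalize_doi("not-a-doi") == "not-a-doi"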
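
Finally, the search_sources / search_authors docstrings describe a two-step flow: resolve an entity name to a short OpenAlex ID, then plug that ID into a works filter such as primary_location.source.id:Sxxxx. A sketch of that chaining under the same assumptions (works_in_venue is an illustrative name; the short-ID extraction mirrors the deleted _short_openalex_id):

import asyncio
from typing import Any

import httpx

BASE = "https://api.openalex.org"


async def works_in_venue(venue_name: str, per_page: int = 10) -> list[dict[str, Any]]:
    async with httpx.AsyncClient(timeout=20.0) as client:
        # Step 1: resolve the venue name to an OpenAlex source ID, as
        # search_sources did.
        src_resp = await client.get(f"{BASE}/sources", params={"search": venue_name, "per-page": 1})
        src_resp.raise_for_status()
        candidates = src_resp.json().get("results") or []
        if not candidates:
            return []
        sid = str(candidates[0]["id"]).rstrip("/").split("/")[-1]  # e.g. "S137773608"
        # Step 2: use the ID in a works filter, the shape the docstring names.
        works_resp = await client.get(
            f"{BASE}/works",
            params={"filter": f"primary_location.source.id:{sid}", "per-page": per_page},
        )
        works_resp.raise_for_status()
        return works_resp.json().get("results", [])


if __name__ == "__main__":
    for w in asyncio.run(works_in_venue("Nature")):
        print(w.get("display_name"))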