diff --git a/service/app/agents/mcp_tools.py b/service/app/agents/mcp_tools.py index 6d053086..8c9f4b05 100644 --- a/service/app/agents/mcp_tools.py +++ b/service/app/agents/mcp_tools.py @@ -164,20 +164,60 @@ async def execute_tool_call( f"Checking tool '{tool_name}' for injection. Schema props: {list(input_schema.get('properties', {}).keys())}" ) - if "knowledge_set_id" in input_schema.get("properties", {}): - logger.info(f"Tool '{tool_name}' requires knowledge_set_id.") - if agent: - ks_id = getattr(agent, "knowledge_set_id", None) - logger.info(f"Agent found. Knowledge Set ID: {ks_id}") - if ks_id: - args_dict["knowledge_set_id"] = str(ks_id) - logger.info(f"Injected knowledge_set_id: {ks_id} into args") - else: - logger.warning(f"Agent {agent.id} has NO knowledge_set_id bound!") + schema_props = input_schema.get("properties", {}) + + # Inject knowledge_set_id when tool schema supports it. + if "knowledge_set_id" in schema_props: + logger.info(f"Tool '{tool_name}' supports knowledge_set_id injection.") + effective_ks_id = None + # Prefer session-level override if available. + if session_id: + try: + from app.repos.session import SessionRepository + + session_repo = SessionRepository(db) + session_obj = await session_repo.get_session_by_id(session_id) + if session_obj and getattr(session_obj, "knowledge_set_id", None): + effective_ks_id = session_obj.knowledge_set_id + logger.info(f"Using session knowledge_set_id override: {effective_ks_id}") + except Exception as e: + logger.warning(f"Failed to load session knowledge_set_id override: {e}") + + if effective_ks_id is None and agent: + effective_ks_id = getattr(agent, "knowledge_set_id", None) + logger.info(f"Agent Knowledge Set ID: {effective_ks_id}") + + if effective_ks_id: + args_dict["knowledge_set_id"] = str(effective_ks_id) + logger.info(f"Injected knowledge_set_id: {effective_ks_id} into args") else: - logger.warning("No agent context available for injection!") - else: - logger.info(f"Tool '{tool_name}' does NOT require knowledge_set_id.") + logger.warning("No knowledge_set_id available for injection") + + # Inject user_id when tool schema supports it. + if "user_id" in schema_props: + logger.info(f"Tool '{tool_name}' supports user_id injection.") + effective_user_id = None + if session_id: + try: + from app.repos.session import SessionRepository + + session_repo = SessionRepository(db) + session_obj = await session_repo.get_session_by_id(session_id) + if session_obj and getattr(session_obj, "user_id", None): + effective_user_id = session_obj.user_id + logger.info(f"Using session user_id: {effective_user_id}") + except Exception as e: + logger.warning(f"Failed to load session user_id: {e}") + + if effective_user_id is None and agent: + effective_user_id = getattr(agent, "user_id", None) + logger.info(f"Agent user_id: {effective_user_id}") + + if effective_user_id: + args_dict["user_id"] = str(effective_user_id) + logger.info("Injected user_id into args") + else: + logger.warning("No user_id available for injection") try: result = await call_mcp_tool(refreshed_server, tool_name, args_dict) diff --git a/service/app/mcp/literature.py b/service/app/mcp/literature.py new file mode 100644 index 00000000..df854cfd --- /dev/null +++ b/service/app/mcp/literature.py @@ -0,0 +1,885 @@ +"""MCP Server for Literature / Bibliography fetching. + +This module is intentionally minimal at first: it provides tool entry points and +will be progressively wired to providers (OpenAlex first) under +`app.utils.literature`. 
+""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Iterable, Mapping +from typing import Any, cast +from uuid import UUID + + +from fastmcp import FastMCP +from fastmcp.server.auth import JWTVerifier, TokenVerifier +from fastmcp.server.dependencies import get_access_token + +from app.utils.literature.aggregator import fetch_works as agg_fetch_works +from app.utils.literature.models import LiteratureQuery +from app.middleware.auth import AuthProvider +from app.middleware.auth.token_verifier.bohr_app_token_verifier import BohrAppTokenVerifier +from app.utils.literature.exporter import ( + derive_xlsx_filename, + build_xlsx_bytes, + persist_xlsx, +) +from app.utils.literature.providers.openalex import ( + search_sources as openalex_search_sources, + search_authors as openalex_search_authors, +) + + +def _coerce_int(value: Any) -> int | None: + if value is None: + return None + if isinstance(value, bool): + return None + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + if isinstance(value, str): + s = value.strip() + if s.isdigit() or (s.startswith("-") and s[1:].isdigit()): + try: + return int(s) + except Exception: + return None + return None + + +def _coerce_str_list(value: Any) -> list[str] | None: + if value is None: + return None + + def normalize_one(s: str) -> list[str]: + ss = s.strip() + if not ss: + return [] + # Unwrap JSON stringified list: '["openalex"]' + if ss.startswith("[") and ss.endswith("]"): + try: + obj = json.loads(ss) + except Exception: + obj = None + if isinstance(obj, list): + out2: list[str] = [] + for item in obj: + if isinstance(item, str) and item.strip(): + out2.append(item.strip()) + return out2 + # Accept both comma-separated and single key. + parts = [p.strip() for p in ss.split(",")] + return [p for p in parts if p] + + if isinstance(value, list): + out: list[str] = [] + for v in value: + if isinstance(v, str): + out.extend(normalize_one(v)) + return out or None + if isinstance(value, str): + out = normalize_one(value) + return out or None + return None + + +def _coerce_dict(value: Any) -> dict[str, Any] | None: + if value is None: + return None + if isinstance(value, dict): + return value + if isinstance(value, str): + s = value.strip() + if not s: + return None + # Allow JSON dict passed as a string. + try: + obj = json.loads(s) + return obj if isinstance(obj, dict) else None + except Exception: + return None + return None + + +def _coerce_bool(value: Any) -> bool | None: + if value is None: + return None + if isinstance(value, bool): + return value + if isinstance(value, int) and value in (0, 1): + return bool(value) + if isinstance(value, str): + s = value.strip().lower() + if s in {"true", "1", "yes", "y", "on"}: + return True + if s in {"false", "0", "no", "n", "off"}: + return False + return None + + +def _build_retry_response( + *, + message: str, + suggested_args: dict[str, Any], + warnings: list[str], + extra_meta: dict[str, Any] | None = None, + errors: list[dict[str, Any]] | None = None, + stats: dict[str, Any] | None = None, +) -> dict[str, Any]: + # Shape matches LiteratureResult-like envelope (extra fields allowed). + meta: dict[str, Any] = { + "needs_retry": True, + "warnings": warnings, + "suggested_call": {"tool": "fetch_works", "args": suggested_args}, + } + if extra_meta: + meta.update(extra_meta) + + # Strong guidance for LLM tool-calling loops. + # - On `needs_retry`, only retry once with the suggested args. 
+ # - On success responses, the tool will explicitly say not to re-call. + call_attempt = meta.get("call_attempt") + max_call_attempts = meta.get("max_call_attempts") + try: + attempt_i = int(call_attempt) if call_attempt is not None else None + except Exception: + attempt_i = None + try: + max_i = int(max_call_attempts) if max_call_attempts is not None else None + except Exception: + max_i = None + max_note = f" (max {max_i})" if max_i else "" + summary_note = ( + "本次请求未成功完成/需要重试:请只按 meta.suggested_call 进行下一次调用(不要反复尝试不同参数)。" + "如果仍失败,可继续重试直到成功,但请限制次数;只有用户明确提出再次检索/换条件检索时才进行新的查询。" + + (f" 当前尝试次数: {attempt_i}{max_note}." if attempt_i else "") + + " (EN: Retry needed. Re-call ONLY once using meta.suggested_call; avoid repeated/variant calls. If still failing, retry up to the limit; only start a new search if the user explicitly requests it.)" + ) + + return { + "success": False, + "results": [], + "errors": errors + if errors is not None + else [ + { + "provider": "mcp", + "message": message, + "error_code": "invalid_params", + "retryable": True, + } + ], + "stats": stats or {}, + "meta": meta, + "summary": {"note": summary_note}, + } + + +def _extract_knowledge_set_id(extra: dict[str, Any] | None, claims: dict[str, Any]) -> UUID | None: + candidates: list[Any] = [] + if extra: + candidates.extend( + [ + extra.get("knowledge_set_id"), + extra.get("knowledgeSetId"), + extra.get("knowledge_setId"), + ] + ) + candidates.extend( + [ + claims.get("knowledge_set_id"), + claims.get("knowledgeSetId"), + claims.get("knowledge_setId"), + ] + ) + for c in candidates: + if not c: + continue + try: + return UUID(str(c)) + except Exception: + continue + return None + + +def _coerce_uuid(value: Any) -> UUID | None: + if value is None: + return None + try: + return UUID(str(value)) + except Exception: + return None + + +def _coerce_user_id(value: Any) -> str | None: + if value is None: + return None + if isinstance(value, str): + s = value.strip() + return s or None + # Some callers may pass UUID-like values. + try: + s2 = str(value).strip() + return s2 or None + except Exception: + return None + + +logger = logging.getLogger(__name__) + + +# NOTE: OpenAlex politely requests a contact email via the `mailto` query param. +# This default must NOT be exposed to the LLM/tool schema, so it is applied only +# when the caller does not provide one. +_DEFAULT_OPENALEX_MAILTO = "mengjunxing@shu.edu.cn" + + +__mcp_metadata__ = { + "source": "official", + "description": "Fetch and clean literature metadata from multiple sources (OpenAlex first)", + "banner": None, +} + + +literature_mcp: FastMCP = FastMCP(name="Literature 📚", version="0.1.0") + + +def _build_preview_results( + results: Iterable[Mapping[str, Any]] | None, + *, + max_items: int = 5, + strip_fields: set[str] | None = None, +) -> list[dict[str, Any]]: + """Build a small, display-friendly preview list for LLM responses. + + We intentionally strip large/debug fields (e.g. `raw`, `referenced_works`) to + avoid context explosion in multi-call tool loops. 
+ """ + if max_items <= 0: + return [] + if not results: + return [] + strip_fields = strip_fields or {"raw", "referenced_works"} + preview: list[dict[str, Any]] = [] + for idx, w in enumerate(results): + if idx >= max_items: + break + item = {k: v for k, v in w.items() if k not in strip_fields} + preview.append(item) + return preview + + +# --- Authentication Configuration (matches other MCP servers in this directory) --- +auth: TokenVerifier + +match AuthProvider.get_provider_name(): + case "bohrium": + auth = JWTVerifier(public_key=AuthProvider.public_key) + case "casdoor": + auth = JWTVerifier(jwks_uri=AuthProvider.jwks_uri) + case "bohr_app": + auth = BohrAppTokenVerifier( + api_url=AuthProvider.issuer, + x_app_key="xyzen-uuid1760783737", + ) + case _: + raise ValueError(f"Unsupported authentication provider: {AuthProvider.get_provider_name()}") + + +@literature_mcp.tool +async def fetch_works( + query: str | None = None, + doi: str | None = None, + title: str | None = None, + author: str | None = None, + author_id: Any = None, + year_from: Any = None, + year_to: Any = None, + limit: Any = 20, + sort_by_cited_by_count: Any = False, + journal: Any = None, + journal_source_id: Any = None, + knowledge_set_id: Any = None, + user_id: Any = None, + mailto: str | None = None, + providers: Any = None, + provider_params: Any = None, + cleaner_params: Any = None, + call_attempt: Any = 1, + max_call_attempts: Any = 3, +) -> dict[str, Any]: + """Fetch literature works metadata (papers/books/etc.) and return cleaned results. + + This tool is designed for LLM use. It supports strict/precise calls and also a + "normalize-then-retry" workflow when inputs are not in the expected format. + + **Pipeline** + - Build a unified `LiteratureQuery` + - Query one or more providers (OpenAlex first) + - Clean results before returning (DOI normalization + de-duplication by default) + + **Core parameters** + - `query`: Free-text search. + - `doi`: DOI to match. Accepts forms like `10.xxxx/...`, `doi:10.xxxx/...`, or a DOI URL. + - `title` / `author`: Optional structured hints. + - `year_from` / `year_to`: Optional year range filter. + - `limit`: Max works to fetch (1..500). Used for request size and optional post-truncation. + + **Sorting** + - `sort_by_cited_by_count`: If true, sorts by citations (descending) and truncates results to `limit`. + Also sets OpenAlex `sort=cited_by_count:desc` unless you already set a provider sort. + + **Exact journal targeting (recommended for “Nature/Science/IEEE TMI” requests)** + OpenAlex cannot reliably filter by journal *name* directly. Use the two-step ID pattern: + - Preferred: `journal_source_id`: OpenAlex Source IDs like `S2764455111`. + The tool will filter works using `primary_location.source.id:Sxxxx` (OR supported via `|`). + - Convenience: `journal`: One or more journal names. If provided *without* `journal_source_id`, + the tool performs a lookup against `/sources` and returns a **retryable** response that includes + candidate Source IDs in `meta.journal_candidates`, plus a suggested second call. + + **Exact author targeting (recommended for “papers by ” requests)** + OpenAlex cannot reliably filter by author *name* directly. Use the two-step ID pattern: + - Preferred: `author_id`: OpenAlex Author IDs like `A1234567890`. + The tool will filter works using `authorships.author.id:Axxxx` (OR supported via `|`). 
+ - Convenience: `author`: If provided *without* `author_id`, the tool performs a lookup + against `/authors` and returns candidate Author IDs in `meta.author_candidates`, plus + a suggested second call. + + **Providers** + - `providers`: Provider keys to use. + - Standard format: lower-case provider key strings, e.g. `providers=["openalex"]`. + - Current supported values: `openalex`. + - If you pass an unsupported key, the tool will fail with an error that includes the supported provider list. + - If omitted, defaults to `openalex`. + + **OpenAlex polite pool** + - `mailto`: Optional contact email used for OpenAlex polite requests. If omitted, a + server-side default is applied (not exposed via tool defaults). + + **Knowledge Base persistence (XLSX)** + - `knowledge_set_id`: Optional UUID of the Knowledge Set to save the full results XLSX into. + Recommended: pass this explicitly (the Xyzen agent runtime can inject it). + If omitted, the server will try to infer it from token claims, which may be unavailable. + - `user_id`: Optional user id for access control. Recommended to pass explicitly (agent runtime can inject it). + If omitted, the server will try to infer it from the request token, which may not represent the end user. + + **provider_params** + Provider-specific raw params. Two supported shapes: + 1) Direct OpenAlex params (auto-wrapped): + `{ "filter": "publication_year:2024", "sort": "cited_by_count:desc" }` + 2) Explicit provider map (keys must match `providers`): + `{ "openalex": { "filter": "...", "cursor": "*" } }` + + OpenAlex extras supported via `provider_params.openalex`: + - `filter`, `search`, `sort`, `cursor`, `sample`, `seed`, `per-page`, `select`, etc. + - `include_referenced_works`: boolean. If true, includes `referenced_works` IDs in each record (can be large). + - `max_referenced_works`: int 0..200. Truncates `referenced_works` list to control payload size. + + **cleaner_params** + Cleaner configuration. If `cleaner_params["callable"]` is set, it should be a dotted-path callable + like `package.module:function` and will override the default cleaner. Any other keys are forwarded + as kwargs. + + **Normalize-then-retry behavior** + If any of these require coercion/clamping/dropping (e.g. `limit` is a string, years invalid, + `provider_params` is JSON string), the tool will NOT execute the search. Instead it returns: + - `meta.needs_retry = true` + - `meta.warnings = [...]` + - `meta.suggested_call = { tool: "fetch_works", args: { ...normalized... } }` + + **Return shape** + A JSON dict compatible with `LiteratureResult`, typically including: + - `success`: bool + - `results`: list of WorkRecord objects (includes `doi`, `title`, `authors`, `year`, `journal`, + `work_type`, `cited_by_count`, `referenced_works_count`, plus optional `referenced_works` and `raw`) + - `errors`: list of provider/cleaner errors (if any) + - `stats` / `meta`: provider timing/counts and execution metadata + """ + # --- Input coercion/validation gate --- + # Policy: if we need to coerce/clamp/drop anything, we DO NOT execute search. + # Instead, we return a retryable response that tells the LLM exactly how to re-call. 
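+    # Illustrative example: limit=1000 is clamped to 500 and year_from="20x5" is dropped;
+    # both add warnings, so the call returns meta.suggested_call with normalized args
+    # instead of executing the search.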
+ warnings: list[str] = [] + + call_attempt_i = _coerce_int(call_attempt) or 1 + max_call_attempts_i = _coerce_int(max_call_attempts) or 3 + if call_attempt_i < 1: + call_attempt_i = 1 + if max_call_attempts_i < 1: + max_call_attempts_i = 1 + if max_call_attempts_i > 10: + max_call_attempts_i = 10 + if call_attempt_i > max_call_attempts_i: + call_attempt_i = max_call_attempts_i + + limit_i = _coerce_int(limit) + if limit_i is None: + warnings.append("limit is not an int; using default 20") + limit_i = 20 + if limit_i < 1 or limit_i > 500: + clamped = min(max(limit_i, 1), 500) + warnings.append(f"limit out of range (1..500); clamped to {clamped}") + limit_i = clamped + + year_from_i = _coerce_int(year_from) + year_to_i = _coerce_int(year_to) + if year_from is not None and year_from_i is None: + warnings.append("year_from is not an int; dropping") + if year_to is not None and year_to_i is None: + warnings.append("year_to is not an int; dropping") + if year_from_i is not None and year_from_i < 0: + warnings.append("year_from < 0; dropping") + year_from_i = None + if year_to_i is not None and year_to_i < 0: + warnings.append("year_to < 0; dropping") + year_to_i = None + if year_from_i is not None and year_to_i is not None and year_from_i > year_to_i: + warnings.append("year_from > year_to; swapping") + year_from_i, year_to_i = year_to_i, year_from_i + + sort_by_cited_by_count_b = _coerce_bool(sort_by_cited_by_count) + if sort_by_cited_by_count is not None and sort_by_cited_by_count_b is None: + warnings.append("sort_by_cited_by_count is not a bool; dropping") + sort_by_cited_by_count_b = False + + providers_l = _coerce_str_list(providers) + if providers is not None and providers_l is None: + warnings.append("providers is not a list[str]; dropping") + + provider_params_d = _coerce_dict(provider_params) + if provider_params is not None and provider_params_d is None: + warnings.append("provider_params is not a dict (or JSON dict string); dropping") + + cleaner_params_d = _coerce_dict(cleaner_params) + if cleaner_params is not None and cleaner_params_d is None: + warnings.append("cleaner_params is not a dict (or JSON dict string); dropping") + + journal_names = _coerce_str_list(journal) + if journal is not None and journal_names is None: + warnings.append("journal is not a string/list[str]; dropping") + + journal_source_ids = _coerce_str_list(journal_source_id) + if journal_source_id is not None and journal_source_ids is None: + warnings.append("journal_source_id is not a string/list[str]; dropping") + + author_ids = _coerce_str_list(author_id) + if author_id is not None and author_ids is None: + warnings.append("author_id is not a string/list[str]; dropping") + + knowledge_set_uuid = _coerce_uuid(knowledge_set_id) + if knowledge_set_id is not None and knowledge_set_uuid is None: + warnings.append("knowledge_set_id is not a valid UUID; dropping") + + user_id_s = _coerce_user_id(user_id) + if user_id is not None and user_id_s is None: + warnings.append("user_id is not a valid string; dropping") + + # If any coercion/clamp/drop happened, ask LLM to retry with normalized args. + # (This keeps the next call precise and reproducible.) 
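+    # Retry envelope shape (abridged; errors/stats/summary omitted):
+    #   {"success": False, "results": [], "meta": {"needs_retry": True,
+    #    "warnings": [...], "suggested_call": {"tool": "fetch_works", "args": {...}}}}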
+ if warnings: + suggested_args: dict[str, Any] = { + "query": query, + "doi": doi, + "title": title, + "author": author, + "author_id": author_ids, + "year_from": year_from_i, + "year_to": year_to_i, + "limit": limit_i, + "sort_by_cited_by_count": sort_by_cited_by_count_b, + "journal": journal_names, + "journal_source_id": journal_source_ids, + "knowledge_set_id": str(knowledge_set_uuid) if knowledge_set_uuid else None, + "user_id": user_id_s, + "mailto": mailto, + "providers": providers_l, + "provider_params": provider_params_d, + "cleaner_params": cleaner_params_d, + "call_attempt": call_attempt_i, + "max_call_attempts": max_call_attempts_i, + } + # Remove explicit nulls to keep the suggestion compact. + suggested_args = {k: v for k, v in suggested_args.items() if v is not None} + return _build_retry_response( + message="Some parameters were not in the expected format/range. Please re-call fetch_works with the suggested normalized args.", + suggested_args=suggested_args, + warnings=warnings, + extra_meta={"call_attempt": call_attempt_i, "max_call_attempts": max_call_attempts_i}, + ) + + # Normalize provider params: + # - If caller passes OpenAlex raw params directly, wrap into {"openalex": {...}} + # - If caller passes {"openalex": {...}}, keep as-is. + normalized_provider_params: dict[str, dict[str, Any]] = {} + if provider_params_d: + if isinstance(provider_params_d.get("openalex"), dict): + normalized_provider_params = {"openalex": provider_params_d["openalex"]} + else: + normalized_provider_params = {"openalex": provider_params_d} + + # Ensure OpenAlex gets a `mailto` value (caller-provided wins). + openalex_params = normalized_provider_params.setdefault("openalex", {}) + if mailto: + openalex_params["mailto"] = mailto + elif "mailto" not in openalex_params: + openalex_params["mailto"] = _DEFAULT_OPENALEX_MAILTO + + # Journal / Author targeting: + # - Preferred: explicit IDs (journal_source_id, author_id) + # - Convenience: name lookup (two-step) + # - Fuzzy fallback: ONLY when call_attempt == max_call_attempts + mailto_effective: str | None = mailto + if mailto_effective is None: + mv = openalex_params.get("mailto") + mailto_effective = mv if isinstance(mv, str) else None + + precision_meta: dict[str, Any] = {} + precision_warnings: list[str] = [] + is_last_attempt = call_attempt_i >= max_call_attempts_i + + resolved_journal_ids: list[str] = [] + journal_candidates_by_name: dict[str, Any] = {} + if journal_names and not journal_source_ids: + for jn in journal_names: + cands = await openalex_search_sources(jn, mailto=mailto_effective, per_page=10) + journal_candidates_by_name[jn] = cands + exact = [ + c + for c in cands + if isinstance(c.get("display_name"), str) and c["display_name"].strip().lower() == jn.strip().lower() + ] + if len(exact) == 1 and isinstance(exact[0].get("id"), str): + resolved_journal_ids.append(exact[0]["id"]) + elif len(cands) == 1 and isinstance(cands[0].get("id"), str): + resolved_journal_ids.append(cands[0]["id"]) + precision_meta["journal_candidates"] = journal_candidates_by_name + precision_warnings.append("journal name lookup performed; use journal_source_id for exact filtering") + + resolved_author_ids: list[str] = [] + author_candidates_by_name: dict[str, Any] = {} + if author and not author_ids: + cands = await openalex_search_authors(author, mailto=mailto_effective, per_page=10) + author_candidates_by_name[author] = cands + exact = [ + c + for c in cands + if isinstance(c.get("display_name"), str) and c["display_name"].strip().lower() == 
author.strip().lower() + ] + if len(exact) == 1 and isinstance(exact[0].get("id"), str): + resolved_author_ids.append(exact[0]["id"]) + elif len(cands) == 1 and isinstance(cands[0].get("id"), str): + resolved_author_ids.append(cands[0]["id"]) + precision_meta["author_candidates"] = author_candidates_by_name + precision_warnings.append("author name lookup performed; use author_id for exact filtering") + + # Decide whether we must stop and ask for a precise retry. + journal_ambiguous = bool( + journal_names + and not journal_source_id + and ((not resolved_journal_ids) or (resolved_journal_ids and len(resolved_journal_ids) != len(journal_names))) + ) + author_ambiguous = bool(author and not author_id and not resolved_author_ids) + + # If not last attempt, do not run a fuzzy works search. + # Return candidates and ask caller to retry with explicit IDs. + if (journal_ambiguous or author_ambiguous) and not is_last_attempt: + suggested_args: dict[str, Any] = { + "query": query, + "doi": doi, + "title": title, + "author": author, + "author_id": resolved_author_ids or None, + "year_from": year_from_i, + "year_to": year_to_i, + "limit": limit_i, + "sort_by_cited_by_count": sort_by_cited_by_count_b, + "journal": journal_names, + "journal_source_id": resolved_journal_ids or None, + "knowledge_set_id": str(knowledge_set_uuid) if knowledge_set_uuid else None, + "user_id": user_id_s, + "mailto": mailto, + "providers": providers_l, + "provider_params": provider_params_d, + "cleaner_params": cleaner_params_d, + "call_attempt": min(call_attempt_i + 1, max_call_attempts_i), + "max_call_attempts": max_call_attempts_i, + } + suggested_args = {k: v for k, v in suggested_args.items() if v is not None} + extra_meta: dict[str, Any] = dict(precision_meta) + extra_meta.update({"call_attempt": call_attempt_i, "max_call_attempts": max_call_attempts_i}) + return _build_retry_response( + message=( + "Journal/author names are ambiguous in OpenAlex. Please retry with explicit journal_source_id and/or author_id." + ), + suggested_args=suggested_args, + warnings=precision_warnings or ["name lookup performed; retry with explicit IDs for exact filtering"], + extra_meta=extra_meta, + ) + + # If we resolved IDs confidently from names, apply them as if caller provided them. + if resolved_journal_ids and not journal_source_ids: + journal_source_ids = resolved_journal_ids + if resolved_author_ids and not author_ids: + author_ids = resolved_author_ids + + # If journal is ambiguous (no IDs), we still try one fuzzy works call (best-effort) + # ONLY on the last attempt. + if is_last_attempt and journal_names and not journal_source_ids: + base_parts = [p for p in [query, title, author] if isinstance(p, str) and p.strip()] + base_search = openalex_params.get("search") if isinstance(openalex_params.get("search"), str) else None + if not base_search: + base_search = " ".join(base_parts) + journal_hint = " ".join(journal_names) + combined = (base_search + " " + journal_hint).strip() if base_search else journal_hint + if combined: + openalex_params["search"] = combined + + # Keep precision metadata for UI/LLM even on fuzzy fallback. 
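+    # precision_meta carries journal_candidates / author_candidates so the caller can
+    # re-call with explicit IDs (e.g. journal_source_id="S2764455111" or
+    # author_id="A1234567890", as in the docstring examples above).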
+ if precision_warnings: + precision_meta.setdefault("call_attempt", call_attempt_i) + precision_meta.setdefault("max_call_attempts", max_call_attempts_i) + + if journal_source_ids: + journal_filter = "primary_location.source.id:" + "|".join(journal_source_ids) + if isinstance(openalex_params.get("filter"), str) and openalex_params["filter"].strip(): + openalex_params["filter"] = f"{openalex_params['filter']},{journal_filter}" + else: + openalex_params["filter"] = journal_filter + + if author_ids: + author_filter = "authorships.author.id:" + "|".join(author_ids) + if isinstance(openalex_params.get("filter"), str) and openalex_params["filter"].strip(): + openalex_params["filter"] = f"{openalex_params['filter']},{author_filter}" + else: + openalex_params["filter"] = author_filter + + # Optional: ask provider to sort by citations. + if sort_by_cited_by_count_b: + openalex_params.setdefault("sort", "cited_by_count:desc") + + cleaner_callable: str | None = None + if cleaner_params_d and isinstance(cleaner_params_d.get("callable"), str): + cleaner_callable = cleaner_params_d.get("callable") + + lq = LiteratureQuery( + query=query, + doi=doi, + title=title, + author=author, + year_from=year_from_i, + year_to=year_to_i, + limit=limit_i, + providers=providers_l, + provider_params=normalized_provider_params, + ) + + result = await agg_fetch_works( + lq, + cleaner_callable=cleaner_callable, + cleaner_params=cleaner_params_d, + ) + + # If provider fetch failed (no usable results + errors), allow limited tool-level retries. + # Note: providers already do internal HTTP retries; this is to cap LLM-level repeated calls. + if not result.success: + full_fail_payload: dict[str, Any] = result.model_dump(mode="json") + next_attempt = call_attempt_i + 1 + suggested_args: dict[str, Any] = { + "query": query, + "doi": doi, + "title": title, + "author": author, + "year_from": year_from_i, + "year_to": year_to_i, + "limit": limit_i, + "sort_by_cited_by_count": sort_by_cited_by_count_b, + "journal": journal_names, + "journal_source_id": journal_source_ids, + "knowledge_set_id": str(knowledge_set_uuid) if knowledge_set_uuid else None, + "user_id": user_id_s, + "mailto": mailto, + "providers": providers_l, + "provider_params": provider_params_d, + "cleaner_params": cleaner_params_d, + "call_attempt": min(next_attempt, max_call_attempts_i), + "max_call_attempts": max_call_attempts_i, + } + suggested_args = {k: v for k, v in suggested_args.items() if v is not None} + errors_any = full_fail_payload.get("errors") + stats_any = full_fail_payload.get("stats") + errors_list: list[dict[str, Any]] = ( + [cast(dict[str, Any], e) for e in errors_any] if isinstance(errors_any, list) else [] + ) + stats_dict: dict[str, Any] = cast(dict[str, Any], stats_any) if isinstance(stats_any, dict) else {} + if call_attempt_i >= max_call_attempts_i: + # Hard stop: let the caller decide next action. + return { + "success": False, + "results": [], + "errors": errors_list, + "stats": stats_dict, + "meta": { + "needs_retry": False, + "call_attempt": call_attempt_i, + "max_call_attempts": max_call_attempts_i, + "return_policy": "Retry limit reached; do not automatically re-call.", + }, + "summary": { + "note": ( + "文献源请求失败,且已达到最大重试次数;请不要继续自动调用MCP。" + "只有当用户明确要求再次检索/更换条件,或你需要人工调整参数/等待后再试时,才发起新的调用。" + " (EN: Fetch failed and retry limit reached; do not auto re-call. Only re-call if the user explicitly requests a new search or after changing conditions.)" + ) + }, + } + + return _build_retry_response( + message="Provider fetch failed. 
Please retry with the same parameters (or wait briefly) using the suggested call.", + suggested_args=suggested_args, + warnings=["provider fetch failed; limited retry is allowed"], + extra_meta={"call_attempt": call_attempt_i, "max_call_attempts": max_call_attempts_i}, + errors=errors_list, + stats=stats_dict, + ) + + # Optional: post-sort and truncate to limit. + if sort_by_cited_by_count_b and result.results: + result.results.sort(key=lambda w: (w.cited_by_count is not None, w.cited_by_count or 0), reverse=True) + if len(result.results) > limit_i: + result.results = result.results[:limit_i] + result.meta = result.meta or {} + result.meta["sorted_by"] = "cited_by_count:desc" + result.meta["truncated_to"] = limit_i + + # --- Always-on persistence to Knowledge Base as XLSX --- + persist_meta: dict[str, Any] = {"saved": False} + + # Build the *full* payload for persistence/export before we truncate anything. + full_payload: dict[str, Any] = result.model_dump(mode="json") + try: + access_token = get_access_token() + if access_token: + user_info = AuthProvider.parse_user_info(access_token.claims) + effective_user_id = user_id_s or user_info.id + ksid = knowledge_set_uuid or _extract_knowledge_set_id(user_info.extra, access_token.claims) + if ksid: + filename = derive_xlsx_filename(topic=query or title) + xlsx_bytes = build_xlsx_bytes(full_payload, query, title) + # Persist locally (storage + DB + link) + write_res = await persist_xlsx(effective_user_id, ksid, filename, xlsx_bytes) + persist_meta = { + "saved": bool(write_res.get("success")), + "filename": filename, + "knowledge_set_id": str(ksid), + "error": write_res.get("error"), + } + else: + persist_meta = { + "saved": False, + "error": "knowledge_set_id missing. Pass knowledge_set_id explicitly (preferred) or include it in token claims.", + } + else: + persist_meta = {"saved": False, "error": "access_token missing"} + except Exception as e: + logger.error(f"Persist literature XLSX failed: {e}") + persist_meta = {"saved": False, "error": str(e)} + + # --- Truncate what we return to the LLM (avoid context explosion) --- + # We keep the standard LiteratureResult envelope fields, but only return a small + # preview list for in-chat display. + preview_limit = 5 + results_any = full_payload.get("results") + results_raw_list: list[Any] = results_any if isinstance(results_any, list) else [] + results_list: list[dict[str, Any]] = [cast(dict[str, Any], w) for w in results_raw_list if isinstance(w, dict)] + preview_results = _build_preview_results(results_list, max_items=preview_limit) + + out = dict(full_payload) + out["results"] = preview_results + out.setdefault("meta", {}) + if precision_warnings: + out["meta"].setdefault("warnings", []) + if isinstance(out["meta"].get("warnings"), list): + out["meta"]["warnings"].extend(precision_warnings) + if precision_meta: + out["meta"].setdefault("precision", {}) + if isinstance(out["meta"].get("precision"), dict): + out["meta"]["precision"].update(precision_meta) + out["meta"]["persistence"] = persist_meta + out["meta"].setdefault("preview", {}) + out["meta"]["preview"].update( + { + "returned_results": len(preview_results), + "total_results": len(results_list), + "limit": limit_i, + "fields_stripped": ["raw", "referenced_works"], + "return_policy": "Only a small preview is returned to the LLM; full cleaned results are persisted to XLSX when possible.", + } + ) + # A short human-readable summary for LLM/UI. 
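+    # The summary repeats key facts already present in meta (providers used, result
+    # counts, XLSX persistence status) as one compact bilingual note, so the model does
+    # not need to re-read the full payload.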
+ providers_used = None + try: + meta_any = out.get("meta") + if isinstance(meta_any, dict): + providers_used = meta_any.get("providers") or meta_any.get("requested_providers") + except Exception: + providers_used = None + if isinstance(providers_used, list): + providers_list_any = cast(list[Any], providers_used) + providers_list_str = [p for p in providers_list_any if isinstance(p, str) and p] + providers_str = ",".join(providers_list_str) + else: + providers_str = "" + saved_ok = bool(persist_meta.get("saved")) + ksid_s = persist_meta.get("knowledge_set_id") + filename_s = persist_meta.get("filename") + + precision_hint = "" + try: + suggested_call_any = None + meta_any = out.get("meta") + if isinstance(meta_any, dict): + precision_any = meta_any.get("precision") + if isinstance(precision_any, dict): + suggested_call_any = precision_any.get("suggested_call") + if suggested_call_any: + precision_hint = ( + " 若需精确限定期刊/作者,请参考 meta.precision.suggested_call 再调用一次(建议只重试一次)。" + " (EN: For exact journal/author filtering, re-call once using meta.precision.suggested_call.)" + ) + except Exception: + precision_hint = "" + + if saved_ok and filename_s and ksid_s: + note = ( + f"已成功获取并清洗文献数据:对话中仅返回前{preview_limit}条用于展示;完整结果已保存为XLSX({filename_s},knowledge_set_id={ksid_s})。" + "请不要为了“确认”或“补全列表”而重复调用本MCP;只有当用户明确提出需要再次检索/换条件检索文献时,才再次调用。 " + + precision_hint + + f"(EN: Data fetched and cleaned. Only first {preview_limit} items are returned for chat display; full results saved to XLSX. Do NOT re-call this MCP unless the user explicitly requests a new/modified literature search.)" + ) + else: + note = ( + f"已成功获取并清洗文献数据:对话中仅返回前{preview_limit}条用于展示,以避免上下文爆炸。" + "请不要为了“确认”或“补全列表”而重复调用本MCP;只有当用户明确提出需要再次检索/换条件检索文献时,才再次调用。 " + + precision_hint + + f"(EN: Data fetched and cleaned. Only first {preview_limit} items are returned to avoid context explosion. Do NOT re-call unless the user explicitly requests a new/modified literature search.)" + ) + + out["summary"] = { + "query": query, + "doi": doi, + "title": title, + "author": author, + "year_from": year_from_i, + "year_to": year_to_i, + "providers": providers_str, + "total_results": len(results_list), + "returned_results": len(preview_results), + "xlsx_saved": saved_ok, + "xlsx_filename": filename_s, + "note": note, + } + return out diff --git a/service/app/utils/literature/__init__.py b/service/app/utils/literature/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/service/app/utils/literature/aggregator.py b/service/app/utils/literature/aggregator.py new file mode 100644 index 00000000..12bdf3a7 --- /dev/null +++ b/service/app/utils/literature/aggregator.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import random +import time +from typing import Any + +import httpx + +from app.utils.literature.cleaners.base import resolve_cleaner_callable, run_cleaner +from app.utils.literature.models import LiteratureQuery, LiteratureResult, ProviderError, ProviderStats, WorkRecord +from app.utils.literature.providers.openalex import OpenAlexProvider + +logger = logging.getLogger(__name__) + + +DEFAULT_CLEANER_CALLABLE = "app.utils.literature.cleaners.doi:clean_works" + + +def _normalize_provider_keys(value: Any) -> list[str]: + """Normalize provider keys coming from LLM/tool calls. 
+ + We accept: + - ["openalex"] + - ["[\"openalex\"]"] (JSON stringified list in a list) + - "openalex" / "OpenAlex" + - "openalex,crossref" (comma-separated) + """ + out: list[str] = [] + seen: set[str] = set() + + def add_one(s: str) -> None: + ss = s.strip() + if not ss: + return + + # If LLM passed a JSON list string (e.g. '["openalex"]'), unwrap it. + if (ss.startswith("[") and ss.endswith("]")) or (ss.startswith("{") and ss.endswith("}")): + try: + obj = json.loads(ss) + except Exception: + obj = None + if isinstance(obj, list): + for item in obj: + if isinstance(item, str): + add_one(item) + return + + # Accept comma-separated strings. + if "," in ss: + for part in ss.split(","): + add_one(part) + return + + key = ss.lower() + if key not in seen: + seen.add(key) + out.append(key) + + if value is None: + return [] + if isinstance(value, str): + add_one(value) + return out + if isinstance(value, (list, tuple, set)): + for v in value: + if isinstance(v, str): + add_one(v) + return out + return [] + + +def _default_providers() -> dict[str, Any]: + # NOTE: Provider instances are lightweight; we can recreate per request. + return { + "openalex": OpenAlexProvider(), + } + + +def _is_retryable_http_error(exc: Exception) -> tuple[bool, int | None]: + if isinstance(exc, httpx.HTTPStatusError): + status = exc.response.status_code + return status in {408, 429, 500, 502, 503, 504}, status + if isinstance(exc, httpx.RequestError): + return True, None + return False, None + + +async def fetch_works( + query: LiteratureQuery, + *, + max_concurrency: int = 5, + retries: int = 3, + base_backoff_s: float = 0.5, + cleaner_callable: str | None = None, + cleaner_params: dict[str, Any] | None = None, +) -> LiteratureResult: + """Fetch works from one or more providers and run cleaner before returning. + + - Concurrency is bounded across providers. + - Provider failures degrade gracefully into `errors`. + - If cleaner fails, returns uncleaned works with an error entry. + """ + + provider_registry = _default_providers() + requested_providers_raw = query.providers + requested_providers = _normalize_provider_keys(requested_providers_raw) + if not requested_providers: + requested_providers = ["openalex"] + + chosen = [p for p in requested_providers if p in provider_registry] + meta_warnings: list[str] = [] + if not chosen: + supported = sorted(provider_registry.keys()) + # Degrade gracefully: fall back to the default/supported providers instead + # of hard-failing the whole tool call. 
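+        # Example: providers=["crossref"] currently matches nothing in the registry,
+        # so we fall back to ["openalex"] and record the mismatch in meta warnings.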
+ fallback = ["openalex"] if "openalex" in provider_registry else supported + chosen = fallback + meta_warnings.append("No supported providers matched the request; falling back to: " + ", ".join(fallback)) + meta_warnings.append("Supported providers: " + ", ".join(supported)) + + semaphore = asyncio.Semaphore(max(1, max_concurrency)) + errors: list[ProviderError] = [] + stats: dict[str, ProviderStats] = {} + all_works: list[WorkRecord] = [] + + async def run_one(provider_name: str) -> None: + provider = provider_registry[provider_name] + async with semaphore: + attempt = 0 + started = time.perf_counter() + while True: + try: + resp = await provider.search_works(query) + all_works.extend(resp.works) + took_ms = int((time.perf_counter() - started) * 1000) + stats[provider_name] = ProviderStats( + provider=provider_name, + requests=attempt + 1, + fetched=len(resp.works), + took_ms=took_ms, + ) + return + except Exception as exc: + retryable, status_code = _is_retryable_http_error(exc) + attempt += 1 + if attempt > retries or not retryable: + took_ms = int((time.perf_counter() - started) * 1000) + stats[provider_name] = ProviderStats( + provider=provider_name, + requests=attempt, + fetched=0, + took_ms=took_ms, + ) + errors.append( + ProviderError( + provider=provider_name, + message=str(exc), + status_code=status_code, + retryable=retryable, + ) + ) + return + + delay = base_backoff_s * (2 ** (attempt - 1)) + delay = delay + random.uniform(0, delay * 0.25) + await asyncio.sleep(delay) + + await asyncio.gather(*(run_one(p) for p in chosen)) + + # Always run cleaner by default (as requested). Callers can override. + effective_cleaner_callable = cleaner_callable or DEFAULT_CLEANER_CALLABLE + cleaner = resolve_cleaner_callable(effective_cleaner_callable) + cleaned_works, cleaner_error = await run_cleaner(all_works, cleaner=cleaner, params=cleaner_params) + if cleaner_error: + errors.append(ProviderError(provider="cleaner", message=cleaner_error, retryable=False)) + + return LiteratureResult( + success=len(cleaned_works) > 0 or len(errors) == 0, + results=cleaned_works, + errors=errors, + stats=stats, + meta={ + "providers": chosen, + "requested_providers": requested_providers, + "requested_providers_raw": requested_providers_raw, + "warnings": meta_warnings, + "cleaner_callable": effective_cleaner_callable, + }, + ) diff --git a/service/app/utils/literature/cleaners/__init__.py b/service/app/utils/literature/cleaners/__init__.py new file mode 100644 index 00000000..5932925d --- /dev/null +++ b/service/app/utils/literature/cleaners/__init__.py @@ -0,0 +1,3 @@ +from app.utils.literature.cleaners.doi import clean_works, normalize_doi + +__all__ = ["clean_works", "normalize_doi"] diff --git a/service/app/utils/literature/cleaners/base.py b/service/app/utils/literature/cleaners/base.py new file mode 100644 index 00000000..ffed0e09 --- /dev/null +++ b/service/app/utils/literature/cleaners/base.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import importlib +import inspect +from typing import Any, Callable + +from app.utils.literature.models import WorkRecord + + +CleanerCallable = Callable[..., Any] + + +def resolve_cleaner_callable(callable_path: str) -> CleanerCallable: + """Resolve a dotted-path callable. 
+ + Supported formats: + - "package.module:function" + - "package.module.function" (last segment treated as attribute) + """ + + path = callable_path.strip() + if not path: + raise ValueError("cleaner callable path is empty") + + if ":" in path: + module_name, attr = path.split(":", 1) + else: + module_name, _, attr = path.rpartition(".") + if not module_name: + raise ValueError("cleaner callable must include a module path") + + module = importlib.import_module(module_name) + fn = getattr(module, attr, None) + if fn is None or not callable(fn): + raise ValueError(f"cleaner callable not found or not callable: {callable_path}") + return fn + + +async def run_cleaner( + works: list[WorkRecord], + *, + cleaner: CleanerCallable | None, + params: dict[str, Any] | None = None, +) -> tuple[list[WorkRecord], str | None]: + """Run cleaner over works. + + Cleaner can be sync or async. It may accept either: + - list[WorkRecord] + - list[dict] (model_dump) + + Cleaner may return either: + - list[WorkRecord] + - list[dict] + """ + + if cleaner is None: + return works, None + + kwargs = params or {} + try: + # Prefer passing WorkRecord objects; if cleaner rejects, user can adjust. + if inspect.iscoroutinefunction(cleaner): + out = await cleaner(works, **kwargs) + else: + out = cleaner(works, **kwargs) + + if out is None: + return works, None + + if isinstance(out, list): + if not out: + return [], None + if isinstance(out[0], WorkRecord): + return out, None + if isinstance(out[0], dict): + return [WorkRecord.model_validate(x) for x in out], None + + # Fallback: allow returning a dict like {"results": [...]} + if isinstance(out, dict) and isinstance(out.get("results"), list): + results = out["results"] + if not results: + return [], None + if isinstance(results[0], WorkRecord): + return results, None + if isinstance(results[0], dict): + return [WorkRecord.model_validate(x) for x in results], None + + return works, "Cleaner returned unsupported output type" + except Exception as exc: + return works, f"Cleaner failed: {exc}" diff --git a/service/app/utils/literature/cleaners/doi.py b/service/app/utils/literature/cleaners/doi.py new file mode 100644 index 00000000..cbb9b40e --- /dev/null +++ b/service/app/utils/literature/cleaners/doi.py @@ -0,0 +1,161 @@ +from __future__ import annotations + +import re +from typing import Any + +from app.utils.literature.models import WorkRecord + + +_DOI_IN_URL_RE = re.compile(r"doi\.org/(10\.\d{4,9}/\S+)", re.IGNORECASE) +_DOI_RE = re.compile(r"^(10\.\d{4,9}/\S+)$", re.IGNORECASE) + + +def normalize_doi(value: str | None) -> str | None: + """Normalize DOI into canonical lowercase '10.xxxx/...' form. + + Accepts values like: + - '10.1234/ABC' + - 'doi:10.1234/ABC' + - 'https://doi.org/10.1234/ABC' + - 'http://doi.org/10.1234/ABC' + """ + + if not value: + return None + + v = value.strip() + if not v: + return None + + # URL forms + m = _DOI_IN_URL_RE.search(v) + if m: + return m.group(1).strip().lower() + + # doi: prefix + if v.lower().startswith("doi:"): + v = v[4:].strip() + + # raw form + m2 = _DOI_RE.match(v) + if m2: + return m2.group(1).strip().lower() + + # Best-effort: sometimes DOI comes with trailing punctuation. + v2 = v.rstrip(".);,]") + m3 = _DOI_RE.match(v2) + if m3: + return m3.group(1).strip().lower() + + return None + + +def _merge_records(primary: WorkRecord, incoming: WorkRecord) -> WorkRecord: + """Merge two records with the same DOI. + + Policy: keep primary, fill missing fields from incoming, prefer richer author list. 
+ Keep the original `raw` but annotate merged provenance into it (safe, since raw is dict). + """ + + data = primary.model_dump() + + def take(field: str) -> None: + if data.get(field) in (None, "", []): + val = getattr(incoming, field) + if val not in (None, "", []): + data[field] = val + + for f in ("title", "venue", "url", "pdf_url", "source_id", "year"): + take(f) + + for f in ("journal", "work_type"): + take(f) + + # Prefer the larger counts when available. + try: + p_cited = primary.cited_by_count + i_cited = incoming.cited_by_count + if isinstance(p_cited, int) or isinstance(i_cited, int): + data["cited_by_count"] = max([x for x in (p_cited, i_cited) if isinstance(x, int)], default=None) + except Exception: + pass + + try: + p_rc = primary.referenced_works_count + i_rc = incoming.referenced_works_count + if isinstance(p_rc, int) or isinstance(i_rc, int): + data["referenced_works_count"] = max([x for x in (p_rc, i_rc) if isinstance(x, int)], default=None) + except Exception: + pass + + # Merge referenced work IDs conservatively (cap to avoid huge payloads). + try: + refs: list[str] = [] + for src in (primary.referenced_works or [], incoming.referenced_works or []): + for r in src: + if isinstance(r, str) and r and r not in refs: + refs.append(r) + if len(refs) >= 200: + break + if len(refs) >= 200: + break + if refs: + data["referenced_works"] = refs + except Exception: + pass + + # Prefer longer authors list. + if len(incoming.authors) > len(primary.authors): + data["authors"] = [a.model_dump() for a in incoming.authors] + + # Preserve raw and add merge info. + raw: dict[str, Any] = {} + if isinstance(primary.raw, dict): + raw.update(primary.raw) + merged_from = raw.get("_merged_from") + if not isinstance(merged_from, list): + merged_from = [] + merged_from.append({"source": incoming.source, "source_id": incoming.source_id}) + raw["_merged_from"] = merged_from + data["raw"] = raw + + return WorkRecord.model_validate(data) + + +def clean_works( + works: list[WorkRecord], + *, + drop_without_doi: bool = False, +) -> list[WorkRecord]: + """Normalize DOI, deduplicate by DOI, and return cleaned records. + + This is intended to be used as the default cleaner. + + Args: + works: WorkRecord list from any providers. + drop_without_doi: If True, drop records that have no parsable DOI. + + Returns: + Deduplicated list. Records with same DOI are merged. + """ + + dedup: dict[str, WorkRecord] = {} + kept_without_doi: list[WorkRecord] = [] + + for w in works: + doi_norm = normalize_doi(w.doi) + if doi_norm is None: + if not drop_without_doi: + kept_without_doi.append(w) + continue + + w2 = w.model_copy(update={"doi": doi_norm}) + if doi_norm not in dedup: + dedup[doi_norm] = w2 + else: + dedup[doi_norm] = _merge_records(dedup[doi_norm], w2) + + # Stable-ish ordering: DOI-sorted + append non-DOI works. 
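+    # e.g. 10.1234/aaa, 10.1234/bbb, ..., followed by any records whose DOI could not
+    # be parsed (kept in their original input order).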
+ cleaned = [dedup[k] for k in sorted(dedup.keys())] + cleaned.extend(kept_without_doi) + return cleaned diff --git a/service/app/utils/literature/cleaners/subprocess.py b/service/app/utils/literature/cleaners/subprocess.py new file mode 100644 index 00000000..e69de29b diff --git a/service/app/utils/literature/exporter.py b/service/app/utils/literature/exporter.py new file mode 100644 index 00000000..37d41ed5 --- /dev/null +++ b/service/app/utils/literature/exporter.py @@ -0,0 +1,336 @@ +from __future__ import annotations + +import io +import logging +import mimetypes +from datetime import datetime, timezone +from typing import Any, cast +from uuid import UUID + +from app.core.storage import FileCategory, FileScope, generate_storage_key, get_storage_service +from app.infra.database import AsyncSessionLocal +from app.models.file import FileCreate +from app.repos.file import FileRepository +from app.repos.knowledge_set import KnowledgeSetRepository + + +logger = logging.getLogger(__name__) + + +def sanitize_topic(topic: str | None) -> str: + s = (topic or "literature").strip().lower() + out: list[str] = [] + for ch in s: + if ch.isalnum(): + out.append(ch) + elif ch in {" ", "_", "-", "."}: + out.append("-") + sanitized = "".join(out).strip("-") + return sanitized or "literature" + + +def derive_xlsx_filename(topic: str | None) -> str: + ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") + return f"{sanitize_topic(topic)}-{ts}.xlsx" + + +def _extract_first_author_affiliation(raw: dict[str, Any] | None) -> str: + if not isinstance(raw, dict): + return "" + # OpenAlex style + try: + auths = raw.get("authorships") + if isinstance(auths, list) and auths: + insts = auths[0].get("institutions") + if isinstance(insts, list) and insts: + name = insts[0].get("display_name") or insts[0].get("name") + if isinstance(name, str): + return name + except Exception: + pass + # Crossref style + try: + authors = raw.get("author") + if isinstance(authors, list) and authors: + affs = authors[0].get("affiliation") + if isinstance(affs, list) and affs: + name = affs[0].get("name") + if isinstance(name, str): + return name + except Exception: + pass + # Semantic Scholar style + try: + authors = raw.get("authors") + if isinstance(authors, list) and authors: + affs = authors[0].get("affiliations") + if isinstance(affs, list) and affs: + name = affs[0].get("name") + if isinstance(name, str): + return name + except Exception: + pass + return "" + + +def _extract_reference_fields(raw: dict[str, Any] | None) -> dict[str, str]: + if not isinstance(raw, dict): + return {"journal": "", "publisher": "", "volume": "", "issue": "", "pages": "", "published_date": ""} + journal = "" + publisher = "" + volume = "" + issue = "" + pages = "" + published_date = "" + # OpenAlex + try: + hv = raw.get("host_venue") or {} + if isinstance(hv, dict): + jn = hv.get("display_name") or hv.get("name") + if isinstance(jn, str): + journal = jn + pub = hv.get("publisher") + if isinstance(pub, str): + publisher = pub + biblio = raw.get("biblio") or {} + if isinstance(biblio, dict): + vol = biblio.get("volume") + iss = biblio.get("issue") + fp = biblio.get("first_page") + lp = biblio.get("last_page") + if isinstance(vol, (str, int)): + volume = str(vol) + if isinstance(iss, (str, int)): + issue = str(iss) + fp_s = str(fp) if isinstance(fp, (str, int)) else "" + lp_s = str(lp) if isinstance(lp, (str, int)) else "" + if fp_s or lp_s: + pages = f"{fp_s}-{lp_s}".strip("-") + pd = raw.get("publication_date") or raw.get("published_date") + if 
isinstance(pd, str): + published_date = pd + except Exception: + pass + # Crossref + try: + cr = raw.get("container-title") + if isinstance(cr, list) and cr: + journal = journal or (cr[0] if isinstance(cr[0], str) else "") + pub = raw.get("publisher") + if isinstance(pub, str): + publisher = publisher or pub + vol = raw.get("volume") + iss = raw.get("issue") + pg = raw.get("page") + if isinstance(vol, (str, int)): + volume = volume or str(vol) + if isinstance(iss, (str, int)): + issue = issue or str(iss) + if isinstance(pg, str): + pages = pages or pg + # date-parts + issued = raw.get("issued") or {} + parts_any = issued.get("date-parts") + if isinstance(parts_any, list) and parts_any and isinstance(parts_any[0], list): + first = cast(list[Any], parts_any[0]) + y = first[0] if len(first) > 0 else None + m = first[1] if len(first) > 1 else None + d = first[2] if len(first) > 2 else None + published_date = published_date or "-".join([str(x) for x in [y, m, d] if x is not None]) + except Exception: + pass + return { + "journal": journal or "", + "publisher": publisher or "", + "volume": volume or "", + "issue": issue or "", + "pages": pages or "", + "published_date": published_date or "", + } + + +def build_xlsx_bytes(payload: dict[str, Any], query: str | None, title_hint: str | None) -> bytes: + """Build an XLSX file (bytes) for literature export. + + We intentionally use XLSX to avoid CSV encoding issues for non-English characters. + """ + + try: + import openpyxl + except ImportError: + raise ImportError("openpyxl is required for XLSX export. Please install 'openpyxl'.") + + wb = openpyxl.Workbook() + ws_summary = wb.active + if ws_summary is None: + ws_summary = wb.create_sheet(title="Summary") + else: + ws_summary.title = "Summary" + + meta_any = payload.get("meta") + meta = cast(dict[str, Any], meta_any) if isinstance(meta_any, dict) else {} + stats_any = payload.get("stats") + stats = cast(dict[str, Any], stats_any) if isinstance(stats_any, dict) else {} + created_at = payload.get("created_at") + results = cast(list[dict[str, Any]], payload.get("results") or []) + + # Summary + ws_summary.append(["Query", query or ""]) + ws_summary.append(["TitleHint", title_hint or ""]) + ws_summary.append( + [ + "Providers", + ",".join(meta.get("providers", meta.get("requested_providers", [])) or []), + ] + ) + ws_summary.append(["Fetched", len(results)]) + ws_summary.append(["CreatedAt", str(created_at or datetime.now(timezone.utc).isoformat())]) + + if stats: + ws_summary.append([]) + ws_summary.append(["Provider", "requests", "fetched", "took_ms"]) + for pname, pstats in stats.items(): + if isinstance(pstats, dict): + ws_summary.append( + [ + str(pname), + pstats.get("requests"), + pstats.get("fetched"), + pstats.get("took_ms"), + ] + ) + + # Works + ws_works = wb.create_sheet(title="Works") + ws_works.append( + [ + "source", + "source_id", + "doi", + "title", + "authors", + "year", + "venue", + "url", + "pdf_url", + "first_author", + "first_author_affiliation", + "journal", + "publisher", + "volume", + "issue", + "pages", + "published_date", + ] + ) + + for w in results: + authors = cast(list[dict[str, Any]], w.get("authors") or []) + names: list[str] = [] + fa_name = "" + for a in authors: + nm = a.get("name") + if isinstance(nm, str) and nm: + names.append(nm) + if names: + fa_name = names[0] + authors_str = "; ".join(names) + raw = cast(dict[str, Any] | None, w.get("raw")) + first_affil = _extract_first_author_affiliation(raw) + ref = _extract_reference_fields(raw) + ws_works.append( + [ + 
w.get("source") or "", + w.get("source_id") or "", + w.get("doi") or "", + w.get("title") or "", + authors_str, + w.get("year") or "", + w.get("venue") or "", + w.get("url") or "", + w.get("pdf_url") or "", + fa_name, + first_affil, + ref.get("journal") or "", + ref.get("publisher") or "", + ref.get("volume") or "", + ref.get("issue") or "", + ref.get("pages") or "", + ref.get("published_date") or "", + ] + ) + + buffer = io.BytesIO() + wb.save(buffer) + return buffer.getvalue() + + +async def persist_xlsx(user_id: str, knowledge_set_id: UUID, filename: str, content: bytes) -> dict[str, Any]: + try: + filename = filename.strip("/").split("/")[-1] + + async with AsyncSessionLocal() as db: + file_repo = FileRepository(db) + ks_repo = KnowledgeSetRepository(db) + storage = get_storage_service() + + # Validate access and get file IDs (match knowledge.py behavior) + try: + await ks_repo.validate_access(user_id, knowledge_set_id) + file_ids = await ks_repo.get_files_in_knowledge_set(knowledge_set_id) + except ValueError as e: + return {"error": f"Access denied: {e}", "success": False} + + # Determine content type (match knowledge.py fallback style) + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + if filename.endswith(".docx"): + content_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document" + elif filename.endswith(".xlsx"): + content_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + elif filename.endswith(".pptx"): + content_type = "application/vnd.openxmlformats-officedocument.presentationml.presentation" + elif filename.endswith(".pdf"): + content_type = "application/pdf" + else: + content_type = "text/plain" + + new_key = generate_storage_key(user_id, filename, FileScope.PRIVATE) + data = io.BytesIO(content) + file_size_bytes = len(content) + await storage.upload_file(data, new_key, content_type=content_type) + + # Check if file exists in knowledge set by name + existing_file = None + for file_id in file_ids: + file = await file_repo.get_file_by_id(file_id) + if file and file.original_filename == filename and not file.is_deleted: + existing_file = file + break + + if existing_file: + existing_file.storage_key = new_key + existing_file.file_size = file_size_bytes + existing_file.content_type = content_type + existing_file.updated_at = datetime.now(timezone.utc) + db.add(existing_file) + await db.commit() + return {"success": True, "message": f"Updated file: {filename}"} + else: + new_file = FileCreate( + user_id=user_id, + folder_id=None, + original_filename=filename, + storage_key=new_key, + file_size=file_size_bytes, + content_type=content_type, + scope=FileScope.PRIVATE, + category=FileCategory.DOCUMENT, + ) + created = await file_repo.create_file(new_file) + await ks_repo.link_file_to_knowledge_set(created.id, knowledge_set_id) + await db.commit() + return {"success": True, "message": f"Created file: {filename}"} + except Exception as e: + logger.exception("persist_xlsx failed") + return {"error": f"Internal error: {e!s}", "success": False} diff --git a/service/app/utils/literature/models.py b/service/app/utils/literature/models.py new file mode 100644 index 00000000..9e7c27f6 --- /dev/null +++ b/service/app/utils/literature/models.py @@ -0,0 +1,118 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Literal + +from pydantic import BaseModel, ConfigDict, Field + + +SourceName = Literal[ + "openalex", + "crossref", + "semanticscholar", + "pubmed", + "unknown", +] + + 
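+# Minimal construction sketch (illustrative values only):
+#   LiteratureQuery(
+#       query="perovskite solar cells",
+#       limit=20,
+#       providers=["openalex"],
+#       provider_params={"openalex": {"sort": "cited_by_count:desc"}},
+#   )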
+class LiteratureQuery(BaseModel):
+    """Provider-agnostic literature query.
+
+    Design goals:
+    - Keep a stable, generic shape for the MCP tool surface.
+    - Allow provider-specific parameter passthrough via `provider_params`.
+    """
+
+    model_config = ConfigDict(extra="forbid")
+
+    query: str | None = None
+    doi: str | None = None
+    title: str | None = None
+    author: str | None = None
+
+    year_from: int | None = Field(default=None, ge=0)
+    year_to: int | None = Field(default=None, ge=0)
+
+    limit: int = Field(default=20, ge=1, le=500)
+    providers: list[str] | None = None
+
+    # Provider-specific passthrough parameters.
+    # Example: {"openalex": {"filter": "type:journal-article", "select": "id,doi,title"}}
+    provider_params: dict[str, dict[str, Any]] = Field(default_factory=dict)
+
+
+class WorkAuthor(BaseModel):
+    """A minimal author representation."""
+
+    model_config = ConfigDict(extra="forbid")
+
+    name: str
+    orcid: str | None = None
+    source_id: str | None = None
+
+
+class WorkRecord(BaseModel):
+    """Normalized literature metadata record.
+
+    Providers must map their native payloads into this shape so the aggregator
+    and cleaner can work consistently.
+    """
+
+    model_config = ConfigDict(extra="forbid")
+
+    source: SourceName = "unknown"
+    source_id: str | None = None
+
+    doi: str | None = None
+    title: str | None = None
+
+    authors: list[WorkAuthor] = Field(default_factory=list)
+    year: int | None = None
+    venue: str | None = None
+    journal: str | None = None
+    work_type: str | None = None
+    cited_by_count: int | None = Field(default=None, ge=0)
+    referenced_works_count: int | None = Field(default=None, ge=0)
+    # Potentially large; providers should only populate when explicitly requested.
+    referenced_works: list[str] | None = None
+
+    url: str | None = None
+    pdf_url: str | None = None
+
+    # Keep the original provider response to support debugging and future
+    # feature expansion.
+    raw: dict[str, Any] | None = None
+
+
+class ProviderError(BaseModel):
+    model_config = ConfigDict(extra="allow")
+
+    provider: str
+    message: str
+    status_code: int | None = None
+    error_code: str | None = None
+    retryable: bool | None = None
+
+
+class ProviderStats(BaseModel):
+    model_config = ConfigDict(extra="forbid")
+
+    provider: str
+    requests: int = 0
+    fetched: int = 0
+    took_ms: int | None = None
+
+
+class LiteratureResult(BaseModel):
+    """Standard tool response envelope."""
+
+    model_config = ConfigDict(extra="allow")
+
+    success: bool = True
+    results: list[WorkRecord] = Field(default_factory=list)
+    errors: list[ProviderError] = Field(default_factory=list)
+    stats: dict[str, ProviderStats] = Field(default_factory=dict)
+
+    # Free-form metadata for debugging/traceability.
+    meta: dict[str, Any] = Field(default_factory=dict)
+    created_at: datetime = Field(default_factory=datetime.utcnow)
diff --git a/service/app/utils/literature/providers/__init__.py b/service/app/utils/literature/providers/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/service/app/utils/literature/providers/base.py b/service/app/utils/literature/providers/base.py
new file mode 100644
index 00000000..9ae83e15
--- /dev/null
+++ b/service/app/utils/literature/providers/base.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any, Protocol
+
+from app.utils.literature.models import LiteratureQuery, WorkRecord
+
+
+@dataclass(frozen=True, slots=True)
+class ProviderResponse:
+    works: list[WorkRecord]
+    raw: dict[str, Any] | None = None
+
+
+class LiteratureProvider(Protocol):
+    """A literature metadata source.
+
+    Providers should be pure data fetchers: map source payloads into WorkRecord.
+    Cleaning (DOI normalization, dedup) is handled elsewhere.
+    """
+
+    name: str
+
+    async def search_works(self, query: LiteratureQuery) -> ProviderResponse: ...
diff --git a/service/app/utils/literature/providers/openalex.py b/service/app/utils/literature/providers/openalex.py
new file mode 100644
index 00000000..231d6a54
--- /dev/null
+++ b/service/app/utils/literature/providers/openalex.py
@@ -0,0 +1,405 @@
+from __future__ import annotations
+
+import re
+import time
+from typing import Any, cast
+
+import httpx
+
+from app.utils.literature.models import LiteratureQuery, WorkAuthor, WorkRecord
+from app.utils.literature.providers.base import ProviderResponse
+
+
+def _short_openalex_id(value: str | None) -> str | None:
+    if not value or not isinstance(value, str):
+        return None
+    s = value.strip()
+    if not s:
+        return None
+    if s.startswith("http://") or s.startswith("https://"):
+        return s.rstrip("/").split("/")[-1]
+    return s
+
+
+async def search_sources(
+    name: str,
+    *,
+    base_url: str = "https://api.openalex.org",
+    timeout_s: float = 20.0,
+    mailto: str | None = None,
+    per_page: int = 10,
+) -> list[dict[str, Any]]:
+    """Look up OpenAlex Sources by name.
+
+    Returns a compact list of candidate sources with IDs that can be used in works filters,
+    e.g. primary_location.source.id:Sxxxx.
+    """
+
+    q = (name or "").strip()
+    if not q:
+        return []
+
+    params: dict[str, Any] = {
+        "search": q,
+        "per-page": max(1, min(int(per_page), 50)),
+        "select": "id,display_name,issn_l,issn,host_organization,type,works_count,cited_by_count",
+    }
+    if mailto:
+        params["mailto"] = mailto
+
+    url = base_url.rstrip("/") + "/sources"
+    async with httpx.AsyncClient(timeout=timeout_s) as client:
+        resp = await client.get(url, params=params)
+        resp.raise_for_status()
+        data = resp.json()
+
+    out: list[dict[str, Any]] = []
+    for item in data.get("results") or []:
+        if not isinstance(item, dict):
+            continue
+        sid = _short_openalex_id(item.get("id") if isinstance(item.get("id"), str) else None)
+        if not sid:
+            continue
+        out.append(
+            {
+                "id": sid,
+                "display_name": item.get("display_name"),
+                "type": item.get("type"),
+                "issn_l": item.get("issn_l"),
+                "issn": item.get("issn"),
+                "host_organization": _short_openalex_id(
+                    item.get("host_organization") if isinstance(item.get("host_organization"), str) else None
+                ),
+                "works_count": item.get("works_count"),
+                "cited_by_count": item.get("cited_by_count"),
+            }
+        )
+    return out
+
+
+async def search_authors(
+    name: str,
+    *,
+    base_url: str = "https://api.openalex.org",
+    timeout_s: float = 20.0,
+    mailto: str | None = None,
+    per_page: int = 10,
+) -> list[dict[str, Any]]:
+    """Look up OpenAlex Authors by name.
+
+    Returns a compact list of candidate authors with IDs that can be used in works filters,
+    e.g. authorships.author.id:Axxxx.
+    """
+
+    q = (name or "").strip()
+    if not q:
+        return []
+
+    params: dict[str, Any] = {
+        "search": q,
+        "per-page": max(1, min(int(per_page), 50)),
+        "select": "id,display_name,orcid,works_count,cited_by_count,last_known_institution",
+    }
+    if mailto:
+        params["mailto"] = mailto
+
+    url = base_url.rstrip("/") + "/authors"
+    async with httpx.AsyncClient(timeout=timeout_s) as client:
+        resp = await client.get(url, params=params)
+        resp.raise_for_status()
+        data = resp.json()
+
+    out: list[dict[str, Any]] = []
+    for item in data.get("results") or []:
+        if not isinstance(item, dict):
+            continue
+        aid = _short_openalex_id(item.get("id") if isinstance(item.get("id"), str) else None)
+        if not aid:
+            continue
+        inst = item.get("last_known_institution")
+        inst_id = None
+        inst_name = None
+        if isinstance(inst, dict):
+            inst_id = _short_openalex_id(inst.get("id") if isinstance(inst.get("id"), str) else None)
+            inst_name = inst.get("display_name") if isinstance(inst.get("display_name"), str) else None
+        out.append(
+            {
+                "id": aid,
+                "display_name": item.get("display_name"),
+                "orcid": item.get("orcid"),
+                "works_count": item.get("works_count"),
+                "cited_by_count": item.get("cited_by_count"),
+                "last_known_institution": {"id": inst_id, "display_name": inst_name} if inst_id or inst_name else None,
+            }
+        )
+    return out
+
+
+class OpenAlexProvider:
+    name = "openalex"
+
+    def __init__(
+        self,
+        *,
+        base_url: str = "https://api.openalex.org",
+        timeout_s: float = 20.0,
+        mailto: str | None = None,
+        user_agent: str | None = None,
+    ) -> None:
+        self._base_url = base_url.rstrip("/")
+        self._timeout_s = timeout_s
+        self._mailto = mailto
+        self._user_agent = user_agent
+
+    async def search_works(self, query: LiteratureQuery) -> ProviderResponse:
+        provider_params: dict[str, Any] = (query.provider_params.get("openalex") or {}).copy()
+
+        include_referenced_works = bool(provider_params.pop("include_referenced_works", False))
+        max_referenced_works = provider_params.pop("max_referenced_works", None)
+        max_refs_i: int | None = None
+        if isinstance(max_referenced_works, int):
+            max_refs_i = max_referenced_works
+        elif isinstance(max_referenced_works, str) and max_referenced_works.strip().isdigit():
+            try:
+                max_refs_i = int(max_referenced_works.strip())
+            except Exception:
+                max_refs_i = None
+        if max_refs_i is not None:
+            max_refs_i = max(0, min(max_refs_i, 200))
+        per_page = min(max(int(provider_params.pop("per-page", query.limit)), 1), 200)
+
+        params: dict[str, Any] = {
+            "per-page": per_page,
+            # Default select keeps payload small while still mapping into WorkRecord.
+            "select": provider_params.pop(
+                "select",
+                "id,doi,title,display_name,publication_year,authorships,primary_location,best_oa_location,type,cited_by_count,referenced_works_count",
+            ),
+        }
+
+        if include_referenced_works:
+            # Potentially large; only include when explicitly requested.
+            params["select"] = str(params.get("select") or "") + ",referenced_works"
+
+        mailto = provider_params.pop("mailto", self._mailto)
+        if mailto:
+            params["mailto"] = mailto
+
+        # Build a conservative query: prefer exact DOI filter; otherwise use search.
+        filter_parts: list[str] = []
+        if query.doi:
+            filter_parts.append(f"doi:{_normalize_doi_for_openalex_filter(query.doi)}")
+
+        if query.year_from is not None or query.year_to is not None:
+            year_from = query.year_from
+            year_to = query.year_to
+            if year_from is not None and year_to is not None:
+                filter_parts.append(f"publication_year:{year_from}-{year_to}")
+            elif year_from is not None:
+                filter_parts.append(f"publication_year:>={year_from}")
+            elif year_to is not None:
+                filter_parts.append(f"publication_year:<={year_to}")
+
+        # If the user provided an explicit OpenAlex filter, respect it.
+        if "filter" in provider_params:
+            params["filter"] = provider_params.pop("filter")
+        elif filter_parts:
+            params["filter"] = ",".join(filter_parts)
+
+        if "search" in provider_params:
+            params["search"] = provider_params.pop("search")
+        else:
+            derived_search = _build_openalex_search(query)
+            if derived_search:
+                params["search"] = derived_search
+
+        # Let caller override/extend everything else (sort, cursor, sample, seed, api_key, etc.).
+        params.update(provider_params)
+
+        headers: dict[str, str] = {}
+        if self._user_agent:
+            headers["User-Agent"] = self._user_agent
+
+        url = f"{self._base_url}/works"
+        started = time.perf_counter()
+        async with httpx.AsyncClient(timeout=self._timeout_s, headers=headers) as client:
+            try:
+                resp = await client.get(url, params=params)
+                resp.raise_for_status()
+            except httpx.HTTPStatusError as exc:
+                summary = _summarize_http_error(exc)
+                # Re-raise with a richer message so callers (and the LLM) see why OpenAlex rejected the request.
+                raise httpx.HTTPStatusError(summary, request=exc.request, response=exc.response) from exc
+            except httpx.RequestError as exc:
+                # Surface network/runtime errors with provider context.
+                raise httpx.RequestError(f"OpenAlex request error: {exc}", request=exc.request) from exc
+
+        data = resp.json()
+        _ = (time.perf_counter() - started) * 1000
+
+        results: list[WorkRecord] = []
+        for item in data.get("results", []) or []:
+            if isinstance(item, dict):
+                results.append(_map_work(cast(dict[str, Any], item), max_referenced_works=max_refs_i))
+
+        raw = cast(dict[str, Any], data) if isinstance(data, dict) else None
+        return ProviderResponse(works=results, raw=raw)
+
+
+_DOI_RE = re.compile(r"^10\.\d{4,9}/\S+$", re.IGNORECASE)
+
+
+def _normalize_doi_for_openalex_filter(doi: str) -> str:
+    """OpenAlex 'doi' filter expects the DOI URL form.
+
+    Docs examples: filter=doi:https://doi.org/10.xxxx/yyy
+    """
+
+    doi = doi.strip()
+    if not doi:
+        return doi
+    if doi.lower().startswith("https://doi.org/"):
+        return doi
+    if doi.lower().startswith("http://doi.org/"):
+        return "https://doi.org/" + doi[len("http://doi.org/") :]
+    if doi.lower().startswith("doi:"):
+        doi = doi[4:].strip()
+    if _DOI_RE.match(doi):
+        return "https://doi.org/" + doi
+    # Fallback: pass through; cleaner will handle later.
+    return doi
+
+
+def _build_openalex_search(query: LiteratureQuery) -> str | None:
+    parts: list[str] = []
+    if query.query:
+        parts.append(query.query)
+    # Title and author names are not guaranteed to be searchable as related entities,
+    # but using fulltext search is a reasonable best-effort fallback.
+    if query.title:
+        parts.append(query.title)
+    if query.author:
+        parts.append(query.author)
+    s = " ".join(p.strip() for p in parts if p and p.strip())
+    return s or None
+
+
+def _map_work(item: dict[str, Any], *, max_referenced_works: int | None = None) -> WorkRecord:
+    authors: list[WorkAuthor] = []
+    for authorship in item.get("authorships", []) or []:
+        if not isinstance(authorship, dict):
+            continue
+        author_obj_unknown = authorship.get("author")
+        if not isinstance(author_obj_unknown, dict):
+            continue
+        author_obj = cast(dict[str, Any], author_obj_unknown)
+        name = author_obj.get("display_name")
+        if not isinstance(name, str) or not name:
+            continue
+        authors.append(
+            WorkAuthor(
+                name=name,
+                orcid=cast(str, author_obj.get("orcid")) if isinstance(author_obj.get("orcid"), str) else None,
+                source_id=cast(str, author_obj.get("id")) if isinstance(author_obj.get("id"), str) else None,
+            )
+        )
+
+    year = item.get("publication_year")
+    if not isinstance(year, int):
+        year = None
+
+    primary_location = item.get("primary_location") if isinstance(item.get("primary_location"), dict) else None
+    best_oa_location = item.get("best_oa_location") if isinstance(item.get("best_oa_location"), dict) else None
+
+    venue: str | None = None
+    if primary_location and isinstance(primary_location.get("source"), dict):
+        venue_val = primary_location["source"].get("display_name")
+        if isinstance(venue_val, str) and venue_val:
+            venue = venue_val
+
+    journal: str | None = None
+    # OpenAlex now prefers primary_location/locations instead of host_venue.
+    for loc in (primary_location, best_oa_location):
+        if loc and isinstance(loc.get("source"), dict):
+            jv = loc["source"].get("display_name") or loc["source"].get("name")
+            if isinstance(jv, str) and jv:
+                journal = jv
+                break
+    journal = journal or venue
+
+    url: str | None = None
+    for loc in (primary_location, best_oa_location):
+        if loc and isinstance(loc.get("landing_page_url"), str) and loc.get("landing_page_url"):
+            url = loc["landing_page_url"]
+            break
+
+    pdf_url: str | None = None
+    for loc in (best_oa_location, primary_location):
+        if loc and isinstance(loc.get("pdf_url"), str) and loc.get("pdf_url"):
+            pdf_url = loc["pdf_url"]
+            break
+
+    title = item.get("title") if isinstance(item.get("title"), str) else None
+    if not title:
+        title = item.get("display_name") if isinstance(item.get("display_name"), str) else None
+
+    work_type = item.get("type") if isinstance(item.get("type"), str) else None
+
+    cited_by_count = item.get("cited_by_count")
+    if not isinstance(cited_by_count, int):
+        cited_by_count = None
+
+    referenced_works_count = item.get("referenced_works_count")
+    if not isinstance(referenced_works_count, int):
+        referenced_works_count = None
+
+    referenced_works: list[str] | None = None
+    refs_any = item.get("referenced_works")
+    if isinstance(refs_any, list):
+        refs: list[str] = [r for r in refs_any if isinstance(r, str) and r]
+        if max_referenced_works is not None:
+            refs = refs[:max_referenced_works]
+        referenced_works = refs
+
+    return WorkRecord(
+        source="openalex",
+        source_id=item.get("id") if isinstance(item.get("id"), str) else None,
+        doi=item.get("doi") if isinstance(item.get("doi"), str) else None,
+        title=title,
+        authors=authors,
+        year=year,
+        venue=venue,
+        journal=journal,
+        work_type=work_type,
+        cited_by_count=cited_by_count,
+        referenced_works_count=referenced_works_count,
+        referenced_works=referenced_works,
+        url=url,
+        pdf_url=pdf_url,
+        raw=item,
+    )
+
+
+def _summarize_http_error(exc: httpx.HTTPStatusError) -> str:
+    status = exc.response.status_code
+    reason = exc.response.reason_phrase
+    url = str(exc.request.url)
+
+    detail: str | None = None
+    try:
+        payload = exc.response.json()
+        if isinstance(payload, dict):
+            for key in ("error", "detail", "message", "description"):
+                val = payload.get(key)
+                if isinstance(val, str) and val.strip():
+                    detail = val.strip()
+                    break
+    except Exception:
+        pass
+
+    if not detail:
+        text = (exc.response.text or "").strip().replace("\n", " ")
+        if text:
+            detail = text[:400]
+
+    return f"OpenAlex HTTP {status} {reason}: {detail or 'No error detail'} (url={url})"
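
For an end-to-end picture of the provider surface defined above, a minimal sketch of driving OpenAlexProvider directly (the DOI and mailto values are placeholders; only the signatures shown in this patch are assumed):

# Illustrative only: placeholder DOI and mailto; signatures follow this patch.
import asyncio

from app.utils.literature.models import LiteratureQuery
from app.utils.literature.providers.openalex import OpenAlexProvider


async def main() -> None:
    provider = OpenAlexProvider(mailto="ops@example.org", user_agent="literature-mcp/0.1")
    query = LiteratureQuery(doi="10.1234/example.doi", limit=1)
    response = await provider.search_works(query)
    for work in response.works:
        print(work.doi, work.title, work.journal, work.cited_by_count)


asyncio.run(main())

The DOI path goes through _normalize_doi_for_openalex_filter, so bare DOIs, "doi:" prefixes, and http(s)://doi.org URLs all end up in the URL form the OpenAlex 'doi' filter expects.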