diff --git a/CHANGES b/CHANGES index c7364538..1433c2ae 100644 --- a/CHANGES +++ b/CHANGES @@ -31,6 +31,73 @@ $ uvx --from 'vcspull' --prerelease allow vcspull +### New features + +#### New command: `vcspull search` (#494) + +Search across all configured repositories using ripgrep-inspired syntax with field-scoped queries, regex patterns, and flexible output formats. + +**Basic usage:** + +Search for a term across all fields: + +```console +$ vcspull search django +``` + +Search by repository name: + +```console +$ vcspull search "name:flask" +``` + +Search by URL: + +```console +$ vcspull search "url:github.com" +``` + +**Key features:** + +- **Field-scoped queries**: Target specific fields with `name:`, `path:`, `url:`, or `workspace:` prefixes (`root:` and `ws:` are aliases for `workspace:`) +- **Regex patterns**: Full regex support with `-i/--ignore-case`, `-S/--smart-case`, `-F/--fixed-strings`, and `--word-regexp` +- **Boolean logic**: AND (default), OR (`--any`), and inverted matching (`-v/--invert-match`) +- **Context display**: Matching `url` and `workspace` values are shown under each result; `--field` limits which fields unscoped terms search +- **Output formats**: Human-readable (default), `--json`, or `--ndjson` for automation +- **Color control**: `--color {auto,always,never}` with `NO_COLOR` support + +**Advanced examples:** + +Case-insensitive search across all fields: + +```console +$ vcspull search -i DJANGO +``` + +Find repos by workspace root: + +```console +$ vcspull search "workspace:code" +``` + +Match any term (OR logic): + +```console +$ vcspull search --any flask django requests +``` + +Invert match (exclude repos): + +```console +$ vcspull search -v "url:gitlab" +``` + +JSON output for scripting: + +```console +$ vcspull search --json "name:lib" +``` + ### Development #### Makefile -> Justfile (#493) diff --git a/README.md b/README.md index 92cc7524..327d8a9d 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,14 @@ $ vcspull list --json | jq '.[].name' `--json` emits a single JSON array, while `--ndjson` streams newline-delimited objects that are easy to consume from shell 
pipelines. +Search across repositories with an rg-like query syntax: + +```console +$ vcspull search django +$ vcspull search name:django url:github +$ vcspull search --fixed-strings 'git+https://github.com/org/repo.git' +``` + ### Check repository status Get a quick health check for all configured workspaces: diff --git a/docs/api/cli/index.md b/docs/api/cli/index.md index b4bee3d4..8ca2beb7 100644 --- a/docs/api/cli/index.md +++ b/docs/api/cli/index.md @@ -12,6 +12,7 @@ sync add discover list +search status fmt ``` diff --git a/docs/api/cli/search.md b/docs/api/cli/search.md new file mode 100644 index 00000000..e9a6cd9d --- /dev/null +++ b/docs/api/cli/search.md @@ -0,0 +1,3 @@ +# vcspull search - `vcspull.cli.search` + +.. automodule:: vcspull.cli.search diff --git a/docs/cli/index.md b/docs/cli/index.md index 0de86c86..69cf1611 100644 --- a/docs/cli/index.md +++ b/docs/cli/index.md @@ -10,6 +10,7 @@ sync add discover list +search status fmt ``` @@ -36,5 +37,5 @@ completion :nodescription: subparser_name : @replace - See :ref:`cli-sync`, :ref:`cli-add`, :ref:`cli-discover`, :ref:`cli-list`, :ref:`cli-status`, :ref:`cli-fmt` + See :ref:`cli-sync`, :ref:`cli-add`, :ref:`cli-discover`, :ref:`cli-list`, :ref:`cli-search`, :ref:`cli-status`, :ref:`cli-fmt` ``` diff --git a/docs/cli/search.md b/docs/cli/search.md new file mode 100644 index 00000000..6bd61706 --- /dev/null +++ b/docs/cli/search.md @@ -0,0 +1,102 @@ +(cli-search)= + +# vcspull search + +The `vcspull search` command looks up repositories across your vcspull +configuration with an rg-like query syntax. Queries are regex by default, can +scope to specific fields, and can emit structured JSON for automation. + +## Command + +```{eval-rst} +.. 
argparse:: + :module: vcspull.cli + :func: create_parser + :prog: vcspull + :path: search + :nodescription: +``` + +## Basic usage + +Search all fields (name, path, url, workspace) with regex: + +```console +$ vcspull search django +• django → ~/code/django +``` + +## Field-scoped queries + +Target specific fields with prefixes: + +```console +$ vcspull search name:django url:github +• django → ~/code/django + url: git+https://github.com/django/django.git +``` + +Available field prefixes: +- `name:` +- `path:` +- `url:` +- `workspace:` (aliases: `root:`, `ws:`) + +## Literal matches + +Use `-F/--fixed-strings` to match literal text instead of regex: + +```console +$ vcspull search --fixed-strings 'git+https://github.com/org/repo.git' +``` + +## Case handling + +`-i/--ignore-case` forces case-insensitive matching. `-S/--smart-case` matches +case-insensitively unless any query term contains an uppercase character. + +```console +$ vcspull search -S Django +``` + +## Boolean matching + +By default all terms must match. 
Use `--any` to match if *any* term matches: + +```console +$ vcspull search --any django flask +``` + +Invert matches with `-v/--invert-match`: + +```console +$ vcspull search -v --fixed-strings github +``` + +## JSON output + +Emit matches as JSON for automation: + +```console +$ vcspull search --json django +``` + +Output format: + +```json +[ + { + "name": "django", + "url": "git+https://github.com/django/django.git", + "path": "~/code/django", + "workspace_root": "~/code/", + "matched_fields": ["name", "url"] + } +] +``` + +Use NDJSON for streaming: + +```console +$ vcspull search --ndjson django +``` diff --git a/src/vcspull/cli/__init__.py b/src/vcspull/cli/__init__.py index 5a181c20..de66ef79 100644 --- a/src/vcspull/cli/__init__.py +++ b/src/vcspull/cli/__init__.py @@ -19,6 +19,7 @@ from .discover import create_discover_subparser, discover_repos from .fmt import create_fmt_subparser, format_config_file from .list import create_list_subparser, list_repos +from .search import create_search_subparser, search_repos from .status import create_status_subparser, status_repos from .sync import create_sync_subparser, sync @@ -70,6 +71,15 @@ def build_description( "vcspull list --json", ], ), + ( + "search", + [ + "vcspull search django", + "vcspull search name:django url:github", + "vcspull search --fixed-strings 'git+https://github.com/org/repo.git'", + "vcspull search --ignore-case --any django flask", + ], + ), ( "add", [ @@ -133,6 +143,26 @@ def build_description( ), ) +SEARCH_DESCRIPTION = build_description( + """ + Search configured repositories. + + Query terms use regex by default, with optional field prefixes like + name:, path:, url:, or workspace:. 
+ """, + ( + ( + None, + [ + "vcspull search django", + "vcspull search name:django url:github", + "vcspull search --fixed-strings 'git+https://github.com/org/repo.git'", + "vcspull search --ignore-case --any django flask", + ], + ), + ), +) + STATUS_DESCRIPTION = build_description( """ Check status of repositories. @@ -267,6 +297,15 @@ def create_parser( ) create_status_subparser(status_parser) + # Search command + search_parser = subparsers.add_parser( + "search", + help="search configured repositories", + formatter_class=VcspullHelpFormatter, + description=SEARCH_DESCRIPTION, + ) + create_search_subparser(search_parser) + # Add command add_parser = subparsers.add_parser( "add", @@ -300,6 +339,7 @@ def create_parser( sync_parser, list_parser, status_parser, + search_parser, add_parser, discover_parser, fmt_parser, @@ -314,6 +354,7 @@ def cli(_args: list[str] | None = None) -> None: sync_parser, _list_parser, _status_parser, + _search_parser, _add_parser, _discover_parser, _fmt_parser, @@ -367,6 +408,22 @@ def cli(_args: list[str] | None = None) -> None: concurrent=not getattr(args, "no_concurrent", False), max_concurrent=getattr(args, "max_concurrent", None), ) + elif args.subparser_name == "search": + search_repos( + query_terms=args.query_terms, + config_path=pathlib.Path(args.config) if args.config else None, + workspace_root=getattr(args, "workspace_root", None), + output_json=args.output_json, + output_ndjson=args.output_ndjson, + color=args.color, + fields=getattr(args, "fields", None), + ignore_case=getattr(args, "ignore_case", False), + smart_case=getattr(args, "smart_case", False), + fixed_strings=getattr(args, "fixed_strings", False), + word_regexp=getattr(args, "word_regexp", False), + invert_match=getattr(args, "invert_match", False), + match_any=getattr(args, "match_any", False), + ) elif args.subparser_name == "add": handle_add_command(args) elif args.subparser_name == "discover": diff --git a/src/vcspull/cli/_formatter.py 
b/src/vcspull/cli/_formatter.py index da7ffcc1..2a59733a 100644 --- a/src/vcspull/cli/_formatter.py +++ b/src/vcspull/cli/_formatter.py @@ -15,6 +15,7 @@ "--log-level", "--path", "--color", + "--field", } OPTIONS_FLAG_ONLY = { @@ -32,6 +33,16 @@ "--ndjson", "--tree", "--detailed", + "-i", + "--ignore-case", + "-S", + "--smart-case", + "-F", + "--fixed-strings", + "--word-regexp", + "-v", + "--invert-match", + "--any", } diff --git a/src/vcspull/cli/search.py b/src/vcspull/cli/search.py new file mode 100644 index 00000000..899ce5bd --- /dev/null +++ b/src/vcspull/cli/search.py @@ -0,0 +1,743 @@ +"""Search repositories functionality for vcspull.""" + +from __future__ import annotations + +import argparse +import logging +import pathlib +import re +import typing as t +from dataclasses import dataclass + +from vcspull._internal.private_path import PrivatePath +from vcspull.config import find_config_files, load_configs +from vcspull.types import ConfigDict + +from ._colors import Colors, get_color_mode +from ._output import OutputFormatter, get_output_mode +from ._workspaces import filter_by_workspace + +log = logging.getLogger(__name__) + +FIELD_ALIASES = { + "name": "name", + "path": "path", + "url": "url", + "workspace": "workspace", + "root": "workspace", + "ws": "workspace", +} +DEFAULT_FIELDS = ("name", "path", "url", "workspace") + + +class SearchToken(t.NamedTuple): + """Parsed query token with optional field restrictions.""" + + fields: tuple[str, ...] + pattern: str + + +@dataclass(frozen=True) +class SearchPattern: + """Compiled search pattern tied to repository fields.""" + + fields: tuple[str, ...] + raw: str + regex: re.Pattern[str] + + +def normalize_fields(fields: list[str] | None) -> tuple[str, ...]: + """Normalize and validate search fields. + + Parameters + ---------- + fields : list[str] | None + Raw field list, optionally comma-delimited + + Returns + ------- + tuple[str, ...] 
+ Normalized field names + + Examples + -------- + >>> normalize_fields(["name", "url"]) + ('name', 'url') + >>> normalize_fields(["name,url", "workspace"]) + ('name', 'url', 'workspace') + >>> normalize_fields(None) + ('name', 'path', 'url', 'workspace') + """ + if not fields: + return DEFAULT_FIELDS + + normalized: list[str] = [] + for entry in fields: + if not entry: + continue + for raw in entry.split(","): + raw = raw.strip().lower() + if not raw: + continue + field = FIELD_ALIASES.get(raw) + if field is None: + message = f"Unknown search field: {raw}" + raise ValueError(message) + if field not in normalized: + normalized.append(field) + + return tuple(normalized or DEFAULT_FIELDS) + + +def parse_query_terms( + terms: list[str], + *, + default_fields: tuple[str, ...], +) -> list[SearchToken]: + """Parse raw search terms into field-scoped tokens. + + Parameters + ---------- + terms : list[str] + Raw query terms + default_fields : tuple[str, ...] + Fields to apply when no field prefix is provided + + Returns + ------- + list[SearchToken] + Parsed search tokens + + Examples + -------- + >>> tokens = parse_query_terms( + ... ["name:django", "github"], + ... default_fields=("name", "url"), + ... 
) + >>> tokens[0] + SearchToken(fields=('name',), pattern='django') + >>> tokens[1] + SearchToken(fields=('name', 'url'), pattern='github') + """ + tokens: list[SearchToken] = [] + for term in terms: + if term is None: + continue + prefix, sep, rest = term.partition(":") + if sep: + field = FIELD_ALIASES.get(prefix.strip().lower()) + if field is not None: + if not rest: + message = "Search term cannot be empty after field prefix" + raise ValueError(message) + tokens.append(SearchToken(fields=(field,), pattern=rest)) + continue + tokens.append(SearchToken(fields=default_fields, pattern=term)) + + return tokens + + +def compile_search_patterns( + tokens: list[SearchToken], + *, + ignore_case: bool, + smart_case: bool, + fixed_strings: bool, + word_regexp: bool, +) -> list[SearchPattern]: + """Compile search tokens into regex patterns. + + Parameters + ---------- + tokens : list[SearchToken] + Parsed tokens + ignore_case : bool + Force case-insensitive matching + smart_case : bool + Enable smart-case matching + fixed_strings : bool + Treat patterns as literal strings + word_regexp : bool + Match whole words only + + Returns + ------- + list[SearchPattern] + Compiled search patterns + + Examples + -------- + >>> tokens = [SearchToken(fields=("name",), pattern="django")] + >>> patterns = compile_search_patterns( + ... tokens, + ... ignore_case=True, + ... smart_case=False, + ... fixed_strings=False, + ... word_regexp=False, + ... 
) + >>> bool(patterns[0].regex.search("Django")) + True + """ + if not tokens: + return [] + + use_ignore_case = ignore_case + if not ignore_case and smart_case: + has_upper = any( + any(char.isupper() for char in token.pattern) for token in tokens + ) + use_ignore_case = not has_upper + + flags = re.IGNORECASE if use_ignore_case else 0 + patterns: list[SearchPattern] = [] + + for token in tokens: + raw = token.pattern + if raw == "": + message = "Search pattern cannot be empty" + raise ValueError(message) + + pattern = re.escape(raw) if fixed_strings else raw + if word_regexp: + pattern = rf"\b(?:{pattern})\b" + + try: + regex = re.compile(pattern, flags) + except re.error as exc: + message = f"Invalid search pattern {raw!r}: {exc}" + raise ValueError(message) from exc + + patterns.append(SearchPattern(fields=token.fields, raw=raw, regex=regex)) + + return patterns + + +def evaluate_match( + fields: dict[str, str], + patterns: list[SearchPattern], + *, + match_any: bool, +) -> tuple[bool, dict[str, list[str]]]: + """Return match status and matched substrings by field. + + Parameters + ---------- + fields : dict[str, str] + Field values to search + patterns : list[SearchPattern] + Compiled search patterns + match_any : bool + Whether to match any token instead of all tokens + + Returns + ------- + tuple[bool, dict[str, list[str]]] + Match status and mapping of matched fields to match strings + + Examples + -------- + >>> fields = { + ... "name": "django", + ... "path": "~/code/django", + ... "url": "git+https://github.com/django/django.git", + ... "workspace": "~/code/", + ... } + >>> tokens = parse_query_terms(["name:django"], default_fields=("name", "url")) + >>> patterns = compile_search_patterns( + ... tokens, + ... ignore_case=False, + ... smart_case=False, + ... fixed_strings=False, + ... word_regexp=False, + ... 
) + >>> matched, matches = evaluate_match(fields, patterns, match_any=False) + >>> matched + True + >>> matches["name"] + ['django'] + """ + if not patterns: + return False, {} + + matches: dict[str, list[str]] = {} + token_hits: list[bool] = [] + + for pattern in patterns: + token_matched = False + for field in pattern.fields: + value = fields.get(field, "") + if not value: + continue + if pattern.regex.search(value): + token_matched = True + field_matches = matches.setdefault(field, []) + for match in pattern.regex.finditer(value): + text = match.group(0) + if text and text not in field_matches: + field_matches.append(text) + token_hits.append(token_matched) + if not match_any and not token_matched: + return False, {} + + if match_any: + if any(token_hits): + return True, matches + return False, {} + + return True, matches + + +def highlight_text( + text: str, + patterns: list[re.Pattern[str]], + *, + colors: Colors, + base_color: str | None = None, +) -> str: + """Return text with regex matches highlighted. 
+ + Parameters + ---------- + text : str + Input text + patterns : list[re.Pattern[str]] + Patterns to highlight + colors : Colors + Color manager + base_color : str | None + Base color code to reapply outside highlights + + Returns + ------- + str + Highlighted text + + Examples + -------- + >>> from vcspull.cli._colors import ColorMode + >>> colors = Colors(ColorMode.NEVER) + >>> highlight_text("django", [re.compile("jan")], colors=colors) + 'django' + """ + if not patterns: + if base_color: + return colors.colorize(text, base_color) + return text + + if not colors._enabled: + return text + + unique_patterns: list[str] = [] + flags = 0 + for pattern in patterns: + if pattern.pattern not in unique_patterns: + unique_patterns.append(pattern.pattern) + flags |= pattern.flags + + if not unique_patterns: + if base_color: + return colors.colorize(text, base_color) + return text + + combined = re.compile("|".join(f"(?:{pat})" for pat in unique_patterns), flags) + + if base_color: + + def repl_with_base(match: re.Match[str]) -> str: + return f"{colors.HIGHLIGHT}{match.group(0)}{base_color}" + + return f"{base_color}{combined.sub(repl_with_base, text)}{colors.RESET}" + + def repl_plain(match: re.Match[str]) -> str: + return f"{colors.HIGHLIGHT}{match.group(0)}{colors.RESET}" + + return combined.sub(repl_plain, text) + + +def find_search_matches( + repos: list[ConfigDict], + patterns: list[SearchPattern], + *, + match_any: bool, + invert_match: bool, +) -> list[dict[str, t.Any]]: + """Return search matches for repositories. + + Parameters + ---------- + repos : list[ConfigDict] + Repository configurations to search + patterns : list[SearchPattern] + Compiled search patterns + match_any : bool + Whether any token match is sufficient + invert_match : bool + Whether to return non-matching repositories + + Returns + ------- + list[dict[str, t.Any]] + Search results containing matched fields + + Examples + -------- + >>> repos = [ + ... { + ... "name": "django", + ... 
"path": "/tmp/django", + ... "url": "git+https://github.com/django/django.git", + ... "workspace_root": "~/code/", + ... }, + ... ] + >>> tokens = parse_query_terms(["django"], default_fields=DEFAULT_FIELDS) + >>> patterns = compile_search_patterns( + ... tokens, + ... ignore_case=False, + ... smart_case=False, + ... fixed_strings=False, + ... word_regexp=False, + ... ) + >>> results = find_search_matches( + ... repos, + ... patterns, + ... match_any=False, + ... invert_match=False, + ... ) + >>> [item["name"] for item in results] + ['django'] + """ + results: list[dict[str, t.Any]] = [] + field_order = DEFAULT_FIELDS + + for repo in repos: + name = str(repo.get("name", "")) + path_value = PrivatePath(pathlib.Path(repo.get("path", ""))) + url = str(repo.get("url", repo.get("pip_url", "")) or "") + workspace_raw = repo.get("workspace_root") + if workspace_raw: + workspace_path = pathlib.Path(str(workspace_raw)).expanduser() + else: + workspace_path = pathlib.Path(repo.get("path", "")) + if workspace_path: + workspace_path = workspace_path.expanduser().parent + workspace = str(PrivatePath(workspace_path)) if workspace_path else "" + + field_values = { + "name": name, + "path": str(path_value), + "url": url, + "workspace": workspace, + } + + matched, matches_by_field = evaluate_match( + field_values, + patterns, + match_any=match_any, + ) + + if invert_match: + matched = not matched + if matched: + matches_by_field = {} + + if not matched: + continue + + matched_fields = [field for field in field_order if field in matches_by_field] + + results.append( + { + "name": name, + "path": str(path_value), + "url": url, + "workspace_root": workspace, + "matched_fields": matched_fields, + "matches": matches_by_field, + }, + ) + + return results + + +def create_search_subparser(parser: argparse.ArgumentParser) -> None: + """Create ``vcspull search`` argument subparser. 
+ + Parameters + ---------- + parser : argparse.ArgumentParser + The parser to configure + + Examples + -------- + >>> import argparse + >>> parser = argparse.ArgumentParser() + >>> create_search_subparser(parser) + >>> parsed = parser.parse_args(["django"]) + >>> parsed.query_terms + ['django'] + """ + parser.add_argument( + "query_terms", + metavar="query", + nargs="+", + help=( + "search query terms (regex by default). Use field prefixes like " + "name:, path:, url:, workspace:." + ), + ) + parser.add_argument( + "-f", + "--file", + dest="config", + metavar="FILE", + help="path to config file (default: ~/.vcspull.yaml or ./.vcspull.yaml)", + ) + parser.add_argument( + "-w", + "--workspace", + "--workspace-root", + dest="workspace_root", + metavar="DIR", + help="filter by workspace root directory", + ) + parser.add_argument( + "--field", + dest="fields", + action="append", + metavar="NAME", + help=( + "limit unscoped queries to specific fields " + "(name, path, url, workspace). Repeatable or comma-separated." 
+ ), + ) + parser.add_argument( + "-i", + "--ignore-case", + action="store_true", + help="case-insensitive matching", + ) + parser.add_argument( + "-S", + "--smart-case", + action="store_true", + help="smart case matching (ignore case unless pattern has capitals)", + ) + parser.add_argument( + "-F", + "--fixed-strings", + action="store_true", + help="treat search terms as literal strings", + ) + parser.add_argument( + "--word-regexp", + action="store_true", + help="match only whole words", + ) + parser.add_argument( + "-v", + "--invert-match", + action="store_true", + help="show non-matching repositories", + ) + parser.add_argument( + "--any", + dest="match_any", + action="store_true", + help="match if any term matches (default: all terms)", + ) + parser.add_argument( + "--json", + action="store_true", + dest="output_json", + help="output as JSON", + ) + parser.add_argument( + "--ndjson", + action="store_true", + dest="output_ndjson", + help="output as NDJSON (one JSON per line)", + ) + parser.add_argument( + "--color", + choices=["auto", "always", "never"], + default="auto", + help="when to use colors (default: auto)", + ) + + +def search_repos( + query_terms: list[str], + config_path: pathlib.Path | None, + workspace_root: str | None, + output_json: bool, + output_ndjson: bool, + color: str, + *, + fields: list[str] | None, + ignore_case: bool, + smart_case: bool, + fixed_strings: bool, + word_regexp: bool, + invert_match: bool, + match_any: bool, + emit_output: bool = True, +) -> list[dict[str, t.Any]]: + """Search configured repositories. 
+ + Parameters + ---------- + query_terms : list[str] + Search query terms + config_path : pathlib.Path | None + Path to config file, or None to auto-discover + workspace_root : str | None + Filter by workspace root + output_json : bool + Output as JSON + output_ndjson : bool + Output as NDJSON + color : str + Color mode (auto, always, never) + fields : list[str] | None + Field list for unscoped queries + ignore_case : bool + Force case-insensitive matching + smart_case : bool + Enable smart-case matching + fixed_strings : bool + Treat terms as literal strings + word_regexp : bool + Match whole words only + invert_match : bool + Return non-matching repositories + match_any : bool + Match if any term matches + emit_output : bool + Whether to emit human/JSON output + + Returns + ------- + list[dict[str, t.Any]] + Search results + + Examples + -------- + >>> from vcspull.config import save_config_yaml + >>> config_file = tmp_path / ".vcspull.yaml" + >>> save_config_yaml( + ... config_file, + ... {"~/code/": {"django": {"repo": "git+https://github.com/django/django.git"}}}, + ... ) + >>> results = search_repos( + ... ["django"], + ... config_path=config_file, + ... workspace_root=None, + ... output_json=False, + ... output_ndjson=False, + ... color="never", + ... fields=None, + ... ignore_case=False, + ... smart_case=False, + ... fixed_strings=False, + ... word_regexp=False, + ... invert_match=False, + ... match_any=False, + ... emit_output=False, + ... 
) + >>> [item["name"] for item in results] + ['django'] + """ + if config_path: + configs = load_configs([config_path]) + else: + configs = load_configs(find_config_files(include_home=True)) + + if workspace_root: + configs = filter_by_workspace(configs, workspace_root) + + try: + normalized_fields = normalize_fields(fields) + tokens = parse_query_terms(query_terms, default_fields=normalized_fields) + patterns = compile_search_patterns( + tokens, + ignore_case=ignore_case, + smart_case=smart_case, + fixed_strings=fixed_strings, + word_regexp=word_regexp, + ) + except ValueError: + log.exception("Search query parsing failed") + return [] + + results = find_search_matches( + configs, + patterns, + match_any=match_any, + invert_match=invert_match, + ) + + if not emit_output: + return results + + output_mode = get_output_mode(output_json, output_ndjson) + formatter = OutputFormatter(output_mode) + colors = Colors(get_color_mode(color)) + + if not results: + formatter.emit_text(colors.warning("No repositories found.")) + formatter.finalize() + return results + + patterns_by_field: dict[str, list[re.Pattern[str]]] = { + field: [] for field in DEFAULT_FIELDS + } + for pattern in patterns: + for field in pattern.fields: + patterns_by_field.setdefault(field, []).append(pattern.regex) + + for result in results: + formatter.emit( + { + "name": result["name"], + "url": result["url"], + "path": result["path"], + "workspace_root": result["workspace_root"], + "matched_fields": result["matched_fields"], + }, + ) + + name_display = highlight_text( + result["name"], + patterns_by_field.get("name", []), + colors=colors, + base_color=colors.INFO, + ) + path_display = highlight_text( + result["path"], + patterns_by_field.get("path", []), + colors=colors, + ) + formatter.emit_text( + f"{colors.muted('•')} {name_display} {colors.muted('→')} {path_display}", + ) + + matched_fields = set(result.get("matched_fields", [])) + if "url" in matched_fields: + url_display = highlight_text( + 
result["url"], + patterns_by_field.get("url", []), + colors=colors, + ) + formatter.emit_text(f" {colors.muted('url:')} {url_display}") + if "workspace" in matched_fields: + workspace_display = highlight_text( + result["workspace_root"], + patterns_by_field.get("workspace", []), + colors=colors, + ) + formatter.emit_text( + f" {colors.muted('workspace:')} {workspace_display}", + ) + + formatter.finalize() + return results diff --git a/tests/cli/test_search.py b/tests/cli/test_search.py new file mode 100644 index 00000000..f9976adb --- /dev/null +++ b/tests/cli/test_search.py @@ -0,0 +1,798 @@ +"""Tests for vcspull search command.""" + +from __future__ import annotations + +import json +import re +import typing as t + +import pytest + +from vcspull.cli._colors import ColorMode, Colors +from vcspull.cli.search import ( + compile_search_patterns, + highlight_text, + normalize_fields, + parse_query_terms, + search_repos, +) +from vcspull.config import save_config_yaml + +if t.TYPE_CHECKING: + import pathlib + + +def create_test_config(config_path: pathlib.Path, repos: dict[str, t.Any]) -> None: + """Create a test config file.""" + save_config_yaml(config_path, repos) + + +class SearchReposFixture(t.NamedTuple): + """Fixture for search repos test cases.""" + + test_id: str + query_terms: list[str] + fields: list[str] | None + ignore_case: bool + smart_case: bool + fixed_strings: bool + word_regexp: bool + invert_match: bool + match_any: bool + workspace_filter: str | None + expected_repo_names: list[str] + + +SEARCH_REPOS_FIXTURES: list[SearchReposFixture] = [ + SearchReposFixture( + test_id="search-basic-regex", + query_terms=["django"], + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + workspace_filter=None, + expected_repo_names=["django"], + ), + SearchReposFixture( + test_id="search-field-url", + query_terms=["url:pallets"], + fields=None, + ignore_case=False, + 
smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + workspace_filter=None, + expected_repo_names=["flask"], + ), + SearchReposFixture( + test_id="search-ignore-case", + query_terms=["name:FLASK"], + fields=None, + ignore_case=True, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + workspace_filter=None, + expected_repo_names=["flask"], + ), + SearchReposFixture( + test_id="search-any-term", + query_terms=["name:django", "url:pallets"], + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=True, + workspace_filter=None, + expected_repo_names=["django", "flask"], + ), + SearchReposFixture( + test_id="search-invert-match", + query_terms=["flask"], + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=True, + match_any=False, + workspace_filter=None, + expected_repo_names=["django", "internal-api"], + ), + SearchReposFixture( + test_id="search-workspace-filter", + query_terms=["internal"], + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + workspace_filter="~/work/", + expected_repo_names=["internal-api"], + ), +] + + +@pytest.mark.parametrize( + list(SearchReposFixture._fields), + SEARCH_REPOS_FIXTURES, + ids=[fixture.test_id for fixture in SEARCH_REPOS_FIXTURES], +) +def test_search_repos( + test_id: str, + query_terms: list[str], + fields: list[str] | None, + ignore_case: bool, + smart_case: bool, + fixed_strings: bool, + word_regexp: bool, + invert_match: bool, + match_any: bool, + workspace_filter: str | None, + expected_repo_names: list[str], + user_path: pathlib.Path, +) -> None: + """Test searching repositories.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "flask": {"repo": 
"git+https://github.com/pallets/flask.git"}, + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + "~/work/": { + "internal-api": {"repo": "git+ssh://git.example.com/internal-api.git"}, + }, + } + create_test_config(config_file, config_data) + + results = search_repos( + query_terms=query_terms, + config_path=config_file, + workspace_root=workspace_filter, + output_json=False, + output_ndjson=False, + color="never", + fields=fields, + ignore_case=ignore_case, + smart_case=smart_case, + fixed_strings=fixed_strings, + word_regexp=word_regexp, + invert_match=invert_match, + match_any=match_any, + emit_output=False, + ) + + repo_names = {item["name"] for item in results} + assert repo_names == set(expected_repo_names) + + +def test_search_repos_json_output( + user_path: pathlib.Path, + capsys: t.Any, +) -> None: + """Test JSON output for search command.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + } + create_test_config(config_file, config_data) + + search_repos( + query_terms=["django"], + config_path=config_file, + workspace_root=None, + output_json=True, + output_ndjson=False, + color="never", + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + ) + + captured = capsys.readouterr() + output_data = json.loads(captured.out) + assert isinstance(output_data, list) + assert output_data[0]["name"] == "django" + assert "matched_fields" in output_data[0] + + +def test_search_repos_ndjson_output( + user_path: pathlib.Path, + capsys: t.Any, +) -> None: + """Test NDJSON output for search command.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + } + create_test_config(config_file, config_data) + + search_repos( + query_terms=["django"], + config_path=config_file, 
+ workspace_root=None, + output_json=False, + output_ndjson=True, + color="never", + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + ) + + captured = capsys.readouterr() + lines = [line for line in captured.out.strip().split("\n") if line] + assert lines, "Expected NDJSON output" + item = json.loads(lines[0]) + assert item["name"] == "django" + assert "matched_fields" in item + + +def test_search_repos_no_matches( + user_path: pathlib.Path, + capsys: t.Any, +) -> None: + """Test search output when no repositories match.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + } + create_test_config(config_file, config_data) + + search_repos( + query_terms=["nonexistent"], + config_path=config_file, + workspace_root=None, + output_json=False, + output_ndjson=False, + color="never", + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + ) + + captured = capsys.readouterr() + assert "No repositories found" in captured.out + + +# Unit tests for normalize_fields + + +class NormalizeFieldsFixture(t.NamedTuple): + """Fixture for normalize_fields test cases.""" + + test_id: str + fields: list[str] | None + expected: tuple[str, ...] 
+ raises: type[Exception] | None + + +NORMALIZE_FIELDS_FIXTURES: list[NormalizeFieldsFixture] = [ + NormalizeFieldsFixture( + test_id="none-returns-defaults", + fields=None, + expected=("name", "path", "url", "workspace"), + raises=None, + ), + NormalizeFieldsFixture( + test_id="empty-list-returns-defaults", + fields=[], + expected=("name", "path", "url", "workspace"), + raises=None, + ), + NormalizeFieldsFixture( + test_id="comma-separated-fields", + fields=["name,url"], + expected=("name", "url"), + raises=None, + ), + NormalizeFieldsFixture( + test_id="mixed-comma-and-separate", + fields=["name,url", "workspace"], + expected=("name", "url", "workspace"), + raises=None, + ), + NormalizeFieldsFixture( + test_id="alias-root-to-workspace", + fields=["root"], + expected=("workspace",), + raises=None, + ), + NormalizeFieldsFixture( + test_id="alias-ws-to-workspace", + fields=["ws"], + expected=("workspace",), + raises=None, + ), + NormalizeFieldsFixture( + test_id="duplicates-removed", + fields=["name", "name", "url"], + expected=("name", "url"), + raises=None, + ), + NormalizeFieldsFixture( + test_id="empty-string-entry-skipped", + fields=["", "name"], + expected=("name",), + raises=None, + ), + NormalizeFieldsFixture( + test_id="whitespace-trimmed", + fields=[" name ", " url "], + expected=("name", "url"), + raises=None, + ), + NormalizeFieldsFixture( + test_id="invalid-field-raises", + fields=["invalid_field"], + expected=(), + raises=ValueError, + ), +] + + +@pytest.mark.parametrize( + list(NormalizeFieldsFixture._fields), + NORMALIZE_FIELDS_FIXTURES, + ids=[fixture.test_id for fixture in NORMALIZE_FIELDS_FIXTURES], +) +def test_normalize_fields( + test_id: str, + fields: list[str] | None, + expected: tuple[str, ...], + raises: type[Exception] | None, +) -> None: + """Test normalize_fields function.""" + if raises: + with pytest.raises(raises): + normalize_fields(fields) + else: + result = normalize_fields(fields) + assert result == expected + + +# Unit tests for 
parse_query_terms + + +class ParseQueryTermsFixture(t.NamedTuple): + """Fixture for parse_query_terms test cases.""" + + test_id: str + terms: list[str] + expected_patterns: list[str] + raises: type[Exception] | None + + +PARSE_QUERY_TERMS_FIXTURES: list[ParseQueryTermsFixture] = [ + ParseQueryTermsFixture( + test_id="simple-term", + terms=["django"], + expected_patterns=["django"], + raises=None, + ), + ParseQueryTermsFixture( + test_id="field-prefixed-term", + terms=["name:django"], + expected_patterns=["django"], + raises=None, + ), + ParseQueryTermsFixture( + test_id="empty-after-prefix-raises", + terms=["name:"], + expected_patterns=[], + raises=ValueError, + ), + ParseQueryTermsFixture( + test_id="unknown-prefix-treated-as-pattern", + terms=["unknownfield:value"], + expected_patterns=["unknownfield:value"], + raises=None, + ), +] + + +@pytest.mark.parametrize( + list(ParseQueryTermsFixture._fields), + PARSE_QUERY_TERMS_FIXTURES, + ids=[fixture.test_id for fixture in PARSE_QUERY_TERMS_FIXTURES], +) +def test_parse_query_terms( + test_id: str, + terms: list[str], + expected_patterns: list[str], + raises: type[Exception] | None, +) -> None: + """Test parse_query_terms function.""" + if raises: + with pytest.raises(raises): + parse_query_terms(terms, default_fields=("name", "url")) + else: + result = parse_query_terms(terms, default_fields=("name", "url")) + assert [token.pattern for token in result] == expected_patterns + + +# Unit tests for compile_search_patterns + + +class CompileSearchPatternsFixture(t.NamedTuple): + """Fixture for compile_search_patterns test cases.""" + + test_id: str + terms: list[str] + ignore_case: bool + smart_case: bool + fixed_strings: bool + word_regexp: bool + test_text: str + should_match: bool + raises: type[Exception] | None + + +COMPILE_SEARCH_PATTERNS_FIXTURES: list[CompileSearchPatternsFixture] = [ + CompileSearchPatternsFixture( + test_id="smart-case-lowercase-ignores-case", + terms=["django"], + ignore_case=False, + 
smart_case=True, + fixed_strings=False, + word_regexp=False, + test_text="Django", + should_match=True, + raises=None, + ), + CompileSearchPatternsFixture( + test_id="smart-case-uppercase-is-case-sensitive", + terms=["Django"], + ignore_case=False, + smart_case=True, + fixed_strings=False, + word_regexp=False, + test_text="django", + should_match=False, + raises=None, + ), + CompileSearchPatternsFixture( + test_id="word-regexp-matches-whole-word", + terms=["test"], + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=True, + test_text="test", + should_match=True, + raises=None, + ), + CompileSearchPatternsFixture( + test_id="word-regexp-no-partial-match", + terms=["test"], + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=True, + test_text="testing", + should_match=False, + raises=None, + ), + CompileSearchPatternsFixture( + test_id="fixed-strings-escapes-regex", + terms=["a.b"], + ignore_case=False, + smart_case=False, + fixed_strings=True, + word_regexp=False, + test_text="a.b", + should_match=True, + raises=None, + ), + CompileSearchPatternsFixture( + test_id="fixed-strings-no-regex-match", + terms=["a.b"], + ignore_case=False, + smart_case=False, + fixed_strings=True, + word_regexp=False, + test_text="aXb", + should_match=False, + raises=None, + ), + CompileSearchPatternsFixture( + test_id="invalid-regex-raises", + terms=["[invalid"], + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + test_text="", + should_match=False, + raises=ValueError, + ), +] + + +@pytest.mark.parametrize( + list(CompileSearchPatternsFixture._fields), + COMPILE_SEARCH_PATTERNS_FIXTURES, + ids=[fixture.test_id for fixture in COMPILE_SEARCH_PATTERNS_FIXTURES], +) +def test_compile_search_patterns( + test_id: str, + terms: list[str], + ignore_case: bool, + smart_case: bool, + fixed_strings: bool, + word_regexp: bool, + test_text: str, + should_match: bool, + raises: type[Exception] | None, +) -> None: + 
"""Test compile_search_patterns function.""" + tokens = parse_query_terms(terms, default_fields=("name",)) + if raises: + with pytest.raises(raises): + compile_search_patterns( + tokens, + ignore_case=ignore_case, + smart_case=smart_case, + fixed_strings=fixed_strings, + word_regexp=word_regexp, + ) + else: + patterns = compile_search_patterns( + tokens, + ignore_case=ignore_case, + smart_case=smart_case, + fixed_strings=fixed_strings, + word_regexp=word_regexp, + ) + assert len(patterns) == 1 + match = patterns[0].regex.search(test_text) + assert (match is not None) == should_match + + +def test_compile_search_patterns_empty_tokens() -> None: + """Test compile_search_patterns with empty token list.""" + patterns = compile_search_patterns( + [], + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + ) + assert patterns == [] + + +# Unit tests for highlight_text + + +def test_highlight_text_no_patterns() -> None: + """Test highlight_text with no patterns returns original text.""" + colors = Colors(ColorMode.NEVER) + result = highlight_text("django", [], colors=colors) + assert result == "django" + + +def test_highlight_text_no_patterns_with_base_color() -> None: + """Test highlight_text with base_color but no patterns.""" + colors = Colors(ColorMode.ALWAYS) + result = highlight_text("django", [], colors=colors, base_color=colors.INFO) + assert "django" in result + + +def test_highlight_text_with_color_enabled() -> None: + """Test highlight_text with colors enabled.""" + colors = Colors(ColorMode.ALWAYS) + pattern = re.compile("jan", re.IGNORECASE) + result = highlight_text("django", [pattern], colors=colors) + # Should contain ANSI codes for highlighting + assert colors.HIGHLIGHT in result or "jan" in result + + +def test_highlight_text_with_base_color_and_pattern() -> None: + """Test highlight_text with both base_color and pattern.""" + colors = Colors(ColorMode.ALWAYS) + pattern = re.compile("jan", re.IGNORECASE) + result = 
highlight_text("django", [pattern], colors=colors, base_color=colors.INFO) + assert "jan" in result + + +# Tests for search_repos with matched fields output + + +def test_search_repos_url_field_matched( + user_path: pathlib.Path, + capsys: t.Any, +) -> None: + """Test search output when URL field is matched.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + } + create_test_config(config_file, config_data) + + search_repos( + query_terms=["url:github"], + config_path=config_file, + workspace_root=None, + output_json=False, + output_ndjson=False, + color="never", + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + ) + + captured = capsys.readouterr() + assert "url:" in captured.out + assert "github" in captured.out + + +def test_search_repos_workspace_field_matched( + user_path: pathlib.Path, + capsys: t.Any, +) -> None: + """Test search output when workspace field is matched.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + } + create_test_config(config_file, config_data) + + search_repos( + query_terms=["workspace:code"], + config_path=config_file, + workspace_root=None, + output_json=False, + output_ndjson=False, + color="never", + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + ) + + captured = capsys.readouterr() + assert "workspace:" in captured.out + assert "code" in captured.out + + +def test_search_repos_invalid_field_error( + user_path: pathlib.Path, +) -> None: + """Test search_repos handles invalid field gracefully.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + } + 
create_test_config(config_file, config_data) + + # Invalid field should trigger ValueError which is caught and logged + results = search_repos( + query_terms=["django"], + config_path=config_file, + workspace_root=None, + output_json=False, + output_ndjson=False, + color="never", + fields=["invalid_field"], + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + emit_output=False, + ) + assert results == [] + + +def test_search_repos_invalid_regex_error( + user_path: pathlib.Path, +) -> None: + """Test search_repos handles invalid regex gracefully.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + } + create_test_config(config_file, config_data) + + # Invalid regex should trigger ValueError which is caught and logged + results = search_repos( + query_terms=["[invalid"], + config_path=config_file, + workspace_root=None, + output_json=False, + output_ndjson=False, + color="never", + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + emit_output=False, + ) + assert results == [] + + +def test_search_repos_auto_discover_config( + user_path: pathlib.Path, +) -> None: + """Test search_repos with config_path=None to trigger auto-discovery.""" + config_file = user_path / ".vcspull.yaml" + config_data = { + "~/code/": { + "django": {"repo": "git+https://github.com/django/django.git"}, + }, + } + create_test_config(config_file, config_data) + + # config_path=None triggers find_config_files auto-discovery + results = search_repos( + query_terms=["django"], + config_path=None, + workspace_root=None, + output_json=False, + output_ndjson=False, + color="never", + fields=None, + ignore_case=False, + smart_case=False, + fixed_strings=False, + word_regexp=False, + invert_match=False, + match_any=False, + 
emit_output=False, + ) + assert len(results) == 1 + assert results[0]["name"] == "django" + + +def test_normalize_fields_empty_after_comma() -> None: + """Test normalize_fields with empty string after comma.""" + result = normalize_fields(["name,", ",url"]) + assert result == ("name", "url") diff --git a/tests/test_log.py b/tests/test_log.py index f1fd0400..1ba779bc 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -433,6 +433,7 @@ def test_get_cli_logger_names_includes_base() -> None: "vcspull.cli.discover", "vcspull.cli.fmt", "vcspull.cli.list", + "vcspull.cli.search", "vcspull.cli.status", "vcspull.cli.sync", ]