diff --git a/vcspull/cli/sync.py b/vcspull/cli/sync.py
index 4fa6a84c..fb3806be 100644
--- a/vcspull/cli/sync.py
+++ b/vcspull/cli/sync.py
@@ -1,11 +1,12 @@
 import logging
+import os
 import sys
-from copy import deepcopy
 
 import click
 import click.shell_completion
-from libvcs.shortcuts import create_project_from_pip_url
+from libvcs.projects.base import BaseProject
+from libvcs.projects.constants import DEFAULT_VCS_CLASS_MAP
 
 from ..config import filter_repos, find_config_files, load_configs
 
 
@@ -22,16 +23,20 @@ def get_repo_completions(ctx: click.core.Context, args, incomplete):
     repo_terms = [incomplete]
 
     for repo_term in repo_terms:
-        dir, vcs_url, name = None, None, None
+        repo_dir, name = None, None
         if any(repo_term.startswith(n) for n in ["./", "/", "~", "$HOME"]):
-            dir = repo_term
-        elif any(repo_term.startswith(n) for n in ["http", "git", "svn", "hg"]):
-            vcs_url = repo_term
+            repo_dir = repo_term
         else:
             name = repo_term
 
         # collect the repos from the config files
-        found_repos.extend(filter_repos(configs, dir=dir, vcs_url=vcs_url, name=name))
+        found_repos.extend(
+            filter_repos(
+                configs,
+                filter_repo_dir=repo_dir,
+                filter_name=name,
+            )
+        )
 
     if len(found_repos) == 0:
         found_repos = configs
@@ -46,10 +51,6 @@ def get_config_file_completions(ctx, args, incomplete):
     ]
 
 
-def clamp(n, _min, _max):
-    return max(_min, min(n, _max))
-
-
 @click.command(name="sync")
 @click.argument(
     "repo_terms", type=click.STRING, nargs=-1, shell_complete=get_repo_completions
@@ -67,40 +68,37 @@ def sync(repo_terms, config):
         configs = load_configs([config])
     else:
         configs = load_configs(find_config_files(include_home=True))
-    found_repos = []
+
+    found_repos = {}
 
     if repo_terms:
         for repo_term in repo_terms:
-            dir, vcs_url, name = None, None, None
+            repo_dir, name = None, None
+
             if any(repo_term.startswith(n) for n in ["./", "/", "~", "$HOME"]):
-                dir = repo_term
-            elif any(repo_term.startswith(n) for n in ["http", "git", "svn", "hg"]):
-                vcs_url = repo_term
+                repo_dir = repo_term
             else:
                 name = repo_term
 
             # collect the repos from the config files
-            found_repos.extend(
-                filter_repos(configs, dir=dir, vcs_url=vcs_url, name=name)
+            found_repos |= filter_repos(
+                configs,
+                filter_repo_dir=repo_dir,
+                filter_name=name,
             )
     else:
         found_repos = configs
 
-    list(map(update_repo, found_repos))
+    for path, repos in found_repos.items():
+        for name, repo in repos.items():
+            r: BaseProject = DEFAULT_VCS_CLASS_MAP[repo["vcs"]](
+                repo_dir=os.path.join(path, name),
+                options=repo["options"],
+                progress_callback=progress_cb,
+            )
+            r.update_repo(set_remotes=True)
 
 
 def progress_cb(output, timestamp):
     sys.stdout.write(output)
     sys.stdout.flush()
-
-
-def update_repo(repo_dict):
-    repo_dict = deepcopy(repo_dict)
-    if "pip_url" not in repo_dict:
-        repo_dict["pip_url"] = repo_dict.pop("url")
-    repo_dict["progress_callback"] = progress_cb
-
-    r = create_project_from_pip_url(**repo_dict)  # Creates the repo object
-    r.update_repo(set_remotes=True)  # Creates repo if not exists and fetches
-
-    return r
diff --git a/vcspull/config.py b/vcspull/config.py
index e33e14d7..46848671 100644
--- a/vcspull/config.py
+++ b/vcspull/config.py
@@ -16,7 +16,7 @@
 from libvcs.projects.git import GitRemote
 
 from . import exc
-from .util import get_config_dir, update_dict
+from .util import get_config_dir
 
 log = logging.getLogger(__name__)
 
@@ -45,75 +45,7 @@ def expand_dir(
     return _dir
 
 
-def extract_repos(config: dict, cwd=pathlib.Path.cwd()) -> list[dict]:
-    """Return expanded configuration.
-
-    end-user configuration permit inline configuration shortcuts, expand to
-    identical format for parsing.
-
-    Parameters
-    ----------
-    config : dict
-        the repo config in :py:class:`dict` format.
-    cwd : pathlib.Path
-        current working dir (for deciphering relative paths)
-
-    Returns
-    -------
-    list : List of normalized repository information
-    """
-    configs = []
-    for directory, repos in config.items():
-        for repo, repo_data in repos.items():
-
-            conf = {}
-
-            """
-            repo_name: http://myrepo.com/repo.git
-
-            to
-
-            repo_name: { url: 'http://myrepo.com/repo.git' }
-
-            also assures the repo is a :py:class:`dict`.
-            """
-
-            if isinstance(repo_data, str):
-                conf["url"] = repo_data
-            else:
-                conf = update_dict(conf, repo_data)
-
-            if "repo" in conf:
-                if "url" not in conf:
-                    conf["url"] = conf.pop("repo")
-                else:
-                    conf.pop("repo", None)
-
-            if "name" not in conf:
-                conf["name"] = repo
-            if "parent_dir" not in conf:
-                conf["parent_dir"] = expand_dir(directory, cwd=cwd)
-
-            # repo_dir -> dir in libvcs 0.12.0b25
-            if "repo_dir" in conf and "dir" not in conf:
-                conf["dir"] = conf.pop("repo_dir")
-
-            if "dir" not in conf:
-                conf["dir"] = expand_dir(conf["parent_dir"] / conf["name"], cwd)
-
-            if "remotes" in conf:
-                for remote_name, url in conf["remotes"].items():
-                    conf["remotes"][remote_name] = GitRemote(
-                        name=remote_name, fetch_url=url, push_url=url
-                    )
-            configs.append(conf)
-
-    return configs
-
-
-def find_home_config_files(
-    filetype: list[str] = ["json", "yaml"]
-) -> list[pathlib.Path]:
+def find_home_config_files(filetype=["json", "yaml"]):
     """Return configs of ``.vcspull.{yaml,json}`` in user's home directory."""
     configs = []
 
@@ -195,88 +127,96 @@ def find_config_files(
     return configs
 
 
-def load_configs(files: list[Union[str, pathlib.Path]], cwd=pathlib.Path.cwd()):
+def load_configs(files):
    """Return repos from a list of files.
 
     Parameters
     ----------
     files : list
         paths to config file
-    cwd : pathlib.Path
-        current path (pass down for :func:`extract_repos`
 
     Returns
     -------
     list of dict :
-        expanded config dict item
+        config dict item
 
     Todo
     ----
     Validate scheme, check for duplicate destinations, VCS urls
     """
-    repos = []
-    for file in files:
-        if isinstance(file, str):
-            file = pathlib.Path(file)
-        ext = file.suffix.lstrip(".")
-        conf = kaptan.Kaptan(handler=ext).import_config(str(file))
-        newrepos = extract_repos(conf.export("dict"), cwd=cwd)
-
-        if not repos:
-            repos.extend(newrepos)
-            continue
+    repos = {}
+    for f in files:
+        _, ext = os.path.splitext(f)
+        conf = kaptan.Kaptan(handler=ext.lstrip(".")).import_config(f).export("dict")
+
+        newrepos = {}
+
+        for path, repo in conf.items():
+            newrepos[expand_dir(path)] = repo
 
         dupes = detect_duplicate_repos(repos, newrepos)
         if dupes:
-            msg = ("repos with same path + different VCS detected!", dupes)
+            msg = ("repos for the same parent_dir and repo_name detected!", dupes)
             raise exc.VCSPullException(msg)
-        repos.extend(newrepos)
+
+        repos |= newrepos
 
     return repos
 
 
-def detect_duplicate_repos(repos1: list[dict], repos2: list[dict]):
-    """Return duplicate repos dict if repo_dir same and vcs different.
+def detect_duplicate_repos(config1, config2):
+    """Return duplicate repos dict if repo_dir is the same.
 
     Parameters
     ----------
-    repos1 : dict
-        list of repo expanded dicts
+    config1 : dict
+        config dict
 
-    repos2 : dict
-        list of repo expanded dicts
+    config2 : dict
+        config dict
 
     Returns
     -------
     list of dict, or None
         Duplicate repos
     """
+    if not config1:
+        return None
+
     dupes = []
-    path_dupe_repos = []
 
-    curpaths = [r["dir"] for r in repos1]
-    newpaths = [r["dir"] for r in repos2]
-    path_duplicates = list(set(curpaths).intersection(newpaths))
+    for parent_path, repos in config2.items():
+        if parent_path in config1:
+            for name, repo in repos.items():
+                if name in config1[parent_path]:
+                    dupes += (repo, config1[parent_path][name])
 
-    if not path_duplicates:
-        return None
+    return dupes
 
-    path_dupe_repos.extend(
-        [r for r in repos2 if any(r["dir"] == p for p in path_duplicates)]
-    )
 
-    if not path_dupe_repos:
-        return None
+def get_repo_dirs(config):
+    """Return a dict of repo paths with their corresponding repos for each repo
+    in the config list.
 
-    for n in path_dupe_repos:
-        currepo = next((r for r in repos1 if r["dir"] == n["dir"]), None)
-        if n["url"] != currepo["url"]:
-            dupes += (n, currepo)
-    return dupes
+    Parameters
+    ----------
+    config: dict
+        list of repos
+
+    Returns
+    -------
+    dict
+    """
+    path_repos = {}
+    for parent_dir, repos in config.items():
+        for name, repo in repos.items():
+            path_repos[os.path.join(parent_dir, name)] = repo
+    return path_repos
 
-def in_dir(config_dir=None, extensions: list[str] = [".yml", ".yaml", ".json"]):
+
+def in_dir(config_dir, extensions=[".yml", ".yaml", ".json"]):
     """Return a list of configs in ``config_dir``.
 
     Parameters
@@ -301,25 +241,18 @@ def in_dir(config_dir=None, extensions: list[str] = [".yml", ".yaml", ".json"])
     return configs
 
 
-def filter_repos(
-    config: dict,
-    dir: Union[pathlib.Path, None] = None,
-    vcs_url: Union[str, None] = None,
-    name: Union[str, None] = None,
-):
-    """Return a :py:obj:`list` list of repos from (expanded) config file.
+def filter_repos(config, filter_repo_dir=None, filter_name=None):
+    """Return a :py:obj:`dict` of repos from the config file.
 
     dir, vcs_url and name all support fnmatch.
 
     Parameters
     ----------
-    config : dict
-        the expanded repo config in :py:class:`dict` format.
-    dir : str, Optional
+    config : dict
+        the repo config in :py:class:`dict` format.
+    filter_repo_dir : str, Optional
         directory of checkout location, fnmatch pattern supported
-    vcs_url : str, Optional
-        url of vcs remote, fn match pattern supported
-    name : str, Optional
+    filter_name : str, Optional
         project name, fnmatch pattern supported
 
     Returns
@@ -327,20 +260,20 @@ def filter_repos(
     list :
         Repos
     """
-    repo_list = []
+    matched_repos = {}
 
-    if dir:
-        repo_list.extend([r for r in config if fnmatch.fnmatch(r["parent_dir"], dir)])
-
-    if vcs_url:
-        repo_list.extend(
-            r for r in config if fnmatch.fnmatch(r.get("url", r.get("repo")), vcs_url)
-        )
+    if filter_repo_dir:
+        for path, repos in config.items():
+            if fnmatch.fnmatch(path, filter_repo_dir):
+                matched_repos[filter_repo_dir] = repos
 
-    if name:
-        repo_list.extend([r for r in config if fnmatch.fnmatch(r.get("name"), name)])
+    if filter_name:
+        for path, repos in config.items():
+            for name, repo in repos.items():
+                if fnmatch.fnmatch(name, filter_name):
+                    matched_repos[path] = {filter_name: repo}
 
-    return repo_list
+    return matched_repos
 
 
 def is_config_file(filename: str, extensions: list[str] = [".yml", ".yaml", ".json"]):
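
Note: a rough sketch, not part of the patch itself, of the nested mapping that load_configs() now builds and that filter_repos() and sync() walk. The directory, project name, and option values below are invented placeholders, and the exact contents of "options" depend on what the libvcs project classes accept:

    # Hypothetical example of the dict-based config shape; all values are placeholders.
    config = {
        "/home/user/code": {       # parent directory key, already passed through expand_dir()
            "myproj": {            # repo name; joined with the parent dir to form repo_dir
                "vcs": "git",      # looked up in DEFAULT_VCS_CLASS_MAP to pick the project class
                "options": {},     # passed as-is to the project class constructor
            },
        },
    }

    # filter_repos(config, filter_name="myproj") returns
    # {"/home/user/code": {"myproj": {"vcs": "git", "options": {}}}},
    # and sync() instantiates DEFAULT_VCS_CLASS_MAP["git"](repo_dir=..., options={},
    # progress_callback=progress_cb) for each entry, then calls update_repo(set_remotes=True).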