Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 28 additions & 31 deletions vcspull/cli/sync.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import os
import sys
from copy import deepcopy

import click
import click.shell_completion

from libvcs.shortcuts import create_repo_from_pip_url
from libvcs.states.base import BaseRepo
from libvcs.states.constants import DEFAULT_VCS_CLASS_MAP

from ..config import filter_repos, find_config_files, load_configs
from ..log import setup_logger
Expand All @@ -19,16 +20,19 @@ def get_repo_completions(ctx: click.core.Context, args, incomplete):
repo_terms = [incomplete]

for repo_term in repo_terms:
dir, vcs_url, name = None, None, None
if any(repo_term.startswith(n) for n in ["./", "/", "~", "$HOME"]):
dir = repo_term
elif any(repo_term.startswith(n) for n in ["http", "git", "svn", "hg"]):
vcs_url = repo_term
repo_dir = repo_term
else:
name = repo_term

# collect the repos from the config files
found_repos.extend(filter_repos(configs, dir=dir, vcs_url=vcs_url, name=name))
found_repos.extend(
filter_repos(
configs,
filter_repo_dir=repo_dir,
filter_name=name,
)
)
if len(found_repos) == 0:
found_repos = configs

Expand All @@ -43,10 +47,6 @@ def get_config_file_completions(ctx, args, incomplete):
]


def clamp(n, _min, _max):
return max(_min, min(n, _max))


@click.command(name="sync")
@click.argument(
"repo_terms", type=click.STRING, nargs=-1, shell_complete=get_repo_completions
Expand All @@ -70,40 +70,37 @@ def sync(repo_terms, log_level, config):
configs = load_configs([config])
else:
configs = load_configs(find_config_files(include_home=True))
found_repos = []

found_repos = {}

if repo_terms:
for repo_term in repo_terms:
dir, vcs_url, name = None, None, None
repo_dir, name = None, None

if any(repo_term.startswith(n) for n in ["./", "/", "~", "$HOME"]):
dir = repo_term
elif any(repo_term.startswith(n) for n in ["http", "git", "svn", "hg"]):
vcs_url = repo_term
repo_dir = repo_term
else:
name = repo_term

# collect the repos from the config files
found_repos.extend(
filter_repos(configs, dir=dir, vcs_url=vcs_url, name=name)
found_repos |= filter_repos(
configs,
filter_repo_dir=repo_dir,
filter_name=name,
)
else:
found_repos = configs

list(map(update_repo, found_repos))
for path, repos in found_repos.items():
for name, repo in repos.items():
r: BaseRepo = DEFAULT_VCS_CLASS_MAP[repo["vcs"]](
repo_dir=os.path.join(path, name),
options=repo["options"],
progress_callback=progress_cb,
)
r.update_repo(set_remotes=True)


def progress_cb(output, timestamp):
sys.stdout.write(output)
sys.stdout.flush()


def update_repo(repo_dict):
repo_dict = deepcopy(repo_dict)
if "pip_url" not in repo_dict:
repo_dict["pip_url"] = repo_dict.pop("url")
repo_dict["progress_callback"] = progress_cb

r = create_repo_from_pip_url(**repo_dict) # Creates the repo object
r.update_repo(set_remotes=True) # Creates repo if not exists and fetches

return r
199 changes: 68 additions & 131 deletions vcspull/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@

import kaptan

from libvcs.states.git import GitRemote

from . import exc
from .util import get_config_dir, update_dict
from .util import get_config_dir

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,72 +43,6 @@ def expand_dir(
return _dir


def extract_repos(config: dict, cwd=pathlib.Path.cwd()) -> list[dict]:
"""Return expanded configuration.

end-user configuration permit inline configuration shortcuts, expand to
identical format for parsing.

Parameters
----------
config : dict
the repo config in :py:class:`dict` format.
cwd : pathlib.Path
current working dir (for deciphering relative paths)

Returns
-------
list : List of normalized repository information
"""
configs = []
for directory, repos in config.items():
for repo, repo_data in repos.items():

conf = {}

"""
repo_name: http://myrepo.com/repo.git

to

repo_name: { url: 'http://myrepo.com/repo.git' }

also assures the repo is a :py:class:`dict`.
"""

if isinstance(repo_data, str):
conf["url"] = repo_data
else:
conf = update_dict(conf, repo_data)

if "repo" in conf:
if "url" not in conf:
conf["url"] = conf.pop("repo")
else:
conf.pop("repo", None)

if "name" not in conf:
conf["name"] = repo
if "parent_dir" not in conf:
conf["parent_dir"] = expand_dir(directory, cwd=cwd)

# repo_dir -> dir in libvcs 0.12.0b25
if "repo_dir" in conf and "dir" not in conf:
conf["dir"] = conf.pop("repo_dir")

if "dir" not in conf:
conf["dir"] = expand_dir(conf["parent_dir"] / conf["name"], cwd)

if "remotes" in conf:
for remote_name, url in conf["remotes"].items():
conf["remotes"][remote_name] = GitRemote(
name=remote_name, fetch_url=url, push_url=url
)
configs.append(conf)

return configs


def find_home_config_files(
filetype: list[str] = ["json", "yaml"]
) -> list[pathlib.Path]:
Expand Down Expand Up @@ -195,85 +127,97 @@ def find_config_files(
return configs


def load_configs(files: list[Union[str, pathlib.Path]], cwd=pathlib.Path.cwd()):
def load_configs(files: list[Union[str, pathlib.Path]]):
"""Return repos from a list of files.

Parameters
----------
files : list
paths to config file
cwd : pathlib.Path
current path (pass down for :func:`extract_repos`

Returns
-------
list of dict :
expanded config dict item
config dict item

Todo
----
Validate scheme, check for duplicate destinations, VCS urls
"""
repos = []
for file in files:
if isinstance(file, str):
file = pathlib.Path(file)
ext = file.suffix.lstrip(".")
conf = kaptan.Kaptan(handler=ext).import_config(str(file))
newrepos = extract_repos(conf.export("dict"), cwd=cwd)

if not repos:
repos.extend(newrepos)
continue
repos = {}
for f in files:
if isinstance(f, str):
f = pathlib.Path(f)
ext = f.suffix.lstrip(".")

_, ext = os.path.splitext(f)
conf = kaptan.Kaptan(handler=ext.lstrip(".")).import_config(f).export("dict")

newrepos = {}

for path, repo in conf.items():
newrepos[expand_dir(path)] = repo

dupes = detect_duplicate_repos(repos, newrepos)

if dupes:
msg = ("repos with same path + different VCS detected!", dupes)
msg = ("repos for the same parent_dir and repo_name detected!", dupes)
raise exc.VCSPullException(msg)
repos.extend(newrepos)

repos |= newrepos

return repos


def detect_duplicate_repos(repos1: list[dict], repos2: list[dict]):
"""Return duplicate repos dict if repo_dir same and vcs different.
def detect_duplicate_repos(config1, config2):
"""Return duplicate repos dict if repo_dir is the same

Parameters
----------
repos1 : dict
list of repo expanded dicts
config1 : dict
config dict

repos2 : dict
list of repo expanded dicts
config2 : dict
config dict

Returns
-------
list of dict, or None
Duplicate repos
"""
if not config1:
return None

dupes = []
path_dupe_repos = []

curpaths = [r["dir"] for r in repos1]
newpaths = [r["dir"] for r in repos2]
path_duplicates = list(set(curpaths).intersection(newpaths))
for parent_path, repos in config2.items():
if parent_path in config1:
for name, repo in repos.items():
if name in config1[parent_path]:
dupes += (repo, config1[parent_path][name])

if not path_duplicates:
return None
return dupes

path_dupe_repos.extend(
[r for r in repos2 if any(r["dir"] == p for p in path_duplicates)]
)

if not path_dupe_repos:
return None
def get_repo_dirs(config):
"""return a dict of repo paths with their corresponding repos for each repo
in the config list.

for n in path_dupe_repos:
currepo = next((r for r in repos1 if r["dir"] == n["dir"]), None)
if n["url"] != currepo["url"]:
dupes += (n, currepo)
return dupes
Parameters
----------
config: dict
list of repos

Returns
-------
dict
"""
path_repos = {}
for parent_dir, repos in config.items():
for name, repo in repos.items():
path_repos[os.path.join(parent_dir, name)] = repo

return path_repos


def in_dir(config_dir=None, extensions: list[str] = [".yml", ".yaml", ".json"]):
Expand Down Expand Up @@ -301,46 +245,39 @@ def in_dir(config_dir=None, extensions: list[str] = [".yml", ".yaml", ".json"]):
return configs


def filter_repos(
config: dict,
dir: Union[pathlib.Path, None] = None,
vcs_url: Union[str, None] = None,
name: Union[str, None] = None,
):
"""Return a :py:obj:`list` list of repos from (expanded) config file.
def filter_repos(config, filter_repo_dir=None, filter_name=None):
"""Return a :py:obj:`list` list of repos from config file.

dir, vcs_url and name all support fnmatch.

Parameters
----------
config : dict
the expanded repo config in :py:class:`dict` format.
dir : str, Optional
the repo config in :py:class:`dict` format.
filter_repo_dir : str, Optional
directory of checkout location, fnmatch pattern supported
vcs_url : str, Optional
url of vcs remote, fn match pattern supported
name : str, Optional
filter_name : str, Optional
project name, fnmatch pattern supported

Returns
-------
list :
Repos
"""
repo_list = []

if dir:
repo_list.extend([r for r in config if fnmatch.fnmatch(r["parent_dir"], dir)])
matched_repos = {}

if vcs_url:
repo_list.extend(
r for r in config if fnmatch.fnmatch(r.get("url", r.get("repo")), vcs_url)
)
if filter_repo_dir:
for path, repos in config.items():
if fnmatch.fnmatch(path, filter_repo_dir):
matched_repos[filter_repo_dir] = repos

if name:
repo_list.extend([r for r in config if fnmatch.fnmatch(r.get("name"), name)])
if filter_name:
for path, repos in config.items():
for name, repo in repos.items():
if fnmatch.fnmatch(name, filter_name):
matched_repos[path] = {filter_name: repo}

return repo_list
return matched_repos


def is_config_file(filename: str, extensions: list[str] = [".yml", ".yaml", ".json"]):
Expand Down