diff --git a/constructor/build_outputs.py b/constructor/build_outputs.py
index 32d224843..bb1c5c775 100644
--- a/constructor/build_outputs.py
+++ b/constructor/build_outputs.py
@@ -56,7 +56,7 @@ def dump_hash(info, algorithm=None):
         invalid = algorithms.difference(set(hashlib.algorithms_available))
         raise ValueError(f"Invalid algorithm: {', '.join(invalid)}")
     BUFFER_SIZE = 65536
-    if isinstance(info["_outpath"], str):
+    if isinstance(info["_outpath"], (str, Path)):
         installers = [Path(info["_outpath"])]
     else:
         installers = [Path(outpath) for outpath in info["_outpath"]]
@@ -75,14 +75,13 @@ def dump_hash(info, algorithm=None):
     return ", ".join(outpaths)


-def dump_info(info):
-    outpath = os.path.join(info["_output_dir"], "info.json")
-    with open(outpath, "w") as f:
-        json.dump(info, f, indent=2, default=repr)
-    return os.path.abspath(outpath)
+def dump_info(info) -> Path:
+    outpath = (info["_output_dir"] / "info.json").resolve()
+    outpath.write_text(json.dumps(info, indent=2, default=repr))
+    return outpath


-def dump_packages_list(info, env="base"):
+def dump_packages_list(info, env="base") -> Path:
     if env == "base":
         dists = info["_dists"]
     elif env in info["_extra_envs_info"]:
@@ -90,11 +89,11 @@ def dump_packages_list(info, env="base"):
     else:
         raise ValueError(f"env='{env}' is not a valid env name.")

-    outpath = os.path.join(info["_output_dir"], f"pkg-list.{env}.txt")
+    outpath = (info["_output_dir"] / f"pkg-list.{env}.txt").resolve()
     with open(outpath, "w") as fo:
         fo.write(f"# {info['name']} {info['version']}, env={env}\n")
         fo.write("\n".join(dists))
-    return os.path.abspath(outpath)
+    return outpath


 def dump_lockfile(info, env="base"):
@@ -123,10 +122,9 @@ def dump_lockfile(info, env="base"):
             hash_value = record.get("md5")
         lines.append(url + (f"#{hash_value}" if hash_value else ""))

-    outpath = os.path.join(info["_output_dir"], f"lockfile.{env}.txt")
-    with open(outpath, "w") as f:
-        f.write("\n".join(lines))
-    return os.path.abspath(outpath)
+    outpath = (info["_output_dir"] / f"lockfile.{env}.txt").resolve()
+    outpath.write_text("\n".join(lines))
+    return outpath


 def dump_licenses(info, include_text=False, text_errors=None):
@@ -159,24 +157,24 @@ def dump_licenses(info, include_text=False, text_errors=None):
     licenses = defaultdict(dict)
     for pkg_record in info["_all_pkg_records"]:
         extracted_package_dir = pkg_record.extracted_package_dir
-        licenses_dir = os.path.join(extracted_package_dir, "info", "licenses")
+        licenses_dir = Path(extracted_package_dir, "info", "licenses")
         licenses[pkg_record.dist_str()]["type"] = pkg_record.license
         licenses[pkg_record.dist_str()]["files"] = license_files = []
-        if not os.path.isdir(licenses_dir):
+        if not licenses_dir.is_dir():
             continue

+        # FUTURE: pathlib.Path() has .walk() in Python 3.12+
         for directory, _, files in os.walk(licenses_dir):
             for filepath in files:
-                license_path = os.path.join(directory, filepath)
-                license_file = {"path": license_path, "text": None}
+                license_path = Path(directory, filepath)
+                license_file = {"path": str(license_path), "text": None}
                 if include_text:
-                    license_file["text"] = Path(license_path).read_text(errors=text_errors)
+                    license_file["text"] = license_path.read_text(errors=text_errors)
                 license_files.append(license_file)

-    outpath = os.path.join(info["_output_dir"], "licenses.json")
-    with open(outpath, "w") as f:
-        json.dump(licenses, f, indent=2, default=repr)
-    return os.path.abspath(outpath)
+    outpath = (info["_output_dir"] / "licenses.json").resolve()
+    outpath.write_text(json.dumps(licenses, indent=2, default=repr))
+    return outpath

 OUTPUT_HANDLERS = {
diff --git a/constructor/conda_interface.py b/constructor/conda_interface.py
index b66d3e327..4b815b222 100644
--- a/constructor/conda_interface.py
+++ b/constructor/conda_interface.py
@@ -10,7 +10,7 @@
 import sys
 from copy import deepcopy
 from itertools import chain
-from os.path import join
+from pathlib import Path

 from conda.gateways.disk import mkdir_p_sudo_safe
@@ -163,7 +163,7 @@ def write_repodata(cache_dir, url, full_repodata, used_packages, info):
             raise NotImplementedError("Package type is unknown for: %s" % package)
         if original_package in full_repodata.get(original_key, {}):
             data = deepcopy(full_repodata[original_key][original_package])
-            pkg_fn = join(info["_download_dir"], package)
+            pkg_fn = info["_download_dir"] / package
             data["size"] = os.stat(pkg_fn).st_size
             data["sha256"] = hash_files([pkg_fn], algorithm="sha256")
             data["md5"] = hash_files([pkg_fn])
@@ -183,7 +183,7 @@ def write_repodata(cache_dir, url, full_repodata, used_packages, info):
         }
     )
     repodata = repodata_header[:-1] + "," + repodata[1:]
-    repodata_filepath = join(cache_dir, _cache_fn_url(repodata_url))
+    repodata_filepath = cache_dir / _cache_fn_url(repodata_url)
     with open(repodata_filepath, "w") as fh:
         fh.write(repodata)

@@ -196,6 +196,6 @@

 # Maybe it's not needed anymore.
 def write_cache_dir():
-    cache_dir = join(PackageCacheData.first_writable().pkgs_dir, "cache")
+    cache_dir = Path(PackageCacheData.first_writable().pkgs_dir, "cache")
     mkdir_p_sudo_safe(cache_dir)
     return cache_dir
diff --git a/constructor/construct.py b/constructor/construct.py
index 1bb411ec6..214faea19 100644
--- a/constructor/construct.py
+++ b/constructor/construct.py
@@ -14,7 +14,6 @@
 import re
 import sys
 from functools import partial
-from os.path import dirname
 from pathlib import Path

 from jsonschema import Draft202012Validator, validators
@@ -108,16 +107,15 @@ def yamlize(data, directory, content_filter):
         return yaml.load(data)


-def parse(path, platform):
+def parse(path: Path, platform):
     try:
-        with open(path) as fi:
-            data = fi.read()
+        data = path.read_text()
     except OSError:
         sys.exit("Error: could not open '%s' for reading" % path)
-    directory = dirname(path)
+    directory = path.parent
     content_filter = partial(select_lines, namespace=ns_platform(platform))
     try:
-        res = yamlize(data, directory, content_filter)
+        res = yamlize(data, str(directory), content_filter)
     except YamlParsingError as e:
         sys.exit(e.error_msg())
diff --git a/constructor/fcp.py b/constructor/fcp.py
index 45b93bcd1..079e01c8e 100644
--- a/constructor/fcp.py
+++ b/constructor/fcp.py
@@ -7,6 +7,8 @@
 fcp (fetch conda packages) module
 """

+from __future__ import annotations
+
 import logging
 import os
 import shutil
@@ -14,7 +16,7 @@
 import tempfile
 from collections import defaultdict
 from itertools import groupby
-from os.path import abspath, expanduser, isdir, join
+from pathlib import Path
 from subprocess import check_call
 from typing import TYPE_CHECKING
@@ -133,10 +135,10 @@ def _show(name, version, platform, download_dir, precs, more_recent_versions={}):
         logger.debug("  %s", prec.fn)


-def _fetch(download_dir, precs):
-    assert conda_context.pkgs_dirs[0] == download_dir
+def _fetch(download_dir: Path, precs):
+    assert Path(conda_context.pkgs_dirs[0]) == download_dir
     pc = PackageCacheData.first_writable()
-    assert pc.pkgs_dir == download_dir
+    assert Path(pc.pkgs_dir) == download_dir
     assert pc.is_writable, f"{download_dir} does not exist or is not writable"

     ProgressiveFetchExtract(precs).execute()
@@ -156,7 +158,7 @@ def check_duplicates_files(pc_recs, platform, duplicate_files="error"):

     for pc_rec in pc_recs:
         fn = pc_rec.fn
-        extracted_package_dir = pc_rec.extracted_package_dir
+        extracted_package_dir = Path(pc_rec.extracted_package_dir)

         total_tarball_size += int(pc_rec.get("size", 0))

@@ -164,9 +166,9 @@ def check_duplicates_files(pc_recs, platform, duplicate_files="error"):
         for path_data in paths_data:
             short_path = path_data.path
             try:
-                size = path_data.size_in_bytes or getsize(join(extracted_package_dir, short_path))
+                size = path_data.size_in_bytes or getsize(extracted_package_dir / short_path)
             except AttributeError:
-                size = getsize(join(extracted_package_dir, short_path))
+                size = getsize(extracted_package_dir / short_path)
             total_extracted_pkgs_size += size

             map_members_scase[short_path].add(fn)
@@ -204,13 +206,14 @@ def check_duplicates_files(pc_recs, platform, duplicate_files="error"):
     return total_tarball_size, total_extracted_pkgs_size


-def _precs_from_environment(environment, input_dir):
-    if not isdir(environment) and ("/" in environment or "\\" in environment):
-        env2 = join(input_dir, environment)
-        if isdir(env2):
+def _precs_from_environment(environment: Path, input_dir: Path):
+    environment = Path(environment)
+    if not environment.is_dir() and len(environment.parts) > 1:
+        env2 = input_dir / environment
+        if env2.is_dir():
             environment = env2
-    if isdir(environment):
-        environment = abspath(join(input_dir, expanduser(environment)))
+    if environment.is_dir():
+        environment = (input_dir / environment.expanduser()).resolve()
     else:
         environment = locate_prefix_by_name(environment)
     pdata = PrefixData(environment)
@@ -267,13 +270,13 @@ def _solve_precs(
             sys.exit("CONDA_EXE env variable is empty. Need to activate a conda env.")

     # make the environment, if needed
     if environment_file:
-        environment = tempfile.mkdtemp()
+        environment = Path(tempfile.mkdtemp())
         new_env = os.environ.copy()
         new_env["CONDA_SUBDIR"] = platform
         # use conda env for yaml, and standard conda create otherwise
         subcommand = (
             ["env", "create"]
-            if environment_file.endswith((".yml", ".yaml"))
+            if environment_file.suffix in (".yml", ".yaml")
             else ["create", "--yes"]
         )
         if channel_urls:
@@ -356,14 +359,14 @@ def _fetch_precs(precs, download_dir, transmute_file_type=""):
             dist = filename_dist(dist)
             new_file_name = "%s%s" % (dist[:-8], transmute_file_type)
             new_dists.append(new_file_name)
-            new_file_name = join(download_dir, new_file_name)
-            if os.path.exists(new_file_name):
+            new_file_name = Path(download_dir, new_file_name)
+            if new_file_name.exists():
                 continue
             logger.info("transmuting %s", dist)
             conda_package_handling.api.transmute(
-                os.path.join(download_dir, dist),
+                str(download_dir / dist),
                 transmute_file_type,
-                out_folder=download_dir,
+                out_folder=str(download_dir),
             )
         else:
             new_dists.append(dist)
@@ -375,7 +378,7 @@ def _fetch_precs(precs, download_dir, transmute_file_type=""):
 def _main(
     name,
     version,
-    download_dir,
+    download_dir: Path,
     platform,
     channel_urls=(),
     channels_remap=(),
@@ -384,14 +387,14 @@ def _main(
     menu_packages=None,
     ignore_duplicate_files=True,
     environment=None,
-    environment_file=None,
+    environment_file: Path | None = None,
     verbose=True,
     dry_run=False,
-    conda_exe="conda.exe",
+    conda_exe: Path = Path("conda.exe"),
     transmute_file_type="",
     extra_envs=None,
     check_path_spaces=True,
-    input_dir="",
+    input_dir: Path = Path.cwd(),
 ):
     precs = _solve_precs(
         name,
@@ -481,9 +484,9 @@ def _main(

 def main(info, verbose=True, dry_run=False, conda_exe="conda.exe"):
     name = info["name"]
-    input_dir = info["_input_dir"]
+    input_dir: Path = info["_input_dir"]
     version = info["version"]
-    download_dir = info["_download_dir"]
+    download_dir: Path = info["_download_dir"]
     platform = info["_platform"]
     channel_urls = all_channel_urls(info.get("channels", ()), subdirs=[platform, "noarch"])
     channels_remap = info.get("channels_remap", ())
@@ -492,7 +495,7 @@ def main(info, verbose=True, dry_run=False, conda_exe="conda.exe"):
     menu_packages = info.get("menu_packages")
     ignore_duplicate_files = info.get("ignore_duplicate_files", True)
     environment = info.get("environment", None)
-    environment_file = info.get("environment_file", None)
+    environment_file: Path | None = info.get("environment_file", None)
     transmute_file_type = info.get("transmute_file_type", "")
     extra_envs = info.get("extra_envs", {})
     check_path_spaces = info.get("check_path_spaces", True)
@@ -517,7 +520,7 @@ def main(info, verbose=True, dry_run=False, conda_exe="conda.exe"):
     # Restoring the state for "proxy_servers" to what it was before
     conda_context.proxy_servers = proxy_servers
     assert conda_context.ssl_verify == _ssl_verify
-    assert conda_context.pkgs_dirs and conda_context.pkgs_dirs[0] == download_dir
+    assert conda_context.pkgs_dirs and Path(conda_context.pkgs_dirs[0]) == download_dir

     (
         pkg_records,
diff --git a/constructor/imaging.py b/constructor/imaging.py
index b6ecd3347..48df7e2a8 100644
--- a/constructor/imaging.py
+++ b/constructor/imaging.py
@@ -10,14 +10,13 @@
 import sys
 from io import BytesIO
-from os.path import dirname, join
+from pathlib import Path
 from random import randint

 from PIL import Image, ImageDraw, ImageFont

-ttf_path = join(dirname(__file__), "ttf", "Vera.ttf")
-with open(ttf_path, "rb") as f:
-    ttf_bytes = f.read()
+ttf_path = Path(__file__).parent / "ttf" / "Vera.ttf"
+ttf_bytes = ttf_path.read_bytes()

 white = 0xFF, 0xFF, 0xFF
 # These are for Windows
 welcome_size = 164, 314
@@ -99,7 +98,7 @@ def add_color_info(info):
         sys.exit("Error: color '%s' not defined" % color_name)


-def write_images(info, dir_path, os="windows"):
+def write_images(info, dir_path: Path, os="windows"):
     if os == "windows":
         instructions = [
             ("welcome", welcome_size, mk_welcome_image, ".bmp"),
@@ -122,7 +121,7 @@ def write_images(info, dir_path, os="windows"):
         add_color_info(info)
         im = function(info)
         assert im.size == size
-        im.save(join(dir_path, name + ext))
+        im.save(dir_path / f"{name}{ext}")


 if __name__ == "__main__":
diff --git a/constructor/main.py b/constructor/main.py
index f76e81ab8..ecbeecf1f 100644
--- a/constructor/main.py
+++ b/constructor/main.py
@@ -14,7 +14,7 @@
 import logging
 import os
 import sys
-from os.path import abspath, expanduser, isdir, join
+from pathlib import Path
 from textwrap import dedent

 from . import __version__
@@ -25,9 +25,9 @@
 from .construct import parse as construct_parse
 from .construct import verify as construct_verify
 from .fcp import main as fcp_main
-from .utils import StandaloneExe, identify_conda_exe, normalize_path, yield_lines
+from .utils import StandaloneExe, identify_conda_exe, yield_lines

-DEFAULT_CACHE_DIR = os.getenv("CONSTRUCTOR_CACHE", "~/.conda/constructor")
+DEFAULT_CACHE_DIR = Path(os.getenv("CONSTRUCTOR_CACHE", "~/.conda/constructor"))

 logger = logging.getLogger(__name__)
@@ -74,34 +74,34 @@ def get_output_filename(info):


 def main_build(
-    dir_path,
-    output_dir=".",
+    dir_path: Path,
+    output_dir: Path = Path("."),
     platform=cc_platform,
     verbose=True,
-    cache_dir=DEFAULT_CACHE_DIR,
+    cache_dir: Path = DEFAULT_CACHE_DIR,
     dry_run=False,
-    conda_exe="conda.exe",
+    conda_exe: Path = Path("conda.exe"),
     config_filename="construct.yaml",
     debug=False,
 ):
     logger.info("platform: %s", platform)
-    if not os.path.isfile(conda_exe):
+    if not conda_exe.is_file():
         sys.exit("Error: Conda executable '%s' does not exist!" % conda_exe)
-    cache_dir = abspath(expanduser(cache_dir))
+    cache_dir = cache_dir.expanduser().resolve()
     try:
         osname, unused_arch = platform.split("-")
     except ValueError:
         sys.exit("Error: invalid platform string '%s'" % platform)

-    construct_path = join(dir_path, config_filename)
+    construct_path = Path(dir_path, config_filename)
     info = construct_parse(construct_path, platform)
     construct_verify(info)
     info["CONSTRUCTOR_VERSION"] = __version__
     info["_input_dir"] = dir_path
     info["_output_dir"] = output_dir
     info["_platform"] = platform
-    info["_download_dir"] = join(cache_dir, platform)
-    info["_conda_exe"] = abspath(conda_exe)
+    info["_download_dir"] = Path(cache_dir, platform)
+    info["_conda_exe"] = conda_exe.resolve()
     info["_debug"] = debug

     itypes = get_installer_type(info)
@@ -145,10 +145,10 @@ def main_build(
         "post_install_pages",
     ):
         if value := info.get(key):  # only join if there's a truthy value set
-            if isinstance(value, str):
-                info[key] = abspath(join(dir_path, info[key]))
+            if isinstance(value, (str, Path)):
+                info[key] = Path(dir_path, info[key]).resolve()
             elif isinstance(value, list):
-                info[key] = [abspath(join(dir_path, val)) for val in value]
+                info[key] = [Path(dir_path, val).resolve() for val in value]

     # Normalize name and set default value
     if info.get("windows_signing_tool"):
@@ -160,7 +160,7 @@ def main_build(
         if key not in info:
             continue
         if isinstance(info[key], str):
-            info[key] = list(yield_lines(join(dir_path, info[key])))
+            info[key] = list(yield_lines(Path(dir_path, info[key])))

     # normalize paths to be copied; if they are relative, they must be to
     # construct.yaml's parent (dir_path)
@@ -169,11 +169,11 @@ def main_build(
         extras = info.get(extra_type, ())
         new_extras = []
         for path in extras:
-            if isinstance(path, str):
-                new_extras.append(abspath(join(dir_path, path)))
+            if isinstance(path, (str, Path)):
+                new_extras.append(Path(dir_path, path).resolve())
             elif isinstance(path, dict):
                 for orig, dest in path.items():
-                    orig = abspath(join(dir_path, orig))
+                    orig = Path(dir_path, orig).resolve()
                     new_extras.append({orig: dest})
         info[extra_type] = new_extras

@@ -190,7 +190,7 @@ def main_build(
                 raise ValueError(f"Environment name '{env_name}' cannot be used")
             for config_key, value in env_config.copy().items():
                 if config_key == "environment_file":
-                    env_config[config_key] = abspath(join(dir_path, value))
+                    env_config[config_key] = Path(dir_path, value).resolve()
                 elif config_key == "channels_remap":
                     env_config[config_key] = [
                         {"src": item["src"].strip(), "dest": item["dest"].strip()} for item in value
@@ -274,7 +274,7 @@ def main_build(
                     "Error: 'initialize_conda == condabin' requires 'conda >=25.5.0' in base env."
                 )

-    os.makedirs(output_dir, exist_ok=True)
+    output_dir.mkdir(parents=True, exist_ok=True)
     info_dicts = []
     for itype in itypes:
         if itype == "sh":
@@ -290,7 +290,7 @@ def main_build(
             create = winexe_create
         info["installer_type"] = itype

-        info["_outpath"] = abspath(join(output_dir, get_output_filename(info)))
+        info["_outpath"] = Path(output_dir, get_output_filename(info)).resolve()
         create(info, verbose=verbose)
         if len(itypes) > 1:
             info_dicts.append(info.copy())
@@ -382,9 +382,9 @@ def main(argv=None):
     p.add_argument(
         "--output-dir",
         action="store",
-        default=os.getcwd(),
+        default=Path.cwd(),
         help="path to directory in which output installer is written "
-        f"to, defaults to CWD ('{os.getcwd()}')",
+        f"to, defaults to CWD ('{Path.cwd()}')",
         metavar="PATH",
     )
@@ -446,7 +446,7 @@ def main(argv=None):
         help="directory containing construct.yaml",
         action="store",
         nargs="?",
-        default=os.getcwd(),
+        default=Path.cwd(),
         metavar="DIRECTORY",
     )
@@ -459,44 +459,45 @@ def main(argv=None):
     if args.clean:
         import shutil

-        cache_dir = abspath(expanduser(args.cache_dir))
+        cache_dir = Path(args.cache_dir).expanduser().resolve()
         logger.info("cleaning cache: '%s'", cache_dir)
-        if isdir(cache_dir):
+        if cache_dir.is_dir():
             shutil.rmtree(cache_dir)
         return

-    dir_path = args.dir_path
-    if not isdir(dir_path):
+    dir_path = Path(args.dir_path)
+    if not dir_path.is_dir():
         p.error("no such directory: %s" % dir_path)
     if os.sep in args.config_filename:
         p.error("--config-filename can only be a filename, not a path")
-    full_config_path = os.path.join(dir_path, args.config_filename)
-    if not os.path.isfile(full_config_path):
+    full_config_path = dir_path / args.config_filename
+    if not full_config_path.is_file():
         p.error("no such file: %s" % full_config_path)

-    conda_exe = args.conda_exe
-    conda_exe_default_path = os.path.join(sys.prefix, "standalone_conda", "conda.exe")
-    conda_exe_default_path = normalize_path(conda_exe_default_path)
-    if conda_exe:
-        conda_exe = normalize_path(os.path.abspath(conda_exe))
+    conda_exe_default_path = Path(sys.prefix, "standalone_conda", "conda.exe").resolve()
+    if args.conda_exe:
+        conda_exe = Path(args.conda_exe)
     elif args.platform != cc_platform:
         p.error("setting --conda-exe is required for building a non-native installer")
     else:
         conda_exe = conda_exe_default_path
-    if not os.path.isfile(conda_exe):
+    if not conda_exe.is_file():
         if conda_exe != conda_exe_default_path:
             p.error("file not found: %s" % args.conda_exe)
         p.error(
-            """
-no standalone conda executable was found. The
-easiest way to obtain one is to install the 'conda-standalone' package.
-Alternatively, you can download an executable manually and supply its
-path with the --conda-exe argument. Self-contained executables can be
-downloaded from https://repo.anaconda.com/pkgs/misc/conda-execs/ and/or
-https://github.com/conda/conda-standalone/releases""".lstrip()
+            dedent(
+                """
+                no standalone conda executable was found. The
+                easiest way to obtain one is to install the 'conda-standalone' package.
+                Alternatively, you can download an executable manually and supply its
+                path with the --conda-exe argument. Self-contained executables can be
+                downloaded from https://repo.anaconda.com/pkgs/misc/conda-execs/ and/or
+                https://github.com/conda/conda-standalone/releases
+                """
+            ).lstrip()
         )
-
-    out_dir = normalize_path(args.output_dir)
+    conda_exe = conda_exe.resolve()
+    out_dir = Path(args.output_dir)
     main_build(
         dir_path,
         output_dir=out_dir,
diff --git a/constructor/osxpkg.py b/constructor/osxpkg.py
index 3785ac617..2a4c7169b 100644
--- a/constructor/osxpkg.py
+++ b/constructor/osxpkg.py
@@ -2,6 +2,8 @@
 Logic to build PKG installers for macOS.
 """

+from __future__ import annotations
+
 import logging
 import os
 import shlex
@@ -9,7 +11,6 @@ import subprocess
 import sys
 import xml.etree.ElementTree as ET

-from os.path import abspath, dirname, exists, isdir, join
 from pathlib import Path
 from plistlib import dump as plist_dump
 from tempfile import NamedTemporaryFile
@@ -31,7 +32,7 @@
     shortcuts_flags,
 )

-OSX_DIR = join(dirname(__file__), "osx")
+OSX_DIR = (Path(__file__).parent / "osx").resolve()
 CACHE_DIR = PACKAGE_ROOT = PACKAGES_DIR = SCRIPTS_DIR = None

 logger = logging.getLogger(__name__)
@@ -49,9 +50,8 @@ def calculate_install_dir(yaml_file, subdir=None):


 def write_readme(dst, info):
-    src = join(OSX_DIR, "readme_header.rtf")
-    with open(src) as fi:
-        data = fi.read()
+    src = OSX_DIR / "readme_header.rtf"
+    data = src.read_text()

     # This is necessary for when installing on case-sensitive macOS filesystems.
     data = data.replace("__NAME_LOWER__", info.get("pkg_name", info["name"]).lower())
@@ -99,7 +99,7 @@ def modify_xml(xml_path, info):
     title.text = f"{info['name']} {info['version']}"
     root.append(title)

-    license = ET.Element("license", file=info.get("license_file", "No license"))
+    license = ET.Element("license", file=str(info.get("license_file", "No license")))
     root.append(license)

     # -- BACKGROUND -- #
@@ -113,19 +113,19 @@ def modify_xml(xml_path, info):
             background_path = None
         else:
             write_images(info, PACKAGES_DIR, os="osx")
-            background_path = os.path.join(PACKAGES_DIR, "welcome.png")
+            background_path = PACKAGES_DIR / "welcome.png"
     elif "welcome_image_text" in info:
         write_images(info, PACKAGES_DIR, os="osx")
-        background_path = os.path.join(PACKAGES_DIR, "welcome.png")
+        background_path = PACKAGES_DIR / "welcome.png"
     else:
         # Default to Anaconda's logo if the keys above were not specified
-        background_path = join(OSX_DIR, "MacInstaller.png")
+        background_path = OSX_DIR / "MacInstaller.png"

     if background_path:
         logger.info("Using background image: %s", background_path)
         for key in ("background", "background-darkAqua"):
             background = ET.Element(
-                key, file=background_path, scaling="proportional", alignment="center"
+                key, file=str(background_path), scaling="proportional", alignment="center"
             )
             root.append(background)
@@ -135,9 +135,8 @@ def modify_xml(xml_path, info):
     if "welcome_file" in info and not info["welcome_file"].endswith(".nsi"):
         welcome_path = info["welcome_file"]
     elif "welcome_text" in info and info["welcome_text"]:
-        welcome_path = join(PACKAGES_DIR, "welcome.txt")
-        with open(welcome_path, "w") as f:
-            f.write(info["welcome_text"])
+        welcome_path = PACKAGES_DIR / "welcome.txt"
+        welcome_path.write_text(info["welcome_text"])
     else:
         welcome_path = None
     if info.get("welcome_file", "").endswith(".nsi"):
@@ -145,7 +144,7 @@ def modify_xml(xml_path, info):

     if welcome_path:
         welcome = ET.Element(
-            "welcome", file=welcome_path, attrib={"mime-type": _detect_mimetype(welcome_path)}
+            "welcome", file=str(welcome_path), attrib={"mime-type": _detect_mimetype(welcome_path)}
         )
         root.append(welcome)

@@ -158,17 +157,16 @@ def modify_xml(xml_path, info):
         if not info["conclusion_text"]:
             conclusion_path = None
         else:
-            conclusion_path = join(PACKAGES_DIR, "conclusion.txt")
-            with open(conclusion_path, "w") as f:
-                f.write(info["conclusion_text"])
+            conclusion_path = PACKAGES_DIR / "conclusion.txt"
+            conclusion_path.write_text(info["conclusion_text"])
     else:
-        conclusion_path = join(OSX_DIR, "acloud.rtf")
-    if info.get("conclusion_file", "").endswith(".nsi"):
+        conclusion_path = OSX_DIR / "acloud.rtf"
+    if info.get("conclusion_file", Path()).name.endswith(".nsi"):
         logger.warning("NSI conclusion_file '%s' is ignored.", info["conclusion_file"])
     if conclusion_path:
         conclusion = ET.Element(
             "conclusion",
-            file=conclusion_path,
+            file=str(conclusion_path),
             attrib={"mime-type": _detect_mimetype(conclusion_path)},
         )
         root.append(conclusion)
@@ -181,16 +179,15 @@ def modify_xml(xml_path, info):
         if not info["readme_text"]:
             readme_path = None
         else:
-            readme_path = join(PACKAGES_DIR, "readme.txt")
-            with open(readme_path, "w") as f:
-                f.write(info["readme_text"])
+            readme_path = PACKAGES_DIR / "readme.txt"
+            readme_path.write_text(info["readme_text"])
     else:
-        readme_path = join(PACKAGES_DIR, "readme.rtf")
+        readme_path = PACKAGES_DIR / "readme.rtf"
         write_readme(readme_path, info)
     if readme_path:
         readme = ET.Element(
-            "readme", file=readme_path, attrib={"mime-type": _detect_mimetype(readme_path)}
+            "readme", file=str(readme_path), attrib={"mime-type": _detect_mimetype(readme_path)}
         )
         root.append(readme)
@@ -226,14 +223,14 @@ def modify_xml(xml_path, info):
             root.remove(path_choice)
         elif ident.endswith("prepare_installation"):
             path_choice.set("visible", "true")
-            path_choice.set("title", "Install {}".format(info["name"]))
+            path_choice.set("title", f"Install {info['name']}")
             path_choice.set("enabled", "false")
         elif ident.endswith("run_installation"):
             # We leave this one out on purpose! The user does not need to
             # know we separated the installation in two steps to accommodate
             # for the pre-install scripts optionality
             path_choice.set("visible", "false")
-            path_choice.set("title", "Apply {}".format(info["name"]))
+            path_choice.set("title", f"Apply {info['name']}")
             path_choice.set("enabled", "false")
         elif ident.endswith("shortcuts"):
             # Show this option if menu_packages was set to a non-empty value
@@ -323,14 +320,13 @@ def modify_xml(xml_path, info):
     tree.write(xml_path)


-def move_script(src, dst, info, ensure_shebang=False, user_script_type=None):
+def move_script(src: Path, dst: Path, info, ensure_shebang=False, user_script_type=None):
    """
    Fill template scripts checks_before_install.sh, prepare_installation.sh
    and others, and move them to the installer workspace.
    """
     assert user_script_type in (None, "pre_install", "post_install")
-    with open(src) as fi:
-        data = fi.read()
+    data = src.read_text()

     # ppd hosts the conditions for the #if/#else/#endif preprocessors on scripts
     variables = ns_platform(info["_platform"])
@@ -370,7 +366,7 @@ def move_script(src, dst, info, ensure_shebang=False, user_script_type=None):
     with open(dst, "w") as fo:
         if (
             ensure_shebang
-            and os.path.splitext(dst)[1] in ("", ".sh")
+            and dst.suffix in ("", ".sh")
             and not data.startswith(("#!/bin/bash", "#!/bin/sh"))
         ):
             # Shell scripts provided by the user require a shebang, otherwise it
@@ -378,13 +374,13 @@ def move_script(src, dst, info, ensure_shebang=False, user_script_type=None):
             # We only handle shell scripts this way
             fo.write("#!/bin/bash\n")
         fo.write(data)
-    os.chmod(dst, 0o755)
+    dst.chmod(0o755)


-def fresh_dir(dir_path):
+def fresh_dir(dir_path: Path):
     rm_rf(dir_path)
-    assert not exists(dir_path)
-    os.mkdir(dir_path)
+    assert not dir_path.exists()
+    dir_path.mkdir()


 def pkgbuild(name, identifier=None, version=None, install_location=None):
@@ -401,14 +397,14 @@ def pkgbuild(name, identifier=None, version=None, install_location=None):
         "preserve",
     ]

-    if isdir(SCRIPTS_DIR) and os.listdir(SCRIPTS_DIR):
+    if SCRIPTS_DIR.is_dir() and os.listdir(SCRIPTS_DIR):
         args += ["--scripts", SCRIPTS_DIR]
     if version:
         args += ["--version", version]
     if install_location is not None:
         args += ["--install-location", install_location]
-    output = os.path.join(PACKAGES_DIR, f"{name}.pkg")
-    args += [output]
+    output = PACKAGES_DIR / f"{name}.pkg"
+    args.append(output)
     explained_check_call(args)
     return output
@@ -430,7 +426,7 @@ def pkgbuild_prepare_installation(info):
     try:
         # expand to apply patches
         explained_check_call(["pkgutil", "--expand", pkg, f"{pkg}.expanded"])
-        payload_xml = os.path.join(f"{pkg}.expanded", "PackageInfo")
+        payload_xml = f"{pkg}.expanded/PackageInfo"
         tree = ET.parse(payload_xml)
         root = tree.getroot()
         payload = root.find("payload")
@@ -443,7 +439,7 @@ def pkgbuild_prepare_installation(info):
         shutil.rmtree(f"{pkg}.expanded")


-def create_plugins(pages: list = None, codesigner: CodeSign = None):
+def create_plugins(pages: list[Path] | None = None, codesigner: CodeSign = None):
     def _build_xcode_projects(xcodeporj_dirs: list[Path]):
         xcodebuild = shutil.which("xcodebuild")
         if not xcodebuild:
@@ -470,20 +466,18 @@ def _build_xcode_projects(xcodeporj_dirs: list[Path]):
     if not pages:
         return
-    elif isinstance(pages, str):
+    if isinstance(pages, Path):
         pages = [pages]
     fresh_dir(PLUGINS_DIR)

     for page in pages:
-        xcodeproj_dirs = [
-            file.resolve() for file in Path(page).iterdir() if file.suffix == ".xcodeproj"
-        ]
+        xcodeproj_dirs = [file.resolve() for file in page.iterdir() if file.suffix == ".xcodeproj"]

         if xcodeproj_dirs:
             _build_xcode_projects(xcodeproj_dirs)
         else:
-            plugin_name = os.path.basename(page)
-            page_in_plugins = join(PLUGINS_DIR, plugin_name)
+            plugin_name = page.name
+            page_in_plugins = PLUGINS_DIR / plugin_name
             shutil.copytree(page, page_in_plugins)

     if codesigner:
@@ -499,7 +493,7 @@ def _build_xcode_projects(xcodeporj_dirs: list[Path]):
         os.unlink(entitlements.name)

     plugins = [file.name for file in Path(PLUGINS_DIR).iterdir()]
-    with open(join(PLUGINS_DIR, "InstallerSections.plist"), "wb") as f:
+    with open(PLUGINS_DIR / "InstallerSections.plist", "wb") as f:
         plist = {
             "SectionOrder": [
                 "Introduction",
@@ -517,7 +511,7 @@
 def pkgbuild_script(name, info, src, dst="postinstall", **kwargs):
     fresh_dir(SCRIPTS_DIR)
     fresh_dir(PACKAGE_ROOT)
-    move_script(join(OSX_DIR, src), join(SCRIPTS_DIR, dst), info, **kwargs)
+    move_script(OSX_DIR / src, SCRIPTS_DIR / dst, info, **kwargs)
     pkgbuild(
         name,
         identifier=info.get("reverse_domain_identifier"),
@@ -541,13 +535,13 @@ def create(info, verbose=False):
     global CACHE_DIR, PACKAGE_ROOT, PACKAGES_DIR, PLUGINS_DIR, SCRIPTS_DIR

     CACHE_DIR = info["_download_dir"]
-    SCRIPTS_DIR = join(CACHE_DIR, "scripts")
-    PACKAGE_ROOT = join(CACHE_DIR, "package_root")
-    PACKAGES_DIR = join(CACHE_DIR, "built_pkgs")
-    PLUGINS_DIR = join(CACHE_DIR, "plugins")
+    SCRIPTS_DIR = CACHE_DIR / "scripts"
+    PACKAGE_ROOT = CACHE_DIR / "package_root"
+    PACKAGES_DIR = CACHE_DIR / "built_pkgs"
+    PLUGINS_DIR = CACHE_DIR / "plugins"

     fresh_dir(PACKAGES_DIR)
-    prefix = join(PACKAGE_ROOT, info.get("pkg_name", info["name"]).lower())
+    prefix = PACKAGE_ROOT / info.get("pkg_name", info["name"]).lower()

     # We need to split tasks in sub-PKGs so the GUI allows the user to enable/disable
     # the ones marked as optional. Optionality is controlled in modify_xml() by
@@ -561,23 +555,23 @@ def create(info, verbose=False):
     # We first populate PACKAGE_ROOT with everything needed, and then run pkg build on that dir
     fresh_dir(PACKAGE_ROOT)
     fresh_dir(SCRIPTS_DIR)
-    pkgs_dir = join(prefix, "pkgs")
-    os.makedirs(pkgs_dir)
+    pkgs_dir = prefix / "pkgs"
+    pkgs_dir.mkdir(parents=True, exist_ok=True)
     preconda.write_files(info, prefix)
     preconda.copy_extra_files(info.get("extra_files", []), prefix)
     # These are the user-provided scripts, maybe patched to have a shebang
     # They will be called by a wrapping script added later, if present
     if info.get("pre_install"):
         move_script(
-            abspath(info["pre_install"]),
-            abspath(join(pkgs_dir, "user_pre_install")),
+            info["pre_install"].resolve(),
+            (pkgs_dir / "user_pre_install").resolve(),
             info,
             ensure_shebang=True,
         )
     if info.get("post_install"):
         move_script(
-            abspath(info["post_install"]),
-            abspath(join(pkgs_dir, "user_post_install")),
+            info["post_install"].resolve(),
+            (pkgs_dir / "user_post_install").resolve(),
             info,
             ensure_shebang=True,
         )
@@ -585,9 +579,9 @@ def create(info, verbose=False):
     all_dists = info["_dists"].copy()
     for env_info in info.get("_extra_envs_info", {}).values():
         all_dists += env_info["_dists"]
-    all_dists = list({dist: None for dist in all_dists})  # de-duplicate
+    all_dists = list(dict.fromkeys(all_dists))  # de-duplicate
     for dist in all_dists:
-        os.link(join(CACHE_DIR, dist), join(pkgs_dir, dist))
+        os.link(CACHE_DIR / dist, pkgs_dir / dist)

     copy_conda_exe(prefix, "_conda", info["_conda_exe"])
@@ -604,13 +598,13 @@ def create(info, verbose=False):
             "com.apple.security.cs.disable-library-validation": True,
             "com.apple.security.cs.allow-dyld-environment-variables": True,
         }
-        codesigner.sign_bundle(join(prefix, "_conda"), entitlements=entitlements)
+        codesigner.sign_bundle(prefix / "_conda", entitlements=entitlements)

     # This script checks to see if the install location already exists and/or contains spaces
     # Not to be confused with the user-provided pre_install!
-    move_script(join(OSX_DIR, "checks_before_install.sh"), join(SCRIPTS_DIR, "preinstall"), info)
+    move_script(OSX_DIR / "checks_before_install.sh", SCRIPTS_DIR / "preinstall", info)
     # This script populates the cache, mainly
-    move_script(join(OSX_DIR, "prepare_installation.sh"), join(SCRIPTS_DIR, "postinstall"), info)
+    move_script(OSX_DIR / "prepare_installation.sh", SCRIPTS_DIR / "postinstall", info)
     pkgbuild_prepare_installation(info)
     names = ["prepare_installation"]
@@ -651,11 +645,11 @@ def create(info, verbose=False):

     # The default distribution file needs to be modified, so we create
     # it to a temporary location, edit it, and supply it to the final call.
-    xml_path = join(PACKAGES_DIR, "distribution.xml")
+    xml_path = PACKAGES_DIR / "distribution.xml"
     # hardcode to system location to avoid accidental clobber in PATH
     args = ["/usr/bin/productbuild", "--synthesize"]
     for name in names:
-        args.extend(["--package", join(PACKAGES_DIR, "%s.pkg" % name)])
+        args.extend(["--package", PACKAGES_DIR / f"{name}.pkg"])
     args.append(xml_path)
     explained_check_call(args)
     modify_xml(xml_path, info)
diff --git a/constructor/preconda.py b/constructor/preconda.py
index 2beeb3dc6..e85e4a744 100644
--- a/constructor/preconda.py
+++ b/constructor/preconda.py
@@ -9,16 +9,12 @@

 from __future__ import annotations

-import os
 import platform
 import shutil
 import sys
 import time
-from os.path import isdir, join
-from os.path import split as path_split
 from pathlib import Path
 from textwrap import dedent
-from typing import TYPE_CHECKING

 from . import __version__ as CONSTRUCTOR_VERSION
 from .conda_interface import (
@@ -40,9 +36,6 @@
     shortcuts_flags,
 )

-if TYPE_CHECKING:
-    from collections.abc import Mapping
-
 try:
     import json
 except ImportError:
@@ -56,11 +49,9 @@
 )


-def write_index_cache(info, dst_dir, used_packages):
-    cache_dir = join(dst_dir, "cache")
-
-    if not isdir(cache_dir):
-        os.makedirs(cache_dir)
+def write_index_cache(info, dst_dir: Path, used_packages):
+    cache_dir = dst_dir / "cache"
+    cache_dir.mkdir(parents=True, exist_ok=True)

     _platforms = info["_platform"], "noarch"
     _remap_configs = list(info.get("channels_remap", []))
@@ -108,9 +99,9 @@ def write_index_cache(info, dst_dir, used_packages):
         if repodata is not None:
             write_repodata(cache_dir, url, repodata, used_packages, info)

-    for cache_file in os.listdir(cache_dir):
-        if not cache_file.endswith(".json"):
-            os.unlink(join(cache_dir, cache_file))
+    for cache_file in cache_dir.glob("*"):
+        if cache_file.suffix != ".json":
+            cache_file.unlink()


 def system_info():
@@ -140,7 +131,7 @@ def system_info():
     return out


-def write_files(info: dict, workspace: str):
+def write_files(info: dict, workspace: Path):
     """
     Prepare files on disk to be shipped as part of the pre-conda payload,
     mostly configuration and metadata files:
@@ -158,11 +149,10 @@ def write_files(info: dict, workspace: Path):
     - Their corresponding `pkgs/channels.txt` and `pkgs/shortcuts.txt`
       under `pkgs/envs/`.
     """
-    os.makedirs(join(workspace, "conda-meta"), exist_ok=True)
-    pkgs_dir = join(workspace, "pkgs")
-    os.makedirs(pkgs_dir, exist_ok=True)
-    with open(join(pkgs_dir, ".constructor-build.info"), "w") as fo:
-        json.dump(system_info(), fo)
+    (workspace / "conda-meta").mkdir(parents=True, exist_ok=True)
+    pkgs_dir = workspace / "pkgs"
+    pkgs_dir.mkdir(parents=True, exist_ok=True)
+    (pkgs_dir / ".constructor-build.info").write_text(json.dumps(system_info()))

     all_urls = info["_urls"].copy()
     for env_info in info.get("_extra_envs_info", {}).values():
@@ -171,7 +161,7 @@ def write_files(info: dict, workspace: Path):

     final_urls_md5s = tuple((get_final_url(info, url), md5) for url, md5 in info["_urls"])
     all_final_urls_md5s = tuple((get_final_url(info, url), md5) for url, md5 in all_urls)
-    with open(join(pkgs_dir, "urls"), "w") as fo:
+    with open(pkgs_dir / "urls", "w") as fo:
         for url, md5 in all_final_urls_md5s:
             maybe_different_url = ensure_transmuted_ext(info, url)
             if maybe_different_url != url:  # transmuted, no md5
@@ -179,9 +169,7 @@ def write_files(info: dict, workspace: Path):
             else:
                 fo.write(f"{url}#{md5}\n")

-    with open(join(pkgs_dir, "urls.txt"), "w") as fo:
-        for url, _ in all_final_urls_md5s:
-            fo.write("%s\n" % url)
+    (pkgs_dir / "urls.txt").write_text("".join([f"{url}\n" for url, _ in all_final_urls_md5s]))

     all_dists = info["_dists"].copy()
     for env_info in info.get("_extra_envs_info", {}).values():
@@ -191,23 +179,23 @@ def write_files(info: dict, workspace: Path):
     write_index_cache(info, pkgs_dir, all_dists)

     # base environment conda-meta
-    write_conda_meta(info, join(workspace, "conda-meta"), final_urls_md5s)
+    write_conda_meta(info, workspace / "conda-meta", final_urls_md5s)
     write_repodata_record(info, pkgs_dir)
     # base environment file used with conda install --file
     # (list of specs/dists to install)
-    write_initial_state_explicit_txt(info, join(workspace, "conda-meta"), final_urls_md5s)
+    write_initial_state_explicit_txt(info, workspace / "conda-meta", final_urls_md5s)

     for fn in files:
-        os.chmod(join(workspace, fn), 0o664)
+        (workspace / fn).chmod(0o664)

     for env_name, env_info in info.get("_extra_envs_info", {}).items():
         env_config = info["extra_envs"][env_name]
-        env_pkgs = os.path.join(workspace, "pkgs", "envs", env_name)
-        env_conda_meta = os.path.join(workspace, "envs", env_name, "conda-meta")
-        os.makedirs(env_pkgs, exist_ok=True)
-        os.makedirs(env_conda_meta, exist_ok=True)
+        env_pkgs = workspace / "pkgs" / "envs" / env_name
+        env_conda_meta = workspace / "envs" / env_name / "conda-meta"
+        env_pkgs.mkdir(parents=True, exist_ok=True)
+        env_conda_meta.mkdir(parents=True, exist_ok=True)
         # environment conda-meta
         env_urls_md5 = tuple((get_final_url(info, url), md5) for url, md5 in env_info["_urls"])
         user_requested_specs = env_config.get("user_requested_specs", env_config.get("specs", ()))
@@ -220,11 +208,11 @@ def write_files(info: dict, workspace: Path):
         write_shortcuts_txt(info, env_pkgs, env_config)


-def write_conda_meta(info, dst_dir, final_urls_md5s, user_requested_specs=None):
+def write_conda_meta(info, dst_dir: Path, final_urls_md5s, user_requested_specs=None):
     if user_requested_specs is None:
         user_requested_specs = info.get("user_requested_specs", info.get("specs", ()))

-    cmd = path_split(sys.argv[0])[-1]
+    cmd = Path(sys.argv[0]).name
     if len(sys.argv) > 1:
         cmd = "%s %s" % (cmd, " ".join(sys.argv[1:]))
@@ -240,11 +228,10 @@ def write_conda_meta(info, dst_dir: Path, final_urls_md5s, user_requested_specs=None):
         builder.append("# update specs: %s" % update_specs)
     builder.append("\n")

-    with open(join(dst_dir, "history"), "w") as fh:
-        fh.write("\n".join(builder))
+    (dst_dir / "history").write_text("\n".join(builder))


-def write_repodata_record(info, dst_dir):
+def write_repodata_record(info, dst_dir: Path):
     all_dists = info["_dists"].copy()
     for env_data in info.get("_extra_envs_info", {}).values():
         all_dists += env_data["_dists"]
@@ -253,22 +240,15 @@ def write_repodata_record(info, dst_dir: Path):
             _dist = filename_dist(dist)[:-6]
         elif filename_dist(dist).endswith(".tar.bz2"):
             _dist = filename_dist(dist)[:-8]
-        record_file = join(_dist, "info", "repodata_record.json")
-        record_file_src = join(info["_download_dir"], record_file)
-
-        with open(record_file_src) as rf:
-            rr_json = json.load(rf)
-
+        record_file = Path(_dist, "info", "repodata_record.json")
+        record_file_src = info["_download_dir"] / record_file
+        record_file_dst = dst_dir / record_file
+        rr_json = json.loads(record_file_src.read_text())
         rr_json["url"] = get_final_url(info, rr_json["url"])
         rr_json["channel"] = get_final_url(info, rr_json["channel"])

-        if not isdir(join(dst_dir, _dist, "info")):
-            os.makedirs(join(dst_dir, _dist, "info"))
-
-        record_file_dest = join(dst_dir, record_file)
-
-        with open(record_file_dest, "w") as rf:
-            json.dump(rr_json, rf, indent=2, sort_keys=True)
+        record_file_dst.parent.mkdir(parents=True, exist_ok=True)
+        record_file_dst.write_text(json.dumps(rr_json, indent=2, sort_keys=True))


 def write_initial_state_explicit_txt(info, dst_dir, urls):
@@ -283,7 +263,7 @@ def write_initial_state_explicit_txt(info, dst_dir, urls):
         @EXPLICIT
         """
     ).lstrip()
-    with open(join(dst_dir, "initial-state.explicit.txt"), "w") as envf:
+    with open(dst_dir / "initial-state.explicit.txt", "w") as envf:
         envf.write(header)
         for url, md5 in urls:
             maybe_different_url = ensure_transmuted_ext(info, url)
@@ -293,15 +273,14 @@ def write_initial_state_explicit_txt(info, dst_dir, urls):
                 envf.write(f"{url}#{md5}\n")


-def write_channels_txt(info, dst_dir, env_config):
+def write_channels_txt(info, dst_dir: Path, env_config):
     env_config = env_config.copy()
     if "channels" not in env_config:
         env_config["channels"] = info.get("channels", ())
     if "channels_remap" not in env_config:
         env_config["channels_remap"] = info.get("channels_remap", ())

-    with open(join(dst_dir, "channels.txt"), "w") as f:
-        f.write(",".join(get_final_channels(env_config)))
+    (dst_dir / "channels.txt").write_text(",".join(get_final_channels(env_config)))


 def write_shortcuts_txt(info, dst_dir, env_config):
@@ -309,13 +288,12 @@ def write_shortcuts_txt(info, dst_dir, env_config):
         contents = shortcuts_flags(env_config)
     else:
         contents = shortcuts_flags(info)
-    with open(join(dst_dir, "shortcuts.txt"), "w") as f:
-        f.write(contents)
+    (dst_dir / "shortcuts.txt").write_text(contents)


 def copy_extra_files(
-    extra_files: list[os.PathLike | Mapping], workdir: os.PathLike
-) -> list[os.PathLike]:
+    extra_files: list[str | Path | dict[str | Path, str]], workdir: Path
+) -> list[Path]:
     """Copy list of extra files to a working directory

     Args:
@@ -326,14 +304,15 @@ def copy_extra_files(
         FileNotFoundError: Raises when the file isn't found.

     Returns:
-        list[os.PathLike]: List of normalized paths of copied locations.
+        list[Path]: List of normalized paths of copied locations.
     """
     if not extra_files:
         return []
     copied = []
+    workdir = Path(workdir)
     for path in extra_files:
-        if isinstance(path, str):
-            copied.append(shutil.copy(path, workdir))
+        if isinstance(path, (str, Path)):
+            copied.append(Path(shutil.copy(path, workdir)))
         elif isinstance(path, dict):
             assert len(path) == 1
             origin, destination = next(iter(path.items()))
@@ -342,5 +321,5 @@ def copy_extra_files(
                 raise FileNotFoundError(f"File {origin} does not exist.")
             dest_path = Path(workdir) / destination
             dest_path.parent.mkdir(parents=True, exist_ok=True)
-            copied.append(shutil.copy(orig_path, dest_path))
+            copied.append(Path(shutil.copy(orig_path, dest_path)))
     return copied
diff --git a/constructor/shar.py b/constructor/shar.py
index f6361a0f9..febe369e5 100644
--- a/constructor/shar.py
+++ b/constructor/shar.py
@@ -18,7 +18,8 @@
 import tempfile
 from contextlib import nullcontext
 from io import BytesIO
-from os.path import basename, dirname, getsize, isdir, join, relpath
+from os.path import getsize
+from pathlib import Path

 from .construct import ns_platform
 from .jinja import render_template
@@ -37,7 +38,7 @@
     shortcuts_flags,
 )

-THIS_DIR = dirname(__file__)
+THIS_DIR = Path(__file__).parent

 logger = logging.getLogger(__name__)
@@ -53,10 +54,9 @@ def make_executable(tarinfo):


 def read_header_template():
-    path = join(THIS_DIR, "header.sh")
+    path = THIS_DIR / "header.sh"
     logger.info("Reading: %s", path)
-    with open(path) as fi:
-        return fi.read()
+    return path.read_text()


 def get_header(conda_exec, tarball, info):
@@ -118,28 +118,25 @@ def get_header(conda_exec, tarball, info):
     return render_template(read_header_template(), **variables)


-def create(info, verbose=False):
-    tmp_dir_base_path = join(dirname(info["_outpath"]), "tmp")
-    try:
-        os.makedirs(tmp_dir_base_path)
-    except Exception:
-        pass
-    tmp_dir = tempfile.mkdtemp(dir=tmp_dir_base_path)
+def create(info, verbose: bool = False):
+    tmp_dir_base_path = info["_outpath"].parent / "tmp"
+    tmp_dir_base_path.mkdir(parents=True, exist_ok=True)
+    tmp_dir = Path(tempfile.mkdtemp(dir=tmp_dir_base_path))
     preconda_write_files(info, tmp_dir)

-    preconda_tarball = join(tmp_dir, "preconda.tar.bz2")
-    postconda_tarball = join(tmp_dir, "postconda.tar.bz2")
+    preconda_tarball = tmp_dir / "preconda.tar.bz2"
+    postconda_tarball = tmp_dir / "postconda.tar.bz2"
     pre_t = tarfile.open(preconda_tarball, "w:bz2")
     post_t = tarfile.open(postconda_tarball, "w:bz2")
     for rel_path in preconda_files:
-        pre_t.add(join(tmp_dir, rel_path), rel_path)
+        pre_t.add(tmp_dir / rel_path, rel_path)
     for env_name in info.get("_extra_envs_info", ()):
         for rel_path in (
             f"pkgs/envs/{env_name}/shortcuts.txt",
             f"envs/{env_name}/conda-meta/initial-state.explicit.txt",
         ):
-            pre_t.add(join(tmp_dir, rel_path), rel_path)
+            pre_t.add(tmp_dir / rel_path, rel_path)

     for key in "pre_install", "post_install":
         if key in info:
@@ -148,11 +145,10 @@ def create(info, verbose: bool = False):
                 "pkgs/%s.sh" % key,
                 filter=make_executable if has_shebang(info[key]) else None,
             )
-    cache_dir = join(tmp_dir, "cache")
-    if isdir(cache_dir):
-        for cf in os.listdir(cache_dir):
-            if cf.endswith(".json"):
-                pre_t.add(join(cache_dir, cf), "pkgs/cache/" + cf)
+    cache_dir = tmp_dir / "cache"
+    if cache_dir.is_dir():
+        for cf in cache_dir.glob("*.json"):
+            pre_t.add(cf, "pkgs/cache/" + cf.name)

     all_dists = info["_dists"].copy()
     for env_data in info.get("_extra_envs_info", {}).values():
@@ -164,36 +160,36 @@ def create(info, verbose: bool = False):
             _dist = filename_dist(dist)[:-6]
         elif filename_dist(dist).endswith(".tar.bz2"):
             _dist = filename_dist(dist)[:-8]
-        record_file = join(_dist, "info", "repodata_record.json")
-        record_file_src = join(tmp_dir, "pkgs", record_file)
-        record_file_dest = join("pkgs", record_file)
+        record_file = f"{_dist}/info/repodata_record.json"
+        record_file_src = tmp_dir / "pkgs" / record_file
+        record_file_dest = f"pkgs/{record_file}"
         pre_t.add(record_file_src, record_file_dest)

     pre_t.addfile(tarinfo=tarfile.TarInfo("conda-meta/history"))
-    post_t.add(join(tmp_dir, "conda-meta", "history"), "conda-meta/history")
+    post_t.add(tmp_dir / "conda-meta" / "history", "conda-meta/history")
     for env_name in info.get("_extra_envs_info", {}):
         pre_t.addfile(tarinfo=tarfile.TarInfo(f"envs/{env_name}/conda-meta/history"))
         post_t.add(
-            join(tmp_dir, "envs", env_name, "conda-meta", "history"),
+            tmp_dir / "envs" / env_name / "conda-meta" / "history",
             f"envs/{env_name}/conda-meta/history",
         )

     extra_files = copy_extra_files(info.get("extra_files", []), tmp_dir)
     for path in extra_files:
-        post_t.add(path, relpath(path, tmp_dir))
+        post_t.add(path, path.relative_to(tmp_dir))

     pre_t.close()
     post_t.close()

-    tarball = join(tmp_dir, "pkgs", "tmp.tar")
+    tarball = tmp_dir / "pkgs" / "tmp.tar"
     t = tarfile.open(tarball, "w")
-    t.add(preconda_tarball, basename(preconda_tarball))
-    t.add(postconda_tarball, basename(postconda_tarball))
+    t.add(preconda_tarball, preconda_tarball.name)
+    t.add(postconda_tarball, postconda_tarball.name)
     if "license_file" in info:
         t.add(info["license_file"], "LICENSE.txt")
     for dist in all_dists:
         fn = filename_dist(dist)
-        t.add(join(info["_download_dir"], fn), "pkgs/" + fn)
+        t.add(info["_download_dir"] / fn, f"pkgs/{fn}")
     t.close()

     info["_internal_conda_files"] = copy_conda_exe(tmp_dir, "_conda", info["_conda_exe"])
@@ -205,7 +201,7 @@ def create(info, verbose: bool = False):
         for path in info["_internal_conda_files"]:
             relative_path = str(path.relative_to(tmp_dir))
             memfile.write(path.read_bytes())
-            size = os.path.getsize(path)
+            size = getsize(path)
             end = start + size
             executable = os.access(path, os.X_OK)
             conda_exe_payloads[relative_path] = (start, end, executable)
@@ -223,7 +219,11 @@ def create(info, verbose: bool = False):
     with open(shar_path, "wb") as fo:
         fo.write(header.encode("utf-8"))
         for payload in [conda_exec, *maybe_memfile, tarball]:
-            with open(payload, "rb") if isinstance(payload, str) else nullcontext(payload) as fi:
+            with (
+                open(payload, "rb")
+                if isinstance(payload, (str, Path))
+                else nullcontext(payload) as fi
+            ):
                 while True:
                     chunk = fi.read(262144)
                     if not chunk:
diff --git a/constructor/utils.py b/constructor/utils.py
index 33a629b7d..6c2bf3a53 100644
--- a/constructor/utils.py
+++ b/constructor/utils.py
@@ -17,8 +17,7 @@
 import sys
 import warnings
 from io import StringIO
-from os import environ, sep, unlink
-from os.path import isdir, isfile, islink, join, normpath
+from os import environ
 from pathlib import Path
 from shutil import rmtree
 from subprocess import CalledProcessError, check_call, check_output
@@ -40,11 +39,11 @@ def explained_check_call(args):
     """
     Execute a system process and debug the invocation
     """
-    logger.debug("Executing: %s", " ".join(args))
+    logger.debug("Executing: %s", " ".join(map(str, args)))
     return check_call(args)


-def filename_dist(dist):
+def filename_dist(dist) -> str:
     """Return the filename of a distribution."""
     if hasattr(dist, "to_filename"):
         return dist.to_filename()
@@ -242,22 +241,17 @@ def get_final_channels(info):
     return mapped_channels


-def normalize_path(path):
-    new_path = normpath(path)
-    return new_path.replace(sep + sep, sep)
-
-
-def rm_rf(path):
+def rm_rf(path: Path):
     """
     try to delete path, but never fail
     """
     try:
-        if islink(path) or isfile(path):
+        if path.is_symlink() or path.is_file():
             # Note that we have to check if the destination is a link because
             # exists('/path/to/dead-link') will return False, although
             # islink('/path/to/dead-link') is True.
-            unlink(path)
-        elif isdir(path):
+            path.unlink()
+        elif path.is_dir():
             rmtree(path)
     except OSError:
         pass
@@ -306,15 +300,15 @@ def approx_size_kb(info, which="pkgs"):


 def copy_conda_exe(
-    target_directory: str | Path,
+    target_directory: Path,
     target_conda_exe_name: str | None = None,
-    conda_exe: str | Path | None = None,
+    conda_exe: Path | None = None,
 ) -> list[Path]:
     if conda_exe is None:
-        conda_exe = normalize_path(join(sys.prefix, "standalone_conda", "conda.exe"))
+        conda_exe = Path(sys.prefix, "standalone_conda", "conda.exe")
     if target_conda_exe_name is None:
         target_conda_exe_name = Path(conda_exe).name
-    shutil.copyfile(conda_exe, join(target_directory, target_conda_exe_name))
+    shutil.copyfile(conda_exe, target_directory / target_conda_exe_name)
     if (internal_dir := Path(conda_exe).parent / "_internal").is_dir():
         # onedir conda-standalone variant, copy that too
         shutil.copytree(internal_dir, Path(target_directory, "_internal"), dirs_exist_ok=True)
@@ -322,11 +316,9 @@ def copy_conda_exe(
     return []


-def identify_conda_exe(conda_exe: str | Path | None = None) -> tuple[StandaloneExe, str]:
+def identify_conda_exe(conda_exe: Path | None = None) -> tuple[StandaloneExe, str]:
     if conda_exe is None:
-        conda_exe = normalize_path(join(sys.prefix, "standalone_conda", "conda.exe"))
-    if isinstance(conda_exe, Path):
-        conda_exe = str(conda_exe)
+        conda_exe = Path(sys.prefix, "standalone_conda", "conda.exe")
     try:
         output_version = check_output([conda_exe, "--version"], text=True)
         output_version = output_version.strip()
@@ -343,13 +335,14 @@ def identify_conda_exe(conda_exe: Path | None = None) -> tuple[StandaloneExe, str]:
     return None, None


-def win_str_esc(s, newlines=True):
+def win_str_esc(s, newlines=True) -> str:
+    s = str(s)
     maps = [("$", "$$"), ('"', '$\\"'), ("\t", "$\\t")]
     if newlines:
         maps.extend([("\n", "$\\n"), ("\r", "$\\r")])
     for a, b in maps:
         s = s.replace(a, b)
-    return '"%s"' % s
+    return f'"{s}"'


 def check_required_env_vars(env_vars):
diff --git a/constructor/winexe.py b/constructor/winexe.py
index 3c137f5c1..c88c1bf90 100644
--- a/constructor/winexe.py
+++ b/constructor/winexe.py
@@ -14,9 +14,10 @@
 import shutil
 import sys
 import tempfile
-from os.path import abspath, basename, dirname, isfile, join
+from os.path import join
 from pathlib import Path
 from subprocess import check_output, run
+from textwrap import dedent

 from .construct import ns_platform
 from .imaging import write_images
@@ -35,17 +36,16 @@
     win_str_esc,
 )

-NSIS_DIR = join(abspath(dirname(__file__)), "nsis")
-MAKENSIS_EXE = abspath(join(sys.prefix, "NSIS", "makensis.exe"))
+NSIS_DIR = (Path(__file__).parent / "nsis").resolve()
+MAKENSIS_EXE = Path(sys.prefix, "NSIS", "makensis.exe").resolve()

 logger = logging.getLogger(__name__)


 def read_nsi_tmpl(info) -> str:
-    path = abspath(info.get("nsis_template", join(NSIS_DIR, "main.nsi.tmpl")))
+    path = Path(info.get("nsis_template", NSIS_DIR / "main.nsi.tmpl")).resolve()
     logger.info("Reading: %s", path)
-    with open(path) as fi:
-        return fi.read()
+    return path.read_text()


 def get_extra_files(paths, common_parent):
@@ -84,9 +84,9 @@ def setup_envs_commands(info, dir_path):
         # initial-state.explicit.txt as seen by the running installer
         "lockfile_txt": r"$INSTDIR\conda-meta\initial-state.explicit.txt",
         # initial-state.explicit.txt path while building the installer
-        "lockfile_txt_abspath": join(dir_path, "conda-meta", "initial-state.explicit.txt"),
+        "lockfile_txt_abspath": dir_path / "conda-meta" / "initial-state.explicit.txt",
         "conda_meta": r"$INSTDIR\conda-meta",
-        "history_abspath": join(dir_path, "conda-meta", "history"),
+        "history_abspath": dir_path / "conda-meta" / "history",
         "final_channels": get_final_channels(info),
         "shortcuts": shortcuts_flags(info),
         "register_envs": str(info.get("register_envs", True)).lower(),
@@ -107,14 +107,14 @@ def setup_envs_commands(info, dir_path):
             {
                 "name": env_name,
                 "prefix": join("$INSTDIR", "envs", env_name),
-                "lockfile_txt": join(
+                "lockfile_txt": Path(
                     "$INSTDIR", "envs", env_name, "conda-meta", "initial-state.explicit.txt"
                 ),
-                "lockfile_txt_abspath": join(
+                "lockfile_txt_abspath": Path(
                     dir_path, "envs", env_name, "conda-meta", "initial-state.explicit.txt"
                 ),
                 "conda_meta": join("$INSTDIR", "envs", env_name, "conda-meta"),
-                "history_abspath": join(dir_path, "envs", env_name, "conda-meta", "history"),
+                "history_abspath": dir_path / "envs" / env_name / "conda-meta" / "history",
                 "final_channels": get_final_channels(channel_info),
                 "shortcuts": shortcuts_flags(env_info),
                 "register_envs": str(info.get("register_envs", True)).lower(),
@@ -175,7 +175,7 @@ def make_nsi(
         "iconfile": "@icon.ico",
         "headerimage": "@header.bmp",
         "welcomeimage": "@welcome.bmp",
-        "licensefile": abspath(info.get("license_file", join(NSIS_DIR, "placeholder_license.txt"))),
+        "licensefile": info.get("license_file", NSIS_DIR / "placeholder_license.txt").resolve(),
         "conda_history": "@" + join("conda-meta", "history"),
         "conda_exe": "@_conda.exe",
         "urls_file": "@" + join("pkgs", "urls"),
@@ -199,7 +199,7 @@ def make_nsi(
         value = info.get(key, "")
         if not value:
             continue
-        if isinstance(value, str) and not value.endswith(".nsi"):
+        if isinstance(value, (str, Path)) and not str(value).endswith(".nsi"):
             logger.warning(
                 "On Windows, %s must be an .nsi file; %s will be ignored.",
                 key,
@@ -208,7 +208,7 @@ def make_nsi(
         elif isinstance(value, list):
             valid_values = []
             for val in value:
-                if val.endswith(".nsi"):
+                if val.suffix == ".nsi":
                     valid_values.append(val)
                 else:
                     logger.warning(
@@ -220,7 +220,7 @@ def make_nsi(

     for key, value in variables.items():
         if isinstance(value, str) and value.startswith("@"):
-            value = join(dir_path, value[1:])
+            value = dir_path / value[1:]
         variables[key] = win_str_esc(value)

     # From now on, the items added to variables will NOT be escaped
@@ -276,7 +276,7 @@ def make_nsi(
         if variables["custom_conclusion"]
         else ""
     )
-    if isinstance(info.get("post_install_pages"), str):
+    if isinstance(info.get("post_install_pages"), (str, Path)):
         variables["POST_INSTALL_PAGES"] = [custom_nsi_insert_from_file(info["post_install_pages"])]
     else:
         variables["POST_INSTALL_PAGES"] = [
@@ -286,7 +286,7 @@ def make_nsi(
     variables["VIRTUAL_SPECS"] = " ".join([f'"{spec}"' for spec in info.get("virtual_specs", ())])
     # This is the same but without quotes so we can print it fine
     variables["VIRTUAL_SPECS_DEBUG"] = " ".join([spec for spec in info.get("virtual_specs", ())])
-    variables["LICENSEFILENAME"] = basename(info.get("license_file", "placeholder_license.txt"))
+    variables["LICENSEFILENAME"] = Path(info.get("license_file", "placeholder_license.txt")).name
     variables["NO_RCS_ARG"] = info.get("_ignore_condarcs_arg", "")

     data = render_template(read_nsi_tmpl(info), **variables)
@@ -299,16 +299,14 @@ def make_nsi(
             break
     data = "\n".join(data_lines)

-    nsi_path = join(dir_path, "main.nsi")
-    with open(nsi_path, "w") as fo:
-        fo.write(data)
+    nsi_path = dir_path / "main.nsi"
+    nsi_path.write_text(data)
     # Uncomment to see the file for debugging
     # with open('main.nsi', 'w') as fo:
     #     fo.write(data)

     # Copy all the NSIS header files (*.nsh)
-    for fn in os.listdir(NSIS_DIR):
-        if fn.endswith(".nsh"):
-            shutil.copy(join(NSIS_DIR, fn), join(dir_path, fn))
+    for nsh in NSIS_DIR.glob("*.nsh"):
+        shutil.copy(nsh, dir_path / nsh.name)
     logger.info("Created %s file", nsi_path)
     return nsi_path
@@ -316,14 +314,15 @@ def make_nsi(

 def verify_nsis_install():
     logger.info("Checking for '%s'", MAKENSIS_EXE)
-    if not isfile(MAKENSIS_EXE):
+    if not MAKENSIS_EXE.is_file():
         sys.exit(
-            """
-Error: no file %s
-    please make sure nsis is installed:
-    > conda install nsis
-"""
-            % MAKENSIS_EXE
+            dedent(
+                f"""
+                Error: no file {MAKENSIS_EXE}
+                please make sure nsis is installed:
+                > conda install nsis
+                """
+            ).lstrip()
         )
     if sys.platform == "win32":
         out = check_output([MAKENSIS_EXE, "/VERSION"])
@@ -332,8 +331,8 @@ def verify_nsis_install():
         out = out.decode("utf-8").strip()
     logger.info("NSIS version: %s", out)
     for dn in "x86-unicode", "x86-ansi", ".":
-        untgz_dll = abspath(join(sys.prefix, "NSIS", "Plugins", dn, "untgz.dll"))
-        if isfile(untgz_dll):
+        untgz_dll = Path(sys.prefix, "NSIS", "Plugins", dn, "untgz.dll").resolve()
+        if untgz_dll.is_file():
             break
     else:
         sys.exit("Error: no file untgz.dll")
@@ -350,30 +349,31 @@ def create(info, verbose=False):
     else:
         raise ValueError(f"Unknown signing tool: {signing_tool_name}")
     signing_tool.verify_signing_tool()
-    tmp_dir = tempfile.mkdtemp()
+
+    tmp_dir_base_path = info["_outpath"].parent / "tmp"
+    tmp_dir_base_path.mkdir(parents=True, exist_ok=True)
+    tmp_dir = Path(tempfile.mkdtemp(dir=tmp_dir_base_path))
     preconda_write_files(info, tmp_dir)

     copied_extra_files = copy_extra_files(info.get("extra_files", []), tmp_dir)
     copied_temp_extra_files = copy_extra_files(info.get("temp_extra_files", []), tmp_dir)
     extra_conda_exe_files = copy_conda_exe(tmp_dir, "_conda.exe", info["_conda_exe"])

-    pre_dst = join(tmp_dir, "pre_install.bat")
+    pre_dst = tmp_dir / "pre_install.bat"
     pre_install_script = info.get("pre_install")
     if pre_install_script:
         shutil.copy(pre_install_script, pre_dst)

-    post_dst = join(tmp_dir, "post_install.bat")
+    post_dst = tmp_dir / "post_install.bat"
     try:
         shutil.copy(info["post_install"], post_dst)
     except KeyError:
-        with open(post_dst, "w") as fo:
-            fo.write(":: this is an empty post install .bat script\n")
+        post_dst.write_text(":: this is an empty post install .bat script\n")

-    preun_dst = join(tmp_dir, "pre_uninstall.bat")
+    preun_dst = tmp_dir / "pre_uninstall.bat"
     try:
         shutil.copy(info["pre_uninstall"], preun_dst)
     except KeyError:
-        with open(preun_dst, "w") as fo:
-            fo.write(":: this is an empty pre uninstall .bat script\n")
+        preun_dst.write_text(":: this is an empty pre uninstall .bat script\n")

     write_images(info, tmp_dir)
     nsi = make_nsi(
diff --git a/scripts/make_docs.py b/scripts/make_docs.py
index 51bbaf67a..03529ee93 100644
--- a/scripts/make_docs.py
+++ b/scripts/make_docs.py
@@ -3,7 +3,7 @@
 """

 import sys
-from os.path import dirname, join
+from pathlib import Path

 import jinja2

@@ -11,9 +11,8 @@

 from constructor.conda_interface import SUPPORTED_PLATFORMS
 from constructor.construct import ns_platform

-REPO_ROOT = dirname(dirname(__file__))
-
-sys.path.insert(0, REPO_ROOT)
+REPO_ROOT = Path(__file__).parent.parent
+sys.path.insert(0, str(REPO_ROOT))

 valid_selectors = ns_platform(sys.platform)
@@ -93,8 +92,5 @@ def generate_key_info_dict():
     supported_platforms=SUPPORTED_PLATFORMS,
 )

-with open(join(REPO_ROOT, "CONSTRUCT.md"), "w") as f:
-    f.write(output)
-
-with open(join(REPO_ROOT, "docs", "source", "construct-yaml.md"), "w") as f:
-    f.write(output)
+(REPO_ROOT / "CONSTRUCT.md").write_text(output)
+(REPO_ROOT / "docs" / "source" / "construct-yaml.md").write_text(output)
diff --git a/tests/test_examples.py b/tests/test_examples.py
index e8b3168fa..d7e55e1c4 100644
--- a/tests/test_examples.py
+++ b/tests/test_examples.py
@@ -90,12 +90,12 @@ def _execute(
     print("Took", timedelta(seconds=time.time() - t0))


-def _check_installer_log(install_dir):
+def _check_installer_log(install_dir: Path):
     # Windows installers won't raise exit codes so we need to check the log file
     error_lines = []
     try:
         log_is_empty = True
-        with open(os.path.join(install_dir, "install.log"), encoding="utf-16-le") as f:
+        with open(install_dir / "install.log", encoding="utf-16-le") as f:
             print("Installer log:", file=sys.stderr)
             for line in f:
                 log_is_empty = False
diff --git a/tests/test_imaging.py b/tests/test_imaging.py
index 23046e683..f44b821ba 100644
--- a/tests/test_imaging.py
+++ b/tests/test_imaging.py
@@ -1,6 +1,7 @@
 import shutil
 import sys
 import tempfile
+from pathlib import Path

 import pytest

@@ -13,7 +14,7 @@
     reason="imaging only available on Windows and MacOS",
 )
 def test_write_images():
-    tmp_dir = tempfile.mkdtemp()
+    tmp_dir = Path(tempfile.mkdtemp())
     info = {"name": "test", "version": "0.3.1"}

     for key in ("welcome_image_text", "header_image_text"):
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 56f88e4c7..1274e4bd6 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,6 +1,4 @@
-from os import sep
-
-from constructor.utils import make_VIProductVersion, normalize_path
+from constructor.utils import make_VIProductVersion


 def test_make_VIProductVersion():
@@ -12,11 +10,3 @@ def test_make_VIProductVersion():
     assert f("5.2dev") == "5.0.0.0"
     assert f("5.26.8.9.3") == "5.26.8.9"
     assert f("x") == "0.0.0.0"
-
-
-def test_normalize_path():
-    path = "//test//test/test".replace("/", sep)
-    assert normalize_path(path) == "/test/test/test".replace("/", sep)
-
-    path = "test///test/test".replace("/", sep)
-    assert normalize_path(path) == "test/test/test".replace("/", sep)