Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify PEX buildtime and runtime wheel caches. #821

Merged
merged 6 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions pex/bin/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ def configure_clp_pex_resolution(parser):
group.add_option(
'--cache-dir',
dest='cache_dir',
default='{pex_root}/build',
default='{pex_root}',
help='The local cache directory to use for speeding up requirement '
'lookups. [Default: ~/.pex/build]')
'lookups. [Default: ~/.pex]')

group.add_option(
'--wheel', '--no-wheel', '--no-use-wheel',
Expand Down Expand Up @@ -458,7 +458,7 @@ def configure_clp():
parser.add_option(
'--pex-root',
dest='pex_root',
default=None,
default=ENV.PEX_ROOT,
help='Specify the pex root used in this invocation of pex. [Default: ~/.pex]'
)

Expand Down Expand Up @@ -625,16 +625,13 @@ def main(args=None):
if options.python and options.interpreter_constraint:
die('The "--python" and "--interpreter-constraint" options cannot be used together.')

if options.pex_root:
ENV.set('PEX_ROOT', options.pex_root)
else:
options.pex_root = ENV.PEX_ROOT # If option not specified fallback to env variable.
with ENV.patch(PEX_VERBOSE=str(options.verbosity),
PEX_ROOT=options.pex_root) as patched_env:

# Don't alter cache if it is disabled.
if options.cache_dir:
options.cache_dir = make_relative_to_root(options.cache_dir)
# Don't alter cache if it is disabled.
if options.cache_dir:
options.cache_dir = make_relative_to_root(options.cache_dir)

with ENV.patch(PEX_VERBOSE=str(options.verbosity)) as patched_env:
with TRACER.timed('Building pex'):
pex_builder = build_pex(reqs, options)

Expand Down
86 changes: 76 additions & 10 deletions pex/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time
import zipfile
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime
from uuid import uuid4

Expand Down Expand Up @@ -217,19 +218,84 @@ def safe_sleep(seconds):
current_time = time.time()


def rename_if_empty(src, dest, allowable_errors=(errno.EEXIST, errno.ENOTEMPTY)):
"""Rename `src` to `dest` using `os.rename()`.
class AtomicDirectory(object):
def __init__(self, target_dir):
self._target_dir = target_dir
self._work_dir = '{}.{}'.format(target_dir, uuid4().hex)

If an `OSError` with errno in `allowable_errors` is encountered during the rename, the `dest`
dir is left unchanged and the `src` directory will simply be removed.
@property
def work_dir(self):
return self._work_dir

@property
def target_dir(self):
return self._target_dir

@property
def is_finalized(self):
return os.path.exists(self._target_dir)

def finalize(self, source=None):
"""Rename `work_dir` to `target_dir` using `os.rename()`.

:param str source: An optional source offset into the `work_dir` to use for the atomic
update of `target_dir`. By default the whole `work_dir` is used.

If a race is lost and `target_dir` already exists, `target_dir` is left unchanged and
the `work_dir` is simply removed.
"""
if self.is_finalized:
return

source = os.path.join(self._work_dir, source) if source else self._work_dir
try:
# Perform an atomic rename.
#
# Per the docs: https://docs.python.org/2.7/library/os.html#os.rename
#
# The operation may fail on some Unix flavors if src and dst are on different filesystems.
# If successful, the renaming will be an atomic operation (this is a POSIX requirement).
#
# We have satisfied the single filesystem constraint by arranging the `work_dir` to be a
# sibling of the `target_dir`.
os.rename(source, self._target_dir)
except OSError as e:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
jsirois marked this conversation as resolved.
Show resolved Hide resolved
raise e
finally:
self.cleanup()

def cleanup(self):
safe_rmtree(self._work_dir)


@contextmanager
def atomic_directory(target_dir, source=None):
"""A context manager that yields a new empty work directory path it will move to `target_dir`.

:param str target_dir: The target directory to atomically update.
:param str source: An optional source offset into the work directory to use for the atomic update
of the target directory. By default the whole work directory is used.

If the `target_dir` already exists, `None` is yielded to the enclosed block to signal that there
is no work to do.

If the enclosed block fails the `target_dir` will be undisturbed.

The new work directory will be cleaned up regardless of whether or not the enclosed block
succeeds.
"""
atomic_dir = AtomicDirectory(target_dir=target_dir)
if atomic_dir.is_finalized:
yield None
return

safe_mkdir(atomic_dir.work_dir)
try:
os.rename(src, dest)
except OSError as e:
if e.errno in allowable_errors:
safe_rmtree(src)
else:
raise
yield atomic_dir.work_dir
atomic_dir.finalize(source=source)
finally:
atomic_dir.cleanup()
jsirois marked this conversation as resolved.
Show resolved Hide resolved


def chmod_plus_x(path):
Expand Down
62 changes: 17 additions & 45 deletions pex/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@
import os
import site
import sys
import uuid
import zipfile
from collections import OrderedDict

from pex import pex_builder, pex_warnings
from pex.bootstrap import Bootstrap
from pex.common import die, open_zip, rename_if_empty, safe_mkdir, safe_rmtree
from pex.common import atomic_directory, die, open_zip
from pex.interpreter import PythonInterpreter
from pex.package import distribution_compatible
from pex.pex_info import PexInfo
from pex.platforms import Platform
from pex.third_party.pkg_resources import (
DistributionNotFound,
Expand All @@ -27,7 +25,7 @@
find_distributions
)
from pex.tracer import TRACER
from pex.util import CacheHelper, DistributionHelper
from pex.util import CacheHelper


def _import_pkg_resources():
Expand Down Expand Up @@ -111,21 +109,14 @@ def _force_local(cls, pex_file, pex_info):
return pex_file
explode_dir = os.path.join(pex_info.zip_unsafe_cache, pex_info.code_hash)
TRACER.log('PEX is not zip safe, exploding to %s' % explode_dir)
if not os.path.exists(explode_dir):
explode_tmp = explode_dir + '.' + uuid.uuid4().hex
with TRACER.timed('Unzipping %s' % pex_file):
try:
safe_mkdir(explode_tmp)
with atomic_directory(explode_dir) as explode_tmp:
if explode_tmp:
with TRACER.timed('Unzipping %s' % pex_file):
with open_zip(pex_file) as pex_zip:
pex_files = (x for x in pex_zip.namelist()
jsirois marked this conversation as resolved.
Show resolved Hide resolved
if not x.startswith(pex_builder.BOOTSTRAP_DIR) and
not x.startswith(PexInfo.INTERNAL_CACHE))
not x.startswith(pex_info.internal_cache))
pex_zip.extractall(explode_tmp, pex_files)
except: # noqa: T803
safe_rmtree(explode_tmp)
raise
TRACER.log('Renaming %s to %s' % (explode_tmp, explode_dir))
rename_if_empty(explode_tmp, explode_dir)
return explode_dir

@classmethod
Expand Down Expand Up @@ -162,34 +153,14 @@ def _update_module_paths(cls, pex_file):
reimported_module.__path__.append(path_item)

@classmethod
def _write_zipped_internal_cache(cls, pex, pex_info):
prefix_length = len(pex_info.internal_cache) + 1
existing_cached_distributions = []
newly_cached_distributions = []
with open_zip(pex) as zf:
# Distribution names are the first element after ".deps/" and before the next "/"
distribution_names = set(filter(None, (filename[prefix_length:].split('/')[0]
for filename in zf.namelist() if filename.startswith(pex_info.internal_cache))))
# Create Distribution objects from these, and possibly write to disk if necessary.
for distribution_name in distribution_names:
internal_dist_path = '/'.join([pex_info.internal_cache, distribution_name])
# First check if this is already cached
dist_digest = pex_info.distributions.get(distribution_name) or CacheHelper.zip_hash(
zf, internal_dist_path)
cached_location = os.path.join(pex_info.install_cache, '%s.%s' % (
distribution_name, dist_digest))
if os.path.exists(cached_location):
dist = DistributionHelper.distribution_from_path(cached_location)
if dist is not None:
existing_cached_distributions.append(dist)
continue

dist = DistributionHelper.distribution_from_path(os.path.join(pex, internal_dist_path))
with TRACER.timed('Caching %s' % dist):
newly_cached_distributions.append(
CacheHelper.cache_distribution(zf, internal_dist_path, cached_location))

return existing_cached_distributions, newly_cached_distributions
def _write_zipped_internal_cache(cls, zf, pex_info):
cached_distributions = []
for distribution_name, dist_digest in pex_info.distributions.items():
internal_dist_path = '/'.join([pex_info.internal_cache, distribution_name])
jsirois marked this conversation as resolved.
Show resolved Hide resolved
cached_location = os.path.join(pex_info.install_cache, dist_digest, distribution_name)
dist = CacheHelper.cache_distribution(zf, internal_dist_path, cached_location)
cached_distributions.append(dist)
return cached_distributions

@classmethod
def _load_internal_cache(cls, pex, pex_info):
Expand All @@ -200,8 +171,9 @@ def _load_internal_cache(cls, pex, pex_info):
for dist in find_distributions(internal_cache):
yield dist
else:
for dist in itertools.chain(*cls._write_zipped_internal_cache(pex, pex_info)):
yield dist
with open_zip(pex) as zf:
for dist in cls._write_zipped_internal_cache(zf, pex_info):
yield dist

def __init__(self, pex, pex_info, interpreter=None, **kw):
self._internal_cache = os.path.join(pex, pex_info.internal_cache)
Expand Down
48 changes: 11 additions & 37 deletions pex/pex_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
import os

from pex.common import Chroot, chmod_plus_x, open_zip, safe_mkdir, safe_mkdtemp
from pex.common import Chroot, chmod_plus_x, safe_mkdir, safe_mkdtemp, temporary_dir
from pex.compatibility import to_bytes
from pex.compiler import Compiler
from pex.distribution_target import DistributionTarget
Expand Down Expand Up @@ -276,37 +276,14 @@ def _add_dist_dir(self, path, dist_name):
self._copy_or_link(filename, target)
return CacheHelper.dir_hash(path)

def _add_dist_zip(self, path, dist_name):
# We need to distinguish between wheels and other zips. Most of the time,
# when we have a zip, it contains its contents in an importable form.
# But wheels don't have to be importable, so we need to force them
# into an importable shape. We can do that by installing it into its own
# wheel dir.
if dist_name.endswith("whl"):
tmp = safe_mkdtemp()
whltmp = os.path.join(tmp, dist_name)
os.mkdir(whltmp)
install_job = spawn_install_wheel(
def _add_dist_wheel_file(self, path, dist_name):
with temporary_dir() as install_dir:
spawn_install_wheel(
wheel=path,
install_dir=whltmp,
install_dir=install_dir,
target=DistributionTarget.for_interpreter(self.interpreter)
)
install_job.wait()
for root, _, files in os.walk(whltmp):
pruned_dir = os.path.relpath(root, tmp)
for f in files:
fullpath = os.path.join(root, f)
target = os.path.join(self._pex_info.internal_cache, pruned_dir, f)
self._copy_or_link(fullpath, target)
return CacheHelper.dir_hash(whltmp)

with open_zip(path) as zf:
for name in zf.namelist():
if name.endswith('/'):
continue
target = os.path.join(self._pex_info.internal_cache, dist_name, name)
self._chroot.write(zf.read(name), target)
return CacheHelper.zip_hash(zf)
).wait()
return self._add_dist_dir(install_dir, dist_name)

def _prepare_code_hash(self):
self._pex_info.code_hash = CacheHelper.pex_hash(self._chroot.path())
Expand All @@ -325,8 +302,10 @@ def add_distribution(self, dist, dist_name=None):

if os.path.isdir(dist.location):
dist_hash = self._add_dist_dir(dist.location, dist_name)
elif dist.location.endswith('.whl'):
dist_hash = self._add_dist_wheel_file(dist.location, dist_name)
else:
dist_hash = self._add_dist_zip(dist.location, dist_name)
raise self.InvalidDistribution('Unsupported distribution type: {}'.format(dist))
jsirois marked this conversation as resolved.
Show resolved Hide resolved

# add dependency key so that it can rapidly be retrieved from cache
self._pex_info.add_distribution(dist_name, dist_hash)
Expand All @@ -347,14 +326,9 @@ def add_dist_location(self, dist, name=None):
bdist = DistributionHelper.distribution_from_path(dist)
if bdist is None:
raise self.InvalidDistribution('Could not find distribution at %s' % dist)
self.add_distribution(bdist)
self.add_distribution(bdist, dist_name=name)
self.add_requirement(bdist.as_requirement())

def add_egg(self, egg):
"""Alias for add_dist_location."""
self._ensure_unfrozen('Adding an egg')
return self.add_dist_location(egg)

def _precompile_source(self):
source_relpaths = [path for label in ('source', 'executable', 'main', 'bootstrap')
for path in self._chroot.filesets.get(label, ()) if path.endswith('.py')]
Expand Down
6 changes: 3 additions & 3 deletions pex/pex_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PexInfo(object):
"""

PATH = 'PEX-INFO'
INTERNAL_CACHE = '.deps'
INSTALL_CACHE = 'installed_wheels'

@classmethod
def make_build_properties(cls, interpreter=None):
Expand Down Expand Up @@ -290,11 +290,11 @@ def pex_root(self, value):

@property
def internal_cache(self):
return self.INTERNAL_CACHE
return '.deps'

@property
def install_cache(self):
return os.path.join(self.pex_root, 'install')
return os.path.join(self.pex_root, self.INSTALL_CACHE)

@property
def zip_unsafe_cache(self):
Expand Down
Loading