Add ability to cache package payloads synchronously (#1679)
Adds a `package_cache_async` config flag that lets users run package caching synchronously (blocking) or asynchronously. Also adds a `--pkg-cache-mode` CLI argument to manually override the configured caching mode.
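
For context, a minimal sketch of the two new knobs (the setting name, default, and CLI flag are the ones added by this commit; everything else is illustrative):

```python
# rezconfig.py (sketch) -- the shipped default is True (asynchronous).
# Setting it to False makes resolves block until package payloads are cached.
package_cache_async = False
```

On the command line, `rez-env --pkg-cache-mode sync` (or `async`) overrides the configured value for a single invocation.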

---

Signed-off-by: ttrently <41705925+ttrently@users.noreply.github.com>
Signed-off-by: Ben Andersen <ben@isohedron.com.au>
Signed-off-by: Jean-Christophe Morin <jean_christophe_morin@hotmail.com>
Co-authored-by: ttrently <41705925+ttrently@users.noreply.github.com>
Co-authored-by: Jean-Christophe Morin <jean_christophe_morin@hotmail.com>
3 people authored Jun 29, 2024
1 parent 704eb71 commit 671e32f
Showing 6 changed files with 194 additions and 26 deletions.
15 changes: 14 additions & 1 deletion src/rez/cli/env.py
@@ -119,6 +119,11 @@ def setup_parser(parser, completions=False):
parser.add_argument(
"--no-pkg-cache", action="store_true",
help="Disable package caching")
parser.add_argument(
"--pkg-cache-mode", choices=["sync", "async"],
help="If provided, override the rezconfig's package_cache_async key. "
"If 'sync', the process will block until packages are cached. "
"If 'async', the process will not block while packages are cached.")
parser.add_argument(
"--pre-command", type=str, help=SUPPRESS)
PKG_action = parser.add_argument(
@@ -201,6 +206,13 @@ def command(opts, parser, extra_arg_groups=None):
rule = Rule.parse_rule(rule_str)
package_filter.add_inclusion(rule)

if opts.pkg_cache_mode == "async":
package_cache_mode = True
elif opts.pkg_cache_mode == "sync":
package_cache_mode = False
else:
package_cache_mode = None

# perform the resolve
context = ResolvedContext(
package_requests=request,
Expand All @@ -215,7 +227,8 @@ def command(opts, parser, extra_arg_groups=None):
caching=(not opts.no_cache),
suppress_passive=opts.no_passive,
print_stats=opts.stats,
package_caching=(not opts.no_pkg_cache)
package_caching=(not opts.no_pkg_cache),
package_cache_async=package_cache_mode,
)

success = (context.status == ResolverStatus.solved)
1 change: 1 addition & 0 deletions src/rez/config.py
@@ -454,6 +454,7 @@ def _parse_env_var(self, value):
"package_cache_during_build": Bool,
"package_cache_local": Bool,
"package_cache_same_device": Bool,
"package_cache_async": Bool,
"color_enabled": ForceOrBool,
"resolve_caching": Bool,
"cache_package_files": Bool,
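
Since the new key is declared as a `Bool` alongside the other `package_cache_*` settings, it can be read and overridden like any existing config attribute. A small sketch, assuming rez's existing `config.override` helper applies to this key like any other:

```python
from rez.config import config

# Defaults to True (asynchronous caching) unless overridden in rezconfig.py:
print(config.package_cache_async)

# Programmatic override for the current process, e.g. from a test or tool:
config.override("package_cache_async", False)
```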
118 changes: 103 additions & 15 deletions src/rez/package_cache.py
@@ -22,6 +22,7 @@
from rez.config import config
from rez.exceptions import PackageCacheError
from rez.vendor.lockfile import LockFile, NotLocked
from rez.vendor.progress.spinner import PixelSpinner
from rez.utils.filesystem import safe_listdir, safe_makedirs, safe_remove, \
forceful_rmtree
from rez.utils.colorize import ColorizedStreamHandler
@@ -70,6 +71,18 @@ class PackageCache(object):
VARIANT_PENDING = 5 #: Variant is pending caching
VARIANT_REMOVED = 6 #: Variant was deleted

STATUS_DESCRIPTIONS = {
VARIANT_NOT_FOUND: "was not found",
VARIANT_FOUND: "was found",
VARIANT_CREATED: "was created",
VARIANT_COPYING: "payload is still being copied to this cache",
VARIANT_COPY_STALLED: "payload copy has stalled.\nSee "
"https://rez.readthedocs.io/en/stable/caching.html#cleaning-the-cache "
"for more information.",
VARIANT_PENDING: "is pending caching",
VARIANT_REMOVED: "was deleted",
}

_FILELOCK_TIMEOUT = 10
_COPYING_TIME_INC = 0.2
_COPYING_TIME_MAX = 5.0
@@ -117,7 +130,7 @@ def get_cached_root(self, variant):

return rootpath

def add_variant(self, variant, force=False):
def add_variant(self, variant, force=False, wait_for_copying=False, logger=None):
"""Copy a variant's payload into the cache.
The following steps are taken to ensure multi-thread/proc safety, and to
@@ -148,6 +161,9 @@ def add_variant(self, variant, force=False):
variant (Variant): The variant to copy into this cache
force (bool): Copy the variant regardless. Use at your own risk (there
is no guarantee the resulting variant payload will be functional).
wait_for_copying (bool): Whether the caching step should block when one of the
pending variants is marked as already copying.
logger (None | Logger): If a logger is provided, log information to it.
Returns:
tuple: 2-tuple:
@@ -208,15 +224,40 @@ def add_variant(self, variant, force=False):
% package.repository
)

no_op_statuses = (
no_op_statuses = {
self.VARIANT_FOUND,
self.VARIANT_COPYING,
self.VARIANT_COPY_STALLED
)
self.VARIANT_COPY_STALLED,
}
if not wait_for_copying:
# Copying variants are only no-ops if we want to ignore them.
no_op_statuses.add(self.VARIANT_COPYING)

# variant already exists, or is being copied to cache by another thread/proc
status, rootpath = self._get_cached_root(variant)
if status in no_op_statuses:
if logger:
logger.warning(f"Not caching {variant.qualified_name}. "
f"Variant {self.STATUS_DESCRIPTIONS[status]}")
return (rootpath, status)

if wait_for_copying and status == self.VARIANT_COPYING:
spinner = PixelSpinner(f"Waiting for {variant.qualified_name} to finish copying. ")
while status == self.VARIANT_COPYING:
spinner.next()
time.sleep(self._COPYING_TIME_INC)
status, rootpath = self._get_cached_root(variant)

# Status has changed, so report the change and return
if logger:
if status == self.VARIANT_FOUND:
# We have resolved into a satisfactory state
logger.info(
f"{variant.qualified_name} {self.STATUS_DESCRIPTIONS[status]}"
)
else:
logger.warning(
f"{variant.qualified_name} {self.STATUS_DESCRIPTIONS[status]}"
)
return (rootpath, status)

# 1.
@@ -365,28 +406,47 @@ def add_variants_async(self, variants):
This method is called when a context is created or sourced. Variants
are then added to the cache in a separate process.
.. deprecated:: 3.2.0
Use :meth:`add_variants` instead.
"""
return self.add_variants(variants, package_cache_async=True)

# A prod install is necessary because add_variants_async works by
def add_variants(self, variants, package_cache_async=True):
"""Add the given variants to the package payload cache.
"""

# A prod install is necessary because add_variants works by
# starting a rez-pkg-cache proc, and this can only be done reliably in
# a prod install. On non-windows we could fork instead, but there would
# remain no good solution on windows.
#
if not system.is_production_rez_install:
raise PackageCacheError(
"PackageCache.add_variants_async is only supported in a "
"PackageCache.add_variants is only supported in a "
"production rez installation."
)

variants_ = []
cachable_statuses = {
self.VARIANT_NOT_FOUND,
}
if not package_cache_async:
# We want to monitor copying variants if we're synchronous.
# We also want to report that a status has been stalled, so we'll
# hand that off to the caching function as well
cachable_statuses.update({
self.VARIANT_COPYING,
self.VARIANT_COPY_STALLED,
})

# trim down to those variants that are cachable, and not already cached
for variant in variants:
if not variant.parent.is_cachable:
continue

status, _ = self._get_cached_root(variant)
if status == self.VARIANT_NOT_FOUND:
if status in cachable_statuses:
variants_.append(variant)

# if there are no variants to add, and no potential cleanup to do, then exit
@@ -427,6 +487,20 @@ def add_variants_async(self, variants):
with open(filepath, 'w') as f:
f.write(json.dumps(handle_dict))

if package_cache_async:
self._subprocess_package_caching_daemon(self.path)
else:
# synchronous caching
self._run_caching_operation(wait_for_copying=True)

@staticmethod
def _subprocess_package_caching_daemon(path):
"""
Run the package cache in a daemon process
Returns:
subprocess.Popen : The package caching daemon process
"""
# configure executable
if platform.system() == "Windows":
kwargs = {
@@ -443,7 +517,7 @@ def add_variants_async(self, variants):
raise RuntimeError("Did not find rez-pkg-cache executable")

# start caching subproc
args = [exe, "--daemon", self.path]
args = [exe, "--daemon", path]

try:
with open(os.devnull, 'w') as devnull:
@@ -454,8 +528,8 @@ def add_variants_async(self, variants):
else:
out_target = devnull

subprocess.Popen(
[exe, "--daemon", self.path],
return subprocess.Popen(
args,
stdout=out_target,
stderr=out_target,
**kwargs
@@ -558,6 +632,15 @@ def run_daemon(self):
if pid > 0:
sys.exit(0)

self._run_caching_operation(wait_for_copying=False)

def _run_caching_operation(self, wait_for_copying=True):
"""Copy pending variants.
Args:
wait_for_copying (bool): Whether the caching step should block when one of the
pending variants is marked as already copying.
"""
logger = self._init_logging()

# somewhere for the daemon to store stateful info
@@ -568,7 +651,7 @@ def run_daemon(self):
# copy variants into cache
try:
while True:
keep_running = self._run_daemon_step(state)
keep_running = self._run_caching_step(state, wait_for_copying=wait_for_copying)
if not keep_running:
break
except Exception:
@@ -682,12 +765,13 @@ def _lock(self):
except NotLocked:
pass

def _run_daemon_step(self, state):
def _run_caching_step(self, state, wait_for_copying=False):
logger = state["logger"]

# pick a random pending variant to copy
pending_filenames = set(os.listdir(self._pending_dir))
pending_filenames -= set(state.get("copying", set()))
if not wait_for_copying:
pending_filenames -= set(state.get("copying", set()))
if not pending_filenames:
return False

@@ -710,7 +794,11 @@ def _run_daemon_step(self, state):
t = time.time()

try:
rootpath, status = self.add_variant(variant)
rootpath, status = self.add_variant(
variant,
wait_for_copying=wait_for_copying,
logger=logger,
)

except PackageCacheError as e:
# variant cannot be cached, so remove as a pending variant
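
For reference, a hedged sketch of driving the cache directly through the new synchronous path (the package request and cache root are placeholders; as the guard above shows, `add_variants` still requires a production rez install):

```python
from rez.package_cache import PackageCache
from rez.resolved_context import ResolvedContext

context = ResolvedContext(["mypkg"])            # placeholder resolve
cache = PackageCache("/path/to/package/cache")  # placeholder cache root

# Blocks until every cachable variant is copied; variants already being
# copied by another process are waited on rather than skipped.
cache.add_variants(context.resolved_packages, package_cache_async=False)

# The old entry point still works, but is now deprecated in favour of add_variants:
cache.add_variants_async(context.resolved_packages)
```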
16 changes: 12 additions & 4 deletions src/rez/resolved_context.py
@@ -167,7 +167,7 @@ def __init__(self, package_requests, verbosity=0, timestamp=None,
package_filter=None, package_orderers=None, max_fails=-1,
add_implicit_packages=True, time_limit=-1, callback=None,
package_load_callback=None, buf=None, suppress_passive=False,
print_stats=False, package_caching=None):
print_stats=False, package_caching=None, package_cache_async=None):
"""Perform a package resolve, and store the result.
Args:
@@ -205,6 +205,8 @@ def __init__(self, package_requests, verbosity=0, timestamp=None,
package_caching (bool|None): If True, apply package caching settings
as per the config. If None, enable as determined by config
setting :data:`package_cache_during_build`.
package_cache_async (bool|None): If True, cache packages asynchronously.
If None, use the config setting :data:`package_cache_async`
"""
self.load_path = None

@@ -246,9 +248,12 @@ def __init__(self, package_requests, verbosity=0, timestamp=None,
package_caching = config.package_cache_during_build
else:
package_caching = True

self.package_caching = package_caching

if package_cache_async is None:
package_cache_async = config.package_cache_async
self.package_cache_async = package_cache_async

# patch settings
self.default_patch_lock = PatchLock.no_lock
self.patch_locks = {}
@@ -1839,13 +1844,16 @@ def _update_package_cache(self):
not self.success:
return

# see PackageCache.add_variants_async
# see PackageCache.add_variants
if not system.is_production_rez_install:
return

pkgcache = self._get_package_cache()
if pkgcache:
pkgcache.add_variants_async(self.resolved_packages)
pkgcache.add_variants(
self.resolved_packages,
self.package_cache_async,
)

@classmethod
def _init_context_tracking_payload_base(cls):
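
A short sketch of the resulting precedence at the API level (placeholder request; an explicit bool wins, while `None`, the default, defers to `config.package_cache_async`):

```python
from rez.resolved_context import ResolvedContext

# Defers to config.package_cache_async (the default behaviour):
ctx_default = ResolvedContext(["mypkg"])

# Explicitly block until resolved packages are cached, regardless of config:
ctx_sync = ResolvedContext(["mypkg"], package_cache_async=False)
```

On the CLI, `--pkg-cache-mode` maps `sync` to False, `async` to True, and unset to None, as shown in env.py above.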
10 changes: 8 additions & 2 deletions src/rez/rezconfig.py
@@ -278,6 +278,12 @@
# Enable package caching during a package build.
package_cache_during_build = False

# Asynchronously cache packages. If this is false, resolves will block until
# all packages are cached.
#
# .. versionadded:: 3.2.0
package_cache_async = True

# Allow caching of local packages. You would only want to set this True for
# testing purposes.
package_cache_local = False
@@ -313,7 +319,7 @@
# This is useful as Platform.os might show different
# values depending on the availability of ``lsb-release`` on the system.
# The map supports regular expression, e.g. to keep versions.
#
#
# .. note::
# The following examples are not necessarily recommendations.
#
@@ -1137,7 +1143,7 @@

# Enables/disables colorization globally.
#
# .. warning::
# .. warning::
# Turned off for Windows currently as there seems to be a problem with the colorama module.
#
# May also set to the string ``force``, which will make rez output color styling