Skip to content

Commit

Permalink
Re-work atomic_directory locking for faster / clearer failures. (#1961)
Browse files Browse the repository at this point in the history
Change `atomic_directory` to always grab an exclusive lock and use a
stable work directory per target directory to surface multiple lock
owners as up-front warnings instead of possibly slow corruptions.
  • Loading branch information
jsirois authored Oct 21, 2022
1 parent 20ba2e5 commit dbd4c13
Show file tree
Hide file tree
Showing 19 changed files with 247 additions and 212 deletions.
187 changes: 187 additions & 0 deletions pex/atomic_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import absolute_import

import errno
import fcntl
import os
import threading
from contextlib import contextmanager

from pex import pex_warnings
from pex.common import safe_mkdir, safe_rmtree
from pex.enum import Enum
from pex.typing import TYPE_CHECKING, cast

if TYPE_CHECKING:
from typing import Callable, Iterator, Optional


class AtomicDirectory(object):
def __init__(self, target_dir):
# type: (str) -> None
self._target_dir = target_dir
self._work_dir = "{}.workdir".format(target_dir)

@property
def work_dir(self):
# type: () -> str
return self._work_dir

@property
def target_dir(self):
# type: () -> str
return self._target_dir

def is_finalized(self):
# type: () -> bool
return os.path.exists(self._target_dir)

def finalize(self, source=None):
# type: (Optional[str]) -> None
"""Rename `work_dir` to `target_dir` using `os.rename()`.
:param source: An optional source offset into the `work_dir`` to use for the atomic update
of `target_dir`. By default, the whole `work_dir` is used.
If a race is lost and `target_dir` already exists, the `target_dir` dir is left unchanged and
the `work_dir` directory will simply be removed.
"""
if self.is_finalized():
return

source = os.path.join(self._work_dir, source) if source else self._work_dir
try:
# Perform an atomic rename.
#
# Per the docs: https://docs.python.org/2.7/library/os.html#os.rename
#
# The operation may fail on some Unix flavors if src and dst are on different
# filesystems. If successful, the renaming will be an atomic operation (this is a
# POSIX requirement).
#
# We have satisfied the single filesystem constraint by arranging the `work_dir` to be a
# sibling of the `target_dir`.
os.rename(source, self._target_dir)
except OSError as e:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
raise e
finally:
self.cleanup()

def cleanup(self):
# type: () -> None
safe_rmtree(self._work_dir)


class FileLockStyle(Enum["FileLockStyle.Value"]):
class Value(Enum.Value):
pass

BSD = Value("bsd")
POSIX = Value("posix")


@contextmanager
def atomic_directory(
target_dir, # type: str
lock_style=FileLockStyle.POSIX, # type: FileLockStyle.Value
source=None, # type: Optional[str]
):
# type: (...) -> Iterator[AtomicDirectory]
"""A context manager that yields an exclusively locked AtomicDirectory.
:param target_dir: The target directory to atomically update.
:param lock_style: By default, a POSIX fcntl lock will be used to ensure exclusivity.
:param source: An optional source offset into the work directory to use for the atomic update
of the target directory. By default, the whole work directory is used.
If the `target_dir` already exists the enclosed block will be yielded an AtomicDirectory that
`is_finalized` to signal there is no work to do.
If the enclosed block fails the `target_dir` will not be created if it does not already exist.
The new work directory will be cleaned up regardless of whether the enclosed block succeeds.
"""

# We use double-checked locking with the check being target_dir existence and the lock being an
# exclusive blocking file lock.

atomic_dir = AtomicDirectory(target_dir=target_dir)
if atomic_dir.is_finalized():
# Our work is already done for us so exit early.
yield atomic_dir
return

head, tail = os.path.split(atomic_dir.target_dir)
if head:
safe_mkdir(head)
lockfile = os.path.join(head, ".{}.atomic_directory.lck".format(tail or "here"))

# N.B.: We don't actually write anything to the lock file but the fcntl file locking
# operations only work on files opened for at least write.
lock_fd = os.open(lockfile, os.O_CREAT | os.O_WRONLY)

lock_api = cast(
"Callable[[int, int], None]",
fcntl.flock if lock_style is FileLockStyle.BSD else fcntl.lockf,
)

def unlock():
# type: () -> None
try:
lock_api(lock_fd, fcntl.LOCK_UN)
finally:
os.close(lock_fd)

# N.B.: Since lockf and flock operate on an open file descriptor and these are
# guaranteed to be closed by the operating system when the owning process exits,
# this lock is immune to staleness.
lock_api(lock_fd, fcntl.LOCK_EX) # A blocking write lock.
if atomic_dir.is_finalized():
# We lost the double-checked locking race and our work was done for us by the race
# winner so exit early.
try:
yield atomic_dir
finally:
unlock()
return

# If there is an error making the work_dir that means that either file-locking guarantees have
# failed somehow and another process has the lock and has made the work_dir already or else a
# process holding the lock ended abnormally.
try:
os.mkdir(atomic_dir.work_dir)
except OSError as e:
ident = "[pid:{pid}, tid:{tid}, cwd:{cwd}]".format(
pid=os.getpid(), tid=threading.current_thread().ident, cwd=os.getcwd()
)
pex_warnings.warn(
"{ident}: After obtaining an exclusive lock on {lockfile}, failed to establish a work "
"directory at {workdir} due to: {err}".format(
ident=ident,
lockfile=lockfile,
workdir=atomic_dir.work_dir,
err=e,
),
)
if e.errno != errno.EEXIST:
raise
pex_warnings.warn(
"{ident}: Continuing to forcibly re-create the work directory at {workdir}.".format(
ident=ident,
workdir=atomic_dir.work_dir,
)
)
safe_mkdir(atomic_dir.work_dir, clean=True)

try:
yield atomic_dir
except Exception:
atomic_dir.cleanup()
raise
else:
atomic_dir.finalize(source=source)
finally:
unlock()
152 changes: 1 addition & 151 deletions pex/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import atexit
import contextlib
import errno
import fcntl
import itertools
import os
import re
Expand All @@ -18,12 +17,10 @@
import time
import zipfile
from collections import defaultdict, namedtuple
from contextlib import contextmanager
from datetime import datetime
from uuid import uuid4

from pex.enum import Enum
from pex.typing import TYPE_CHECKING, cast
from pex.typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import (
Expand All @@ -37,7 +34,6 @@
Set,
Sized,
Tuple,
Union,
)

# We use the start of MS-DOS time, which is what zipfiles use (see section 4.4.6 of
Expand Down Expand Up @@ -333,152 +329,6 @@ def safe_sleep(seconds):
current_time = time.time()


class AtomicDirectory(object):
def __init__(self, target_dir):
# type: (str) -> None
self._target_dir = target_dir
self._work_dir = "{}.{}".format(target_dir, uuid4().hex)

@property
def work_dir(self):
# type: () -> str
return self._work_dir

@property
def target_dir(self):
# type: () -> str
return self._target_dir

def is_finalized(self):
# type: () -> bool
return os.path.exists(self._target_dir)

def finalize(self, source=None):
# type: (Optional[str]) -> None
"""Rename `work_dir` to `target_dir` using `os.rename()`.
:param source: An optional source offset into the `work_dir`` to use for the atomic update
of `target_dir`. By default the whole `work_dir` is used.
If a race is lost and `target_dir` already exists, the `target_dir` dir is left unchanged and
the `work_dir` directory will simply be removed.
"""
if self.is_finalized():
return

source = os.path.join(self._work_dir, source) if source else self._work_dir
try:
# Perform an atomic rename.
#
# Per the docs: https://docs.python.org/2.7/library/os.html#os.rename
#
# The operation may fail on some Unix flavors if src and dst are on different filesystems.
# If successful, the renaming will be an atomic operation (this is a POSIX requirement).
#
# We have satisfied the single filesystem constraint by arranging the `work_dir` to be a
# sibling of the `target_dir`.
os.rename(source, self._target_dir)
except OSError as e:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
raise e
finally:
self.cleanup()

def cleanup(self):
# type: () -> None
safe_rmtree(self._work_dir)


class FileLockStyle(Enum["FileLockStyle.Value"]):
class Value(Enum.Value):
pass

BSD = Value("bsd")
POSIX = Value("posix")


@contextmanager
def atomic_directory(
target_dir, # type: str
exclusive, # type: Union[bool, FileLockStyle.Value]
source=None, # type: Optional[str]
):
# type: (...) -> Iterator[AtomicDirectory]
"""A context manager that yields a potentially exclusively locked AtomicDirectory.
:param target_dir: The target directory to atomically update.
:param exclusive: If `True`, its guaranteed that only one process will be yielded a non `None`
workdir; otherwise two or more processes might be yielded unique non-`None`
workdirs with the last process to finish "winning". By default, a POSIX fcntl
lock will be used to ensure exclusivity. To change this, pass an explicit
`LockStyle` instead of `True`.
:param source: An optional source offset into the work directory to use for the atomic update
of the target directory. By default the whole work directory is used.
If the `target_dir` already exists the enclosed block will be yielded an AtomicDirectory that
`is_finalized` to signal there is no work to do.
If the enclosed block fails the `target_dir` will be undisturbed.
The new work directory will be cleaned up regardless of whether or not the enclosed block
succeeds.
If the contents of the resulting directory will be subsequently mutated it's probably correct to
pass `exclusive=True` to ensure mutations that race the creation process are not lost.
"""
atomic_dir = AtomicDirectory(target_dir=target_dir)
if atomic_dir.is_finalized():
# Our work is already done for us so exit early.
yield atomic_dir
return

lock_fd = None # type: Optional[int]
lock_api = cast(
"Callable[[int, int], None]",
fcntl.flock if exclusive is FileLockStyle.BSD else fcntl.lockf,
)

def unlock():
# type: () -> None
if lock_fd is None:
return
try:
lock_api(lock_fd, fcntl.LOCK_UN)
finally:
os.close(lock_fd)

if exclusive:
head, tail = os.path.split(atomic_dir.target_dir)
if head:
safe_mkdir(head)
# N.B.: We don't actually write anything to the lock file but the fcntl file locking
# operations only work on files opened for at least write.
lock_fd = os.open(
os.path.join(head, ".{}.atomic_directory.lck".format(tail or "here")),
os.O_CREAT | os.O_WRONLY,
)
# N.B.: Since lockf and flock operate on an open file descriptor and these are
# guaranteed to be closed by the operating system when the owning process exits,
# this lock is immune to staleness.
lock_api(lock_fd, fcntl.LOCK_EX) # A blocking write lock.
if atomic_dir.is_finalized():
# We lost the double-checked locking race and our work was done for us by the race
# winner so exit early.
try:
yield atomic_dir
finally:
unlock()
return

try:
os.makedirs(atomic_dir.work_dir)
yield atomic_dir
atomic_dir.finalize(source=source)
finally:
unlock()
atomic_dir.cleanup()


def chmod_plus_x(path):
# type: (str) -> None
"""Equivalent of unix `chmod a+x path`"""
Expand Down
5 changes: 3 additions & 2 deletions pex/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,12 +828,13 @@ def create_interpreter(
import os
import sys
from pex.common import atomic_directory, safe_open
from pex.atomic_directory import atomic_directory
from pex.common import safe_open
from pex.interpreter import PythonIdentity
encoded_identity = PythonIdentity.get(binary={binary!r}).encode()
with atomic_directory({cache_dir!r}, exclusive=False) as cache_dir:
with atomic_directory({cache_dir!r}) as cache_dir:
if not cache_dir.is_finalized():
with safe_open(
os.path.join(cache_dir.work_dir, {info_file!r}), 'w'
Expand Down
Loading

0 comments on commit dbd4c13

Please sign in to comment.