Skip to content

Commit

Permalink
dvc: make flufl.lock opt-in and use zc.lockfile
Browse files Browse the repository at this point in the history
As it turned out (see issue numbers down below), we can't really take
hardlinks for granted, so `flufl.lock` is not a panacea for all
filesystems. Considering that the vast majority of filesystems that our
users use support `zc.lockfile`(flock-based) and it has benefits like
more reliable mechanism, auto-delete when process exits, more sturdy
implementation, etc, it makes more sense to bring it back and use by
default again. For filesystems that don't support `flock()`, users will
be able to manually enable `flufl.lock` use through the config option.
It would be ideal if we could auto-detect that flock is not supported,
but in the real world, it turned out to be non-trivial, as it might hang
forever in a kernel context, which makes the implementation way too
complex for our purposes. So what we're doing instead is showing a
message before locking with `zc.lockfile` that, under normal
circumstances will disappear once the lock is taken or failed, otherwise
it will point users to the related documentation where they can learn
about how to opt-in for `flufl.lock`.

Fixes iterative#2831
Fixes iterative#2897
Related iterative#2860
  • Loading branch information
efiop committed Dec 13, 2019
1 parent fd62ba8 commit 25807f2
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 68 deletions.
15 changes: 6 additions & 9 deletions dvc/command/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import logging

import colorama


logger = logging.getLogger(__name__)

Expand All @@ -24,15 +22,13 @@ def fix_subparsers(subparsers):


def append_doc_link(help_message, path):
from dvc.utils import format_link

if not path:
return help_message
doc_base = "https://man.dvc.org/"
return "{message}\nDocumentation: <{blue}{base}{path}{nc}>".format(
message=help_message,
base=doc_base,
path=path,
blue=colorama.Fore.CYAN,
nc=colorama.Fore.RESET,
return "{message}\nDocumentation: {link}".format(
message=help_message, link=format_link(doc_base + path)
)


Expand All @@ -44,7 +40,8 @@ def __init__(self, args):
self.repo = Repo()
self.config = self.repo.config
self.args = args
updater = Updater(self.repo.dvc_dir)
hardlink_lock = self.config.config["core"].get("hardlink_lock", False)
updater = Updater(self.repo.dvc_dir, hardlink_lock=hardlink_lock)
updater.check()

@property
Expand Down
7 changes: 6 additions & 1 deletion dvc/command/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ class CmdDaemonUpdater(CmdDaemonBase):
def run(self):
import os
from dvc.repo import Repo
from dvc.config import Config
from dvc.updater import Updater

root_dir = Repo.find_root()
dvc_dir = os.path.join(root_dir, Repo.DVC_DIR)
updater = Updater(dvc_dir)
config = Config(dvc_dir, verify=False)
hardlink_lock = config.config.get("core", {}).get(
"hardlink_lock", False
)
updater = Updater(dvc_dir, hardlink_lock=hardlink_lock)
updater.fetch(detach=False)

return 0
Expand Down
2 changes: 2 additions & 0 deletions dvc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class Config(object): # pylint: disable=too-many-instance-attributes
SECTION_CORE_INTERACTIVE = "interactive"
SECTION_CORE_ANALYTICS = "analytics"
SECTION_CORE_CHECKSUM_JOBS = "checksum_jobs"
SECTION_CORE_HARDLINK_LOCK = "hardlink_lock"

SECTION_CACHE = "cache"
SECTION_CACHE_DIR = "dir"
Expand Down Expand Up @@ -160,6 +161,7 @@ class Config(object): # pylint: disable=too-many-instance-attributes
Optional(SECTION_CORE_INTERACTIVE, default=False): Bool,
Optional(SECTION_CORE_ANALYTICS, default=True): Bool,
SECTION_CORE_CHECKSUM_JOBS: All(Coerce(int), Range(1)),
Optional(SECTION_CORE_HARDLINK_LOCK, default=False): Bool,
}

# aws specific options
Expand Down
111 changes: 66 additions & 45 deletions dvc/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import time
from datetime import timedelta

import zc.lockfile
from funcy.py3 import lkeep

from dvc.exceptions import DvcException
from dvc.utils import makedirs
from dvc.utils.compat import is_py3

from dvc.utils import makedirs, format_link
from dvc.utils.compat import is_py3, is_py2
from dvc.progress import Tqdm

DEFAULT_TIMEOUT = 5

Expand All @@ -26,10 +27,60 @@ class LockError(DvcException):
"""Thrown when unable to acquire the lock for dvc repo."""


class Lock(object):
"""Class for dvc repo lock.
Uses zc.lockfile as backend.
"""

def __init__(self, lockfile, friendly=False, **kwargs):
self._friendly = friendly
self.lockfile = lockfile
self._lock = None

@property
def files(self):
return [self.lockfile]

def _do_lock(self):
try:
with Tqdm(
bar_format="{desc}",
disable=not self._friendly,
desc=(
"If DVC froze, see `hardlink_lock` in {}".format(
format_link("man.dvc.org/config#core")
)
),
):
self._lock = zc.lockfile.LockFile(self.lockfile)
except zc.lockfile.LockError:
raise LockError(FAILED_TO_LOCK_MESSAGE)

def lock(self):
try:
self._do_lock()
return
except LockError:
time.sleep(DEFAULT_TIMEOUT)

self._do_lock()

def unlock(self):
self._lock.close()
self._lock = None

def __enter__(self):
self.lock()

def __exit__(self, typ, value, tbck):
self.unlock()


if is_py3:
import flufl.lock

class Lock(flufl.lock.Lock):
class HardlinkLock(flufl.lock.Lock):
"""Class for dvc repo lock.
Args:
Expand All @@ -38,7 +89,7 @@ class Lock(flufl.lock.Lock):
tmp_dir (str): a directory to store claim files.
"""

def __init__(self, lockfile, tmp_dir=None):
def __init__(self, lockfile, tmp_dir=None, **kwargs):
import socket

self._tmp_dir = tmp_dir
Expand Down Expand Up @@ -101,44 +152,14 @@ def __del__(self):
pass


else:
import zc.lockfile

class Lock(object):
"""Class for dvc repo lock.
Uses zc.lockfile as backend.
"""

def __init__(self, lockfile, tmp_dir=None):
self.lockfile = lockfile
self._lock = None

@property
def files(self):
return [self.lockfile]

def _do_lock(self):
try:
self._lock = zc.lockfile.LockFile(self.lockfile)
except zc.lockfile.LockError:
raise LockError(FAILED_TO_LOCK_MESSAGE)

def lock(self):
try:
self._do_lock()
return
except LockError:
time.sleep(DEFAULT_TIMEOUT)

self._do_lock()

def unlock(self):
self._lock.close()
self._lock = None

def __enter__(self):
self.lock()
def make_lock(lockfile, tmp_dir=None, friendly=False, hardlink_lock=False):
if hardlink_lock and is_py2:
raise DvcException(
"Hardlink locks are not supported on Python <3.5. "
"See `hardlink_lock` in {}".format(
format_link("man.dvc.org/config#core")
)
)

def __exit__(self, typ, value, tbck):
self.unlock()
cls = HardlinkLock if hardlink_lock else Lock
return cls(lockfile, tmp_dir=tmp_dir, friendly=friendly)
7 changes: 5 additions & 2 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Repo(object):

def __init__(self, root_dir=None):
from dvc.state import State
from dvc.lock import Lock
from dvc.lock import make_lock
from dvc.scm import SCM
from dvc.cache import Cache
from dvc.data_cloud import DataCloud
Expand All @@ -88,9 +88,12 @@ def __init__(self, root_dir=None):

self.tree = WorkingTree(self.root_dir)

self.lock = Lock(
hardlink_lock = self.config.config["core"].get("hardlink_lock", False)
self.lock = make_lock(
os.path.join(self.dvc_dir, "lock"),
tmp_dir=os.path.join(self.dvc_dir, "tmp"),
hardlink_lock=hardlink_lock,
friendly=True,
)
# NOTE: storing state and link_state in the repository itself to avoid
# any possible state corruption in 'shared cache dir' scenario.
Expand Down
12 changes: 7 additions & 5 deletions dvc/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
from packaging import version

from dvc import __version__
from dvc.lock import Lock
from dvc.lock import LockError
from dvc.lock import make_lock, LockError
from dvc.utils import boxify
from dvc.utils import env2bool
from dvc.utils.pkg import PKG
Expand All @@ -24,11 +23,14 @@ class Updater(object): # pragma: no cover
TIMEOUT = 24 * 60 * 60 # every day
TIMEOUT_GET = 10

def __init__(self, dvc_dir):
def __init__(self, dvc_dir, friendly=False, hardlink_lock=False):
self.dvc_dir = dvc_dir
self.updater_file = os.path.join(dvc_dir, self.UPDATER_FILE)
self.lock = Lock(
self.updater_file + ".lock", tmp_dir=os.path.join(dvc_dir, "tmp")
self.lock = make_lock(
self.updater_file + ".lock",
tmp_dir=os.path.join(dvc_dir, "tmp"),
friendly=friendly,
hardlink_lock=hardlink_lock,
)
self.current = version.parse(__version__).base_version

Expand Down
8 changes: 8 additions & 0 deletions dvc/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,11 @@ def resolve_output(inp, out):
if os.path.isdir(out):
return os.path.join(out, name)
return out


def format_link(link):
import colorama

return "<{blue}{link}{nc}>".format(
blue=colorama.Fore.CYAN, link=link, nc=colorama.Fore.RESET
)
8 changes: 2 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def run(self):
"shortuuid>=0.5.0",
"tqdm>=4.40.0,<5",
"packaging>=19.0",
"zc.lockfile>=1.2.1",
"win-unicode-console>=0.5; sys_platform == 'win32'",
"pywin32>=225; sys_platform == 'win32'",
]
Expand Down Expand Up @@ -163,12 +164,7 @@ def run(self):
"ssh_gssapi": ssh_gssapi,
"hdfs": hdfs,
# NOTE: https://github.com/inveniosoftware/troubleshooting/issues/1
":python_version=='2.7'": [
"futures",
"pathlib2",
"contextlib2",
"zc.lockfile>=1.2.1",
],
":python_version=='2.7'": ["futures", "pathlib2", "contextlib2"],
":python_version>='3.0'": ["flufl.lock>=3.2"],
"tests": tests_requirements,
},
Expand Down

0 comments on commit 25807f2

Please sign in to comment.