update: only update the missing hashes with --to-remote (#5773)
* objects: merge transfer() with stage()

* objects: drop streamfile

* objects: preserve the signature of get_file_hash

* objects: override hash_jobs if jobs= passed

* objects: support the no_progress_bar directive

* tests: spy on _get_file_hash

* objects: update=... directive

* update: only transfer missing caches with --to-remote

* tests: remove local_remote from the primary test (since it moves the data)

* objects: add move=True/False option

* objects: implement _get_file_obj on top of get_file_hash

* objects: use threading on save()

* objects: pass jobs= on to-cache as well

* objects: use in place tqdm

* objects: check for FileExistsError

* Update dvc/objects/__init__.py

Co-authored-by: Ruslan Kuprieiev <kupruser@gmail.com>

isidentical and efiop authored Apr 19, 2021
1 parent a8b7fda commit 68897aa
Showing 12 changed files with 226 additions and 144 deletions.
9 changes: 7 additions & 2 deletions dvc/data_cloud.py
@@ -92,12 +92,15 @@ def pull(
show_checksums=show_checksums,
)

def transfer(self, source, jobs=None, remote=None, command=None):
def transfer(
self, source, jobs=None, update=False, remote=None, command=None
):
"""Transfer data items in a cloud-agnostic way.
Args:
source (str): url for the source location.
jobs (int): number of jobs that can be running simultaneously.
update (bool): whether to update existing data or sync fully
remote (dvc.remote.base.BaseRemote): optional remote to compare
cache to. By default remote from core.remote config option
is used.
@@ -108,7 +111,9 @@ def transfer(self, source, jobs=None, remote=None, command=None):

from_fs = get_cloud_fs(self.repo, url=source)
remote = self.get_remote(remote, command)
return remote.transfer(from_fs, from_fs.path_info, jobs=jobs)
return remote.transfer(
from_fs, from_fs.path_info, jobs=jobs, update=update
)

def status(
self,
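The `update` flag introduced here is threaded from `DataCloud.transfer` down to the remote, so `dvc update --to-remote` can transfer only the hashes the remote is missing instead of syncing everything. A minimal, hypothetical call-site sketch (not part of this diff; assumes an existing `dvc.repo.Repo` instance named `repo` and a configured default remote):

```python
from dvc.data_cloud import DataCloud

# Hypothetical usage: push data from an external URL to the default remote,
# transferring only hashes the remote does not already have (update=True).
cloud = DataCloud(repo)  # `repo` is an existing dvc.repo.Repo
cloud.transfer("s3://bucket/raw-data", jobs=4, update=True)
```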
23 changes: 20 additions & 3 deletions dvc/objects/__init__.py
@@ -1,4 +1,5 @@
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

from dvc.progress import Tqdm

@@ -7,10 +8,26 @@
logger = logging.getLogger(__name__)


def save(odb, obj, **kwargs):
def save(odb, obj, jobs=None, **kwargs):
if isinstance(obj, Tree):
for _, entry in Tqdm(obj):
odb.add(entry.path_info, entry.fs, entry.hash_info, **kwargs)
with ThreadPoolExecutor(max_workers=jobs) as executor:
for future in Tqdm(
as_completed(
executor.submit(
odb.add,
entry.path_info,
entry.fs,
entry.hash_info,
**kwargs
)
for _, entry in obj
),
desc="Saving files",
total=len(obj),
unit="file",
):
future.result()

odb.add(obj.path_info, obj.fs, obj.hash_info, **kwargs)


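The rewritten `save()` submits one `odb.add` call per tree entry to a `ThreadPoolExecutor` and iterates `as_completed` inside `Tqdm`, so the progress bar ticks as files finish and `future.result()` re-raises any exception from a worker. A self-contained sketch of the same pattern (illustration only; uses plain `tqdm` as a stand-in for `dvc.progress.Tqdm`):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

from tqdm import tqdm  # stand-in for dvc.progress.Tqdm in this illustration


def add_one(item):
    # placeholder for the per-file work, e.g. odb.add(path_info, fs, hash_info)
    return item


items = list(range(100))
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(add_one, item) for item in items]
    for future in tqdm(as_completed(futures), total=len(futures), unit="file"):
        future.result()  # re-raises any exception from the worker thread
```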
34 changes: 28 additions & 6 deletions dvc/objects/db/base.py
@@ -1,5 +1,6 @@
import itertools
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from typing import Optional
@@ -42,7 +43,7 @@ def get(self, hash_info):
hash_info,
)

def add(self, path_info, fs, hash_info, **kwargs):
def add(self, path_info, fs, hash_info, move=True, **kwargs):
try:
self.check(hash_info)
return
@@ -52,11 +53,32 @@ def add(self, path_info, fs, hash_info, **kwargs):
cache_info = self.hash_to_path_info(hash_info.value)
# using our makedirs to create dirs with proper permissions
self.makedirs(cache_info.parent)
if isinstance(fs, type(self.fs)):
self.fs.move(path_info, cache_info)
else:
with fs.open(path_info, mode="rb") as fobj:
self.fs.upload_fobj(fobj, cache_info)
use_move = isinstance(fs, type(self.fs)) and move
try:
if use_move:
self.fs.move(path_info, cache_info)
else:
with fs.open(path_info, mode="rb") as fobj:
self.fs.upload_fobj(fobj, cache_info)
except OSError as exc:
# If the target file already exists, we are going to simply
# ignore the exception (#4992).
#
# On Windows, it is not always guaranteed that you'll get
# FileExistsError (it might be PermissionError or a bare OSError)
# but all of those exceptions raised from the original
# FileExistsError so we have a separate check for that.
if isinstance(exc, FileExistsError) or (
os.name == "nt"
and exc.__context__
and isinstance(exc.__context__, FileExistsError)
):
logger.debug("'%s' file already exists, skipping", path_info)
if use_move:
fs.remove(path_info)
else:
raise

self.protect(cache_info)
self.fs.repo.state.save(cache_info, self.fs, hash_info)

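The new error handling in `add()` tolerates a pre-existing cache file: on POSIX this shows up directly as `FileExistsError`, while on Windows the same situation may surface as `PermissionError` or a bare `OSError` whose `__context__` is the original `FileExistsError`. A small illustrative helper (not DVC API) expressing the same check:

```python
import os


def is_file_exists_error(exc: OSError) -> bool:
    # Return True if `exc` means the destination file already exists.
    if isinstance(exc, FileExistsError):
        return True
    # On Windows the original FileExistsError may be chained under another
    # OSError (e.g. PermissionError) raised while replacing the file.
    return (
        os.name == "nt"
        and exc.__context__ is not None
        and isinstance(exc.__context__, FileExistsError)
    )
```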
3 changes: 2 additions & 1 deletion dvc/objects/file.py
@@ -1,7 +1,6 @@
import logging

from .errors import ObjectFormatError
from .stage import get_file_hash

logger = logging.getLogger(__name__)

@@ -37,6 +36,8 @@ def __eq__(self, other):
)

def check(self, odb):
from .stage import get_file_hash

actual = get_file_hash(
self.path_info, self.fs, self.hash_info.name, odb.repo.state
)
91 changes: 63 additions & 28 deletions dvc/objects/stage.py
@@ -1,14 +1,31 @@
import errno
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from dvc.exceptions import DvcIgnoreInCollectedDirError
from dvc.hash_info import HashInfo
from dvc.ignore import DvcIgnore
from dvc.objects.file import HashFile
from dvc.progress import Tqdm
from dvc.utils import file_md5


def _upload_file(path_info, fs, odb):
from dvc.utils import tmp_fname
from dvc.utils.stream import HashedStreamReader

tmp_info = odb.fs.path_info / tmp_fname()
with fs.open(path_info, mode="rb", chunk_size=fs.CHUNK_SIZE) as stream:
stream = HashedStreamReader(stream)
odb.fs.upload_fobj(
stream, tmp_info, desc=path_info.name, total=fs.getsize(path_info)
)

obj = HashFile(tmp_info, odb.fs, stream.hash_info)
return path_info, obj


def _get_file_hash(path_info, fs, name):
info = fs.info(path_info)
if name in info:
@@ -49,43 +66,64 @@ def get_file_hash(path_info, fs, name, state=None):
return hash_info


def _calculate_hashes(path_info, fs, name, state, **kwargs):
def _hash(file_info):
return file_info, get_file_hash(file_info, fs, name, state)
def _get_file_obj(path_info, fs, name, odb=None, state=None, upload=False):
if upload:
assert odb and name == "md5"
return _upload_file(path_info, fs, odb)

obj = HashFile(
path_info, fs, get_file_hash(path_info, fs, name, state=state)
)
return path_info, obj


def _build_objects(path_info, fs, name, odb, state, upload, **kwargs):
with Tqdm(
unit="md5", desc="Computing file/dir hashes (only done once)",
unit="md5",
desc="Computing file/dir hashes (only done once)",
disable=kwargs.pop("no_progress_bar", False),
) as pbar:
worker = pbar.wrap_fn(_hash)
with ThreadPoolExecutor(max_workers=fs.hash_jobs) as executor:
pairs = executor.map(worker, fs.walk_files(path_info, **kwargs))
return dict(pairs)
worker = pbar.wrap_fn(
partial(
_get_file_obj,
fs=fs,
name=name,
odb=odb,
state=state,
upload=upload,
)
)
with ThreadPoolExecutor(
max_workers=kwargs.pop("jobs", fs.hash_jobs)
) as executor:
yield from executor.map(worker, fs.walk_files(path_info, **kwargs))


def _iter_hashes(path_info, fs, name, state, **kwargs):
if name in fs.DETAIL_FIELDS:
def _iter_objects(path_info, fs, name, odb, state, upload, **kwargs):
if not upload and name in fs.DETAIL_FIELDS:
for details in fs.ls(path_info, recursive=True, detail=True):
file_info = path_info.replace(path=details["name"])
hash_info = HashInfo(
name, details[name], size=details.get("size"),
)
yield file_info, hash_info
yield file_info, HashFile(file_info, fs, hash_info)

return None

yield from _calculate_hashes(path_info, fs, name, state, **kwargs).items()
yield from _build_objects(
path_info, fs, name, odb, state, upload, **kwargs
)


def _build_tree(path_info, fs, name, state, **kwargs):
from .file import HashFile
def _build_tree(path_info, fs, name, odb, state, upload, **kwargs):
from .tree import Tree

tree = Tree(None, None, None)
for fi, hi in _iter_hashes(path_info, fs, name, state, **kwargs):
if DvcIgnore.DVCIGNORE_FILE == fi.name:
raise DvcIgnoreInCollectedDirError(fi.parent)

obj = HashFile(fi, fs, hi)
for file_info, obj in _iter_objects(
path_info, fs, name, odb, state, upload, **kwargs
):
if DvcIgnore.DVCIGNORE_FILE == file_info.name:
raise DvcIgnoreInCollectedDirError(file_info.parent)

# NOTE: this is lossy transformation:
# "hey\there" -> "hey/there"
@@ -95,14 +133,14 @@ def _build_tree(path_info, fs, name, state, **kwargs):
#
# Yes, this is a BUG, as long as we permit "/" in
# filenames on Windows and "\" on Unix
tree.add(fi.relative_to(path_info).parts, obj)
tree.add(file_info.relative_to(path_info).parts, obj)

tree.digest()

return tree


def _get_tree_obj(path_info, fs, name, odb, state, **kwargs):
def _get_tree_obj(path_info, fs, name, odb, state, upload, **kwargs):
from .tree import Tree

value = fs.info(path_info).get(name)
@@ -113,7 +151,7 @@ def _get_tree_obj(path_info, fs, name, odb, state, **kwargs):
except FileNotFoundError:
pass

tree = _build_tree(path_info, fs, name, state, **kwargs)
tree = _build_tree(path_info, fs, name, odb, state, upload, **kwargs)

odb.add(tree.path_info, tree.fs, tree.hash_info)
if name != "md5":
@@ -132,7 +170,7 @@ def _get_tree_obj(path_info, fs, name, odb, state, **kwargs):
return tree


def stage(odb, path_info, fs, name, **kwargs):
def stage(odb, path_info, fs, name, upload=False, **kwargs):
assert path_info and (
isinstance(path_info, str) or path_info.scheme == fs.scheme
)
@@ -174,12 +212,9 @@ def stage(odb, path_info, fs, name, **kwargs):
return obj

if fs.isdir(path_info):
obj = _get_tree_obj(path_info, fs, name, odb, state, **kwargs)
obj = _get_tree_obj(path_info, fs, name, odb, state, upload, **kwargs)
else:
from .file import HashFile

hash_info = get_file_hash(path_info, fs, name, state)
obj = HashFile(path_info, fs, hash_info)
_, obj = _get_file_obj(path_info, fs, name, odb, state, upload)

if obj.hash_info and fs.exists(path_info):
state.save(path_info, fs, obj.hash_info)
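With `upload=True`, `stage()` now streams each file into the object database while hashing it through `HashedStreamReader`, rather than hashing first and transferring later, and `jobs=` overrides the filesystem's default `hash_jobs`. A hypothetical call sketch (assumes `odb`, `path_info`, and `fs` are already constructed and that a directory is being staged straight into a remote ODB):

```python
from dvc.objects.stage import stage

# Hypothetical usage: stage an external directory straight into `odb`,
# hashing each file as it is uploaded; upload=True requires name == "md5".
obj = stage(odb, path_info, fs, "md5", upload=True, jobs=8)
print(obj.hash_info)
```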
92 changes: 0 additions & 92 deletions dvc/objects/transfer.py

This file was deleted.

(Diffs for the remaining changed files are not shown here.)
