hashfile.checkout: use multiprocessing.Pool to checkout concurrently #547

Closed
wants to merge 1 commit
92 changes: 63 additions & 29 deletions src/dvc_data/hashfile/checkout.py
@@ -1,11 +1,14 @@
import logging
from itertools import chain
from functools import partial
from multiprocessing import Pool
from typing import TYPE_CHECKING, Optional

from dvc_objects.fs.generic import test_links, transfer
from dvc_objects.fs.local import LocalFileSystem
from fsspec.callbacks import DEFAULT_CALLBACK

from dvc_data.compat import batched

from .build import build
from .diff import ROOT
from .diff import diff as odiff
@@ -150,56 +153,34 @@ def _diff(


class Link:
def __init__(self, links, callback: "Callback" = DEFAULT_CALLBACK):
def __init__(self, links, callback: Optional["Callback"] = None):
self._links = links
self._callback = callback

def __call__(self, cache, from_path, to_fs, to_path):
parent = to_fs.parent(to_path)
to_fs.makedirs(parent)
kw = {"callback": self._callback} if self._callback else {}
try:
transfer(
cache.fs,
from_path,
to_fs,
to_path,
links=self._links,
callback=self._callback,
**kw,
)
except FileNotFoundError as exc:
raise CheckoutError([to_path]) from exc
except OSError as exc:
raise LinkError(to_path) from exc


def _checkout( # noqa: C901
diff,
path,
fs,
cache,
force=False,
progress_callback: "Callback" = DEFAULT_CALLBACK,
relink=False,
state=None,
prompt=None,
):
if not diff:
return

links = test_links(cache.cache_types, cache.fs, cache.path, fs, path)
if not links:
raise LinkError(path)

progress_callback.set_size(sum(diff.stats.values()))
link = Link(links, callback=progress_callback)
for change in diff.deleted:
entry_path = fs.join(path, *change.old.key) if change.old.key != ROOT else path
_remove(entry_path, fs, change.old.in_cache, force=force, prompt=prompt)

def _checkout_files(changes, fs, path, cache, force, link, relink, state, prompt):
failed = []
hashes_to_update: list[tuple[str, HashInfo, None]] = []
hashes_to_update = []
is_local_fs = isinstance(fs, LocalFileSystem)
for change in chain(diff.added, diff.modified):
for change in changes:
entry_path = fs.join(path, *change.new.key) if change.new.key != ROOT else path
if change.new.oid.isdir:
fs.makedirs(entry_path)
@@ -223,6 +204,59 @@ def _checkout( # noqa: C901
if is_local_fs:
info = fs.info(entry_path)
hashes_to_update.append((entry_path, change.new.oid, info))
return failed, hashes_to_update


def _checkout(
diff,
path,
fs,
cache,
force=False,
progress_callback: "Callback" = DEFAULT_CALLBACK,
relink=False,
state=None,
prompt=None,
):
if not diff:
return

links = test_links(cache.cache_types, cache.fs, cache.path, fs, path)
if not links:
raise LinkError(path)

progress_callback.set_size(sum(diff.stats.values()))
link = Link(links, None if isinstance(fs, LocalFileSystem) else progress_callback)
for change in diff.deleted:
entry_path = fs.join(path, *change.old.key) if change.old.key != ROOT else path
_remove(entry_path, fs, change.old.in_cache, force=force, prompt=prompt)

progress_callback.relative_update(len(diff.deleted))

failed = []
hashes_to_update: list[tuple[str, HashInfo, None]] = []

checkout_func = partial(
_checkout_files,
fs=fs,
path=path,
cache=cache,
force=force,
link=link,
relink=relink,
state=state,
prompt=prompt,
)
changes = diff.added + diff.modified
if isinstance(fs, LocalFileSystem):
with Pool() as pool:
chunks = batched(changes, 1000)
for _failed, _hashes in pool.imap_unordered(checkout_func, chunks):
progress_callback.absolute_update(len(hashes_to_update))
failed.extend(_failed)
hashes_to_update.extend(_hashes)
else:
failed, hashes_to_update = checkout_func(changes)

if state is not None:
state.save_many(hashes_to_update, fs)
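
For context, the pattern this change applies in _checkout — bind the fixed checkout arguments with functools.partial, split the list of changes into chunks, and fan the chunks out to worker processes with Pool.imap_unordered, aggregating each chunk's results in the parent — can be sketched standalone as below. The worker function, chunk size, and item type here are illustrative placeholders, not the PR's actual _checkout_files.

from functools import partial
from multiprocessing import Pool


def process_chunk(chunk, prefix):
    # Hypothetical worker: runs in a separate process and returns this
    # chunk's results so the parent can aggregate them, mirroring how
    # _checkout_files returns (failed, hashes_to_update) per chunk.
    return [f"{prefix}/{item}" for item in chunk]


if __name__ == "__main__":
    changes = list(range(10))
    # Chunking keeps per-task overhead low; the PR uses batches of 1000.
    chunks = [changes[i : i + 3] for i in range(0, len(changes), 3)]
    worker = partial(process_chunk, prefix="out")

    results = []
    with Pool() as pool:
        # imap_unordered yields each chunk's result as soon as a worker
        # finishes it, which also gives the parent a natural point to
        # update progress between chunks.
        for chunk_result in pool.imap_unordered(worker, chunks):
            results.extend(chunk_result)
    print(sorted(results))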
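
The new import from dvc_data.compat, batched, is presumably a compatibility shim for itertools.batched (available only on Python 3.12+). Under that assumption it behaves like the sketch below, yielding successive tuples of at most n items from an iterable — which is how the change list is split into chunks of 1000 in the diff above.

from itertools import islice


def batched(iterable, n):
    # Assumed behaviour of dvc_data.compat.batched, modelled on
    # itertools.batched (Python 3.12+): yield tuples of at most n items.
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch


# Example: list(batched("ABCDEFG", 3)) -> [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]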