Skip to content

Commit

Permalink
index/checkout: hash file concurrently and save state in batches (#546)
Browse files Browse the repository at this point in the history
  • Loading branch information
skshetry authored Aug 7, 2024
1 parent 0bc4e3a commit 6ad5866
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 33 deletions.
88 changes: 58 additions & 30 deletions src/dvc_data/index/build.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Iterable
from itertools import chain
from collections.abc import Iterable, Iterator
from itertools import chain, repeat
from typing import TYPE_CHECKING, Any, Optional

from dvc_objects.fs.local import LocalFileSystem
Expand All @@ -13,6 +13,7 @@
from dvc_objects.fs.base import FileSystem

from dvc_data.hashfile._ignore import Ignore
from dvc_data.hashfile.hash_info import HashInfo
from dvc_data.hashfile.state import StateBase


Expand All @@ -39,6 +40,37 @@ def build_entry(
)


def safe_walk(
path: str,
fs: "FileSystem",
ignore: Optional["Ignore"] = None,
) -> Iterator[tuple[str, dict[str, dict], dict[str, dict], set[str]]]:
if not isinstance(fs, LocalFileSystem):
for root, dirs, files in fs.walk(path, detail=True):
yield root, dirs, files, set()

return

# NOTE: can't use detail=True with walk, because that will make it error
# out on broken symlinks.
sep = fs.sep
walk_iter = ignore.walk(fs, path, detail=False) if ignore else fs.walk(path)
for root, dirs, files in walk_iter:
_dirs: dict[str, dict] = {}
_files: dict[str, dict] = {}
broken = set()

for name, d in chain(zip(dirs, repeat(_dirs)), zip(files, repeat(_files))):
p = f"{root}{sep}{name}"
try:
d[name] = fs.info(p)
except FileNotFoundError:
d[name] = {}
broken.add(name)
yield root, _dirs, _files, broken
dirs[:] = list(_dirs)


def build_entries(
path: str,
fs: "FileSystem",
Expand All @@ -47,40 +79,36 @@ def build_entries(
state: Optional["StateBase"] = None,
hash_name: str = DEFAULT_ALGORITHM,
) -> Iterable[DataIndexEntry]:
# NOTE: can't use detail=True with walk, because that will make it error
# out on broken symlinks.
detail = not isinstance(fs, LocalFileSystem)
if ignore:
walk_iter = ignore.walk(fs, path, detail=detail)
else:
walk_iter = fs.walk(path, detail=detail)
from dvc_data.hashfile.build import _get_hashes

for root, dirs, files in walk_iter:
sep = fs.sep
for root, dirs, files, broken in safe_walk(path, fs, ignore=ignore):
if root == path:
root_key: tuple[str, ...] = ()
else:
root_key = fs.relparts(root, path)

entries: Iterable[tuple[str, Optional[dict]]]
if detail:
entries = chain(dirs.items(), files.items())
else:
entries = ((name, None) for name in chain(dirs, files))

for name, info in entries:
try:
entry = build_entry(
fs.join(root, name),
fs,
compute_hash=compute_hash,
state=state,
hash_name=hash_name,
info=info,
)
except FileNotFoundError:
entry = DataIndexEntry()
entry.key = (*root_key, name)
yield entry
hashes: dict[str, tuple[Meta, HashInfo, dict]] = {}
if compute_hash:
file_infos = {
f"{root}{sep}{name}": info for name, info in files.items() if info
}
file_paths = list(file_infos)
hashes = _get_hashes(file_paths, fs, hash_name, file_infos, state=state)

for name, info in chain(dirs.items(), files.items()):
key = (*root_key, name)
if name in broken:
yield DataIndexEntry(key=key)
continue

p = f"{root}{sep}{name}"
if p in hashes:
meta, hash_info, _ = hashes[p]
else:
meta, hash_info = Meta.from_info(info, fs.protocol), None
loaded = meta.isdir or None
yield DataIndexEntry(key=key, meta=meta, hash_info=hash_info, loaded=loaded)


def build(path: str, fs: "FileSystem", ignore: Optional["Ignore"] = None) -> DataIndex:
Expand Down
8 changes: 5 additions & 3 deletions src/dvc_data/index/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dvc_objects.fs.base import AnyFSPath, FileSystem
from fsspec import Callback

from dvc_data.hashfile.hash_info import HashInfo
from dvc_data.hashfile.state import StateBase

from .diff import Change
Expand Down Expand Up @@ -123,15 +124,16 @@ def _create_files( # noqa: C901, PLR0912, PLR0913

_check_versioning(dest_paths, fs)

if state:
if state and isinstance(fs, LocalFileSystem):
_infos: list[tuple[str, HashInfo, dict]] = []
for entry, _, dest_path in args:
if not entry.hash_info:
continue

try:
state.save(dest_path, fs, entry.hash_info)
_infos.append((dest_path, entry.hash_info, fs.info(dest_path)))
except FileNotFoundError:
continue
state.save_many(_infos, fs)

if update_meta:
if callback == DEFAULT_CALLBACK:
Expand Down

0 comments on commit 6ad5866

Please sign in to comment.