# Reconstructed from a patch against src/dvc_objects/fs/generic.py.
# NOTE(review): `logger`, `DEFAULT_CALLBACK`, `ThreadPoolExecutor` and the
# "FileSystem" / "AnyFSPath" / "Callback" type names are not defined in this
# excerpt; they are expected to come from the rest of that module.
import errno
import logging
import os
import threading
from concurrent import futures
from contextlib import suppress
from functools import wraps
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Collection,
    Dict,
    List,
    Optional,
    Set,
    Union,
)

from fsspec.asyn import get_loop


def status(
    fs: "FileSystem",
    file_paths: Union["AnyFSPath", Collection["AnyFSPath"]],
    callback: "Callback" = DEFAULT_CALLBACK,
    batch_size: Optional[int] = None,
) -> Dict[str, bool]:
    """Return status (existence) of files in fs.

    Runs batched fs.exists() calls in parallel with fs.ls() until all paths
    have been checked.

    Args:
        fs: filesystem to query.
        file_paths: a single path or a collection of paths to check.
        callback: progress callback; its size is set to the number of unique
            paths and it is advanced once per resolved path.
        batch_size: number of parallel jobs to use; falls back to ``fs.jobs``
            when not given.

    Returns:
        A dict mapping every unique requested path to ``True`` if it exists
        and ``False`` otherwise.

    Raises:
        Any exception raised by one of the two background queries is
        re-raised here instead of being silently dropped.
    """
    if not file_paths:
        return {}
    paths = {file_paths} if isinstance(file_paths, str) else set(file_paths)
    if len(paths) == 1:
        path = paths.pop()
        return {path: fs.exists(path)}

    paths_lock = threading.Lock()
    results: Dict[str, bool] = {}
    results_lock = threading.Lock()
    callback.set_size(len(paths))
    jobs = batch_size or fs.jobs
    # One job is reserved for the listing query; always keep at least one
    # job for the exists query.
    exists_jobs = jobs - 1 if jobs > 1 else 1
    executor = ThreadPoolExecutor(max_workers=2, cancel_on_error=True)
    logger.debug("Querying status for '%d' files", len(paths))
    try:
        exist_fut = executor.submit(
            _exist_query,
            fs,
            paths,
            paths_lock,
            results,
            results_lock,
            exists_jobs,
            callback,
        )
        list_fut = executor.submit(
            _list_query,
            fs,
            paths,
            paths_lock,
            results,
            results_lock,
            callback,
        )
        done, not_done = futures.wait(
            [exist_fut, list_fut], return_when=futures.FIRST_COMPLETED
        )
        # Surface worker failures instead of returning partial results.
        for fut in done:
            fut.result()
        if list_fut in not_done:
            # NOTE: if we started a long running ls it will continue to run
            # in the background until the task completes; cancel() only
            # prevents a not-yet-started task from running.
            list_fut.cancel()
        else:
            # The listing finished first, so `paths` is drained, but the
            # exists query may still have one popped batch in flight. Wait
            # for it so that every requested path has an entry.
            exist_fut.result()
        with results_lock:
            # Hand out a snapshot so a still-running background task can
            # never mutate the dict the caller received.
            return dict(results)
    except BaseException:
        # Drain the shared work set so both workers stop at their next
        # check instead of continuing to query in the background.
        with paths_lock:
            paths.clear()
        raise
    finally:
        executor.shutdown(wait=False)


def _exist_query(
    fs: "FileSystem",
    paths: Set["AnyFSPath"],
    paths_lock: threading.Lock,
    results: Dict[str, bool],
    results_lock: threading.Lock,
    batch_size: int,
    callback: "Callback",
):
    """Resolve paths by popping batches from `paths` and calling fs.exists().

    Returns once `paths` is empty. Each popped path is owned by this worker
    until its result has been stored in `results`.
    """
    while True:
        with paths_lock:
            if not paths:
                return
            batch = [paths.pop() for _ in range(batch_size) if paths]
        # The (potentially slow) query runs without holding any lock.
        found = fs.exists(batch, batch_size=batch_size)
        for path, exists in zip(batch, found):
            with results_lock:
                if path not in results:
                    results[path] = exists
                    callback.relative_update()


def _list_query(
    fs: "FileSystem",
    paths: Set["AnyFSPath"],
    paths_lock: threading.Lock,
    results: Dict[str, bool],
    results_lock: threading.Lock,
    callback: "Callback",
):
    """Resolve paths by listing their parent directories with fs.ls().

    Paths found in a listing are recorded as existing. After all parents
    have been listed, whatever is still left in `paths` is recorded as
    missing. Returns early as soon as `paths` is empty.
    """
    with paths_lock:
        parents = {fs.path.parent(path) for path in paths}
    ls_kwargs: Dict[str, Any] = {}
    if fs.version_aware:
        ls_kwargs["versions"] = True
    for parent in parents:
        with paths_lock:
            if not paths:
                return
        try:
            contents = fs.ls(parent, **ls_kwargs)
        except FileNotFoundError:
            # A missing parent just means none of its children exist; they
            # stay in `paths` and are resolved below or by the exists query.
            continue
        # Move paths from the work set into the results while holding both
        # locks, so a path is never absent from both at the same time.
        with paths_lock, results_lock:
            for path in contents:
                if path in paths:
                    paths.remove(path)
                    if path not in results:
                        results[path] = True
                        callback.relative_update()
    with paths_lock, results_lock:
        while paths:
            path = paths.pop()
            if path not in results:
                results[path] = False
                callback.relative_update()