Skip to content

Commit

Permalink
fs: add generic status implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
pmrowla committed Jan 18, 2023
1 parent 8634792 commit 1495b96
Showing 1 changed file with 128 additions and 1 deletion.
129 changes: 128 additions & 1 deletion src/dvc_objects/fs/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,21 @@
import errno
import logging
import os
import threading
from concurrent import futures
from contextlib import suppress
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Collection,
Dict,
List,
Optional,
Set,
Union,
)

from fsspec.asyn import get_loop

Expand Down Expand Up @@ -397,3 +409,118 @@ def test_links(
from_fs.remove(from_file)

return ret


def status(
fs: "FileSystem",
file_paths: Union["AnyFSPath", Collection["AnyFSPath"]],
callback: "Callback" = DEFAULT_CALLBACK,
batch_size: Optional[int] = None,
) -> Dict[str, bool]:
"""Return status (existence) of files in fs.
Runs batched fs.exists() calls in parallel with fs.ls() until all paths
have been checked.
"""
if not file_paths:
return {}
paths = {file_paths} if isinstance(file_paths, str) else set(file_paths)
if len(paths) == 1:
path = paths.pop()
return {path: fs.exists(path)}

paths_lock = threading.Lock()
results: Dict[str, bool] = {}
results_lock = threading.Lock()
callback.set_size(len(paths))
jobs = batch_size or fs.jobs
exists_jobs = jobs - 1 if jobs > 1 else 1
executor = ThreadPoolExecutor(max_workers=2, cancel_on_error=True)
logger.debug("Querying status for '%d' files", len(paths))
exist_fut = executor.submit(
_exist_query,
fs,
paths,
paths_lock,
results,
results_lock,
exists_jobs,
callback,
)
list_fut = executor.submit(
_list_query,
fs,
paths,
paths_lock,
results,
results_lock,
callback,
)
done, not_done = futures.wait(
[exist_fut, list_fut], return_when=futures.FIRST_COMPLETED
)
for fut in not_done:
fut.cancel()
# NOTE: if we started a long running lsdir it will continue to run in
# the background until the task completes
executor.shutdown(wait=False)
return results


def _exist_query(
fs: "FileSystem",
paths: Set["AnyFSPath"],
paths_lock: threading.Lock,
results: Dict[str, bool],
results_lock: threading.Lock,
batch_size: int,
callback: "Callback",
):
while True:
with paths_lock:
if not paths:
return
batch = [paths.pop() for _ in range(batch_size) if paths]
for i, result in enumerate(fs.exists(batch, batch_size=batch_size)):
with results_lock:
path = batch[i]
if path not in results:
results[path] = result
callback.relative_update()


def _list_query(
fs: "FileSystem",
paths: Set["AnyFSPath"],
paths_lock: threading.Lock,
results: Dict[str, bool],
results_lock: threading.Lock,
callback: "Callback",
):
with paths_lock:
parents = {fs.path.parent(path) for path in paths}
for parent in parents:
with paths_lock:
if not paths:
return
kwargs = {}
if fs.version_aware:
kwargs["versions"] = True
contents = fs.ls(parent, **kwargs)
with paths_lock:
exist_paths = set()
for path in contents:
if path in paths:
paths.remove(path)
exist_paths.add(path)
with results_lock:
for path in exist_paths:
if path not in results:
results[path] = True
callback.relative_update()
with paths_lock, results_lock:
while paths:
path = paths.pop()
if path not in results:
results[path] = False
callback.relative_update()

0 comments on commit 1495b96

Please sign in to comment.