Skip to content

Commit

Permalink
2.0 release (#260)
Browse files Browse the repository at this point in the history
* [breaking] utils: cleanup copyfile (#256)

utils: cleanup copyfile

* filesystem: remove fsspec version check (#257)

dvc-objects cannot enforce a particular version of fsspec.
It's upto dvc and dvc plugins to ensure this.

* cleanup callbacks (#259)

* rename to wrap_fn
  • Loading branch information
skshetry authored Dec 13, 2023
1 parent b3a4b48 commit 902b65c
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 190 deletions.
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@ classifiers = [
requires-python = ">=3.8"
dynamic = ["version"]
dependencies = [
"tqdm>=4.63.1,<5",
"shortuuid>=0.5.0",
"funcy>=1.14",
"fsspec>=2022.10.0",
"typing-extensions>=3.7.4",
"packaging>=19",
]

[project.urls]
Expand All @@ -42,11 +39,13 @@ tests = [
"pytest-mock",
"pytest-benchmark",
"reflink",
"tqdm>=4.63.1,<5",
]
dev = [
"dvc-objects[tests]",
"mypy==1.7.1",
"types-tqdm",
"typing-extensions>=3.7.4",
]

[tool.setuptools.packages.find]
Expand Down
59 changes: 19 additions & 40 deletions src/dvc_objects/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
)

from fsspec.asyn import get_loop
from funcy import once_per_args

from dvc_objects.executors import ThreadPoolExecutor, batch_coros
from dvc_objects.utils import cached_property

from .callbacks import DEFAULT_CALLBACK, Callback
from .callbacks import (
DEFAULT_CALLBACK,
Callback,
CallbackStream,
wrap_and_branch_callback,
wrap_fn,
)
from .errors import RemoteMissingDepsError

if TYPE_CHECKING:
Expand Down Expand Up @@ -60,33 +65,6 @@ def __init__(self, link: str, fs: "FileSystem", path: str) -> None:
)


@once_per_args
def check_required_version(
pkg: str, dist: str = "dvc_objects", log_level=logging.WARNING
):
from importlib import metadata

from packaging.requirements import InvalidRequirement, Requirement

try:
reqs = {
r.name: r.specifier for r in map(Requirement, metadata.requires(dist) or [])
}
version = metadata.version(pkg)
except (metadata.PackageNotFoundError, InvalidRequirement):
return

specifier = reqs.get(pkg)
if specifier and version and version not in specifier:
logger.log(
log_level,
"'%s%s' is required, but you have %r installed which is incompatible.",
pkg,
specifier,
version,
)


class FileSystem:
sep = "/"

Expand Down Expand Up @@ -176,7 +154,6 @@ def get_missing_deps(cls) -> List[str]:
def _check_requires(self, **kwargs):
from .scheme import Schemes

check_required_version(pkg="fsspec")
missing = self.get_missing_deps()
if not missing:
return
Expand Down Expand Up @@ -367,9 +344,10 @@ def exists(
loop,
)
return fut.result()
executor = ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True)
with executor:
return list(executor.map(callback.wrap_fn(self.fs.exists), path))

func = wrap_fn(callback, self.fs.exists)
with ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True) as executor:
return list(executor.map(func, path))

def lexists(self, path: AnyFSPath) -> bool:
return self.fs.lexists(path)
Expand Down Expand Up @@ -507,10 +485,11 @@ def info(self, path, callback=DEFAULT_CALLBACK, batch_size=None, **kwargs):
loop,
)
return fut.result()
executor = ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True)
with executor:
func = partial(self.fs.info, **kwargs)
return list(executor.map(callback.wrap_fn(func), path))

func = partial(self.fs.info, **kwargs)
wrapped = wrap_fn(callback, func)
with ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True) as executor:
return list(executor.map(wrapped, path))

def mkdir(
self, path: AnyFSPath, create_parents: bool = True, **kwargs: Any
Expand All @@ -531,7 +510,7 @@ def put_file(
if size:
callback.set_size(size)
if hasattr(from_file, "read"):
stream = callback.wrap_attr(cast("BinaryIO", from_file))
stream = cast("BinaryIO", CallbackStream(from_file, callback))
self.upload_fobj(stream, to_info, size=size)
else:
assert isinstance(from_file, str)
Expand Down Expand Up @@ -602,7 +581,7 @@ def put(
callback.set_size(len(from_infos))
executor = ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True)
with executor:
put_file = callback.wrap_and_branch(self.put_file)
put_file = wrap_and_branch_callback(callback, self.put_file)
list(executor.imap_unordered(put_file, from_infos, to_infos))

def get(
Expand All @@ -621,7 +600,7 @@ def get_file(rpath, lpath, **kwargs):
localfs.makedirs(localfs.path.parent(lpath), exist_ok=True)
self.fs.get_file(rpath, lpath, **kwargs)

get_file = callback.wrap_and_branch(get_file)
get_file = wrap_and_branch_callback(callback, get_file)

if isinstance(from_info, list) and isinstance(to_info, list):
from_infos: List[AnyFSPath] = from_info
Expand Down
Loading

0 comments on commit 902b65c

Please sign in to comment.