From aeb81a62627c5fe4a47e615cba8a479baf2f17f2 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Fri, 15 Dec 2023 05:47:35 +0200 Subject: [PATCH] fs: replace fs.path with plain fs methods (#57) --- dvc_s3/__init__.py | 53 ++++++++++++++++++++++++++++++++++++---------- dvc_s3/path.py | 42 ------------------------------------ 2 files changed, 42 insertions(+), 53 deletions(-) delete mode 100644 dvc_s3/path.py diff --git a/dvc_s3/__init__.py b/dvc_s3/__init__.py index 407d907..b9f1c84 100644 --- a/dvc_s3/__init__.py +++ b/dvc_s3/__init__.py @@ -1,14 +1,13 @@ import os import threading from collections import defaultdict -from typing import Any, Dict +from typing import Any, Dict, Optional, Tuple +from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit from dvc.utils.objects import cached_property from dvc_objects.fs.base import ObjectFileSystem from dvc_objects.fs.errors import ConfigError -from funcy import wrap_prop - -from .path import S3Path +from funcy import first, wrap_prop _AWS_CONFIG_PATH = os.path.join(os.path.expanduser("~"), ".aws", "config") @@ -44,6 +43,8 @@ class S3FileSystem(ObjectFileSystem): REQUIRES = {"s3fs": "s3fs", "boto3": "boto3"} PARAM_CHECKSUM = "etag" + VERSION_ID_KEY = "versionId" + _GRANTS = { "grant_full_control": "GrantFullControl", "grant_read": "GrantRead", @@ -58,20 +59,50 @@ class S3FileSystem(ObjectFileSystem): "multipart_chunksize": "multipart_chunksize", } - @cached_property - def path(self) -> S3Path: - def _getcwd(): - return self.fs.root_marker + def getcwd(self): + return self.fs.root_marker - return S3Path(self.sep, getcwd=_getcwd) + @classmethod + def split_version(cls, path: str) -> Tuple[str, Optional[str]]: + parts = list(urlsplit(path)) + query = parse_qs(parts[3]) + if cls.VERSION_ID_KEY in query: + version_id = first(query[cls.VERSION_ID_KEY]) + del query[cls.VERSION_ID_KEY] + parts[3] = urlencode(query) + else: + version_id = None + return urlunsplit(parts), version_id + + @classmethod + def join_version(cls, path: str, version_id: Optional[str]) -> str: + parts = list(urlsplit(path)) + query = parse_qs(parts[3]) + if cls.VERSION_ID_KEY in query: + raise ValueError("path already includes a version query") + parts[3] = f"{cls.VERSION_ID_KEY}={version_id}" if version_id else "" + return urlunsplit(parts) + + @classmethod + def version_path(cls, path: str, version_id: Optional[str]) -> str: + path, _ = cls.split_version(path) + return cls.join_version(path, version_id) + + @classmethod + def coalesce_version( + cls, path: str, version_id: Optional[str] + ) -> Tuple[str, Optional[str]]: + path, path_version_id = cls.split_version(path) + versions = {ver for ver in (version_id, path_version_id) if ver} + if len(versions) > 1: + raise ValueError("Path version mismatch: '{path}', '{version_id}'") + return path, (versions.pop() if versions else None) @classmethod def _get_kwargs_from_urls(cls, urlpath: str) -> Dict[str, Any]: ret = super()._get_kwargs_from_urls(urlpath) url_query = ret.get("url_query") if url_query is not None: - from urllib.parse import parse_qs - parsed = parse_qs(url_query) if "versionId" in parsed: ret["version_aware"] = True diff --git a/dvc_s3/path.py b/dvc_s3/path.py deleted file mode 100644 index 84c5195..0000000 --- a/dvc_s3/path.py +++ /dev/null @@ -1,42 +0,0 @@ -from typing import Optional, Tuple -from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit - -from dvc_objects.fs.base import AnyFSPath -from dvc_objects.fs.path import Path -from funcy import first - - -class S3Path(Path): - VERSION_ID_KEY = "versionId" - - def split_version(self, path: AnyFSPath) -> Tuple[str, Optional[str]]: - parts = list(urlsplit(path)) - query = parse_qs(parts[3]) - if self.VERSION_ID_KEY in query: - version_id = first(query[self.VERSION_ID_KEY]) - del query[self.VERSION_ID_KEY] - parts[3] = urlencode(query) - else: - version_id = None - return urlunsplit(parts), version_id - - def join_version(self, path: AnyFSPath, version_id: Optional[str]) -> str: - parts = list(urlsplit(path)) - query = parse_qs(parts[3]) - if self.VERSION_ID_KEY in query: - raise ValueError("path already includes a version query") - parts[3] = f"{self.VERSION_ID_KEY}={version_id}" if version_id else "" - return urlunsplit(parts) - - def version_path(self, path: AnyFSPath, version_id: Optional[str]) -> str: - path, _ = self.split_version(path) - return self.join_version(path, version_id) - - def coalesce_version( - self, path: AnyFSPath, version_id: Optional[str] - ) -> Tuple[AnyFSPath, Optional[str]]: - path, path_version_id = self.split_version(path) - versions = {ver for ver in (version_id, path_version_id) if ver} - if len(versions) > 1: - raise ValueError("Path version mismatch: '{path}', '{version_id}'") - return path, (versions.pop() if versions else None)