diff --git a/dvc/fs/webhdfs.py b/dvc/fs/webhdfs.py
index 80a2682d44..224732cf7f 100644
--- a/dvc/fs/webhdfs.py
+++ b/dvc/fs/webhdfs.py
@@ -1,61 +1,25 @@
-import errno
-import logging
-import os
-import posixpath
-import shutil
 import threading
-from contextlib import contextmanager
 
 from funcy import cached_property, wrap_prop
 
 from dvc.hash_info import HashInfo
 from dvc.path_info import CloudURLInfo
-from dvc.progress import DEFAULT_CALLBACK
 from dvc.scheme import Schemes
 
-from .base import BaseFileSystem
+# pylint:disable=abstract-method
+from .fsspec_wrapper import CallbackMixin, FSSpecWrapper
 
-logger = logging.getLogger(__name__)
-
-
-def update_pbar(pbar, total):
-    """Update pbar to accept the two arguments passed by hdfs"""
-
-    def update(_, bytes_transfered):
-        if bytes_transfered == -1:
-            pbar.update_to(total)
-            return
-        pbar.update_to(bytes_transfered)
-
-    return update
-
-
-def update_callback(callback, total):
-    def update(_, bytes_transfered):
-        if bytes_transfered == -1:
-            return callback.absolute_update(total)
-        return callback.relative_update(bytes_transfered)
-
-    return update
-
-
-class WebHDFSFileSystem(BaseFileSystem):  # pylint:disable=abstract-method
+
+class WebHDFSFileSystem(CallbackMixin, FSSpecWrapper):
     scheme = Schemes.WEBHDFS
     PATH_CLS = CloudURLInfo
-    REQUIRES = {"hdfs": "hdfs"}
+    REQUIRES = {"fsspec": "fsspec"}
     PARAM_CHECKSUM = "checksum"
-    TRAVERSE_PREFIX_LEN = 2
 
-    def __init__(self, **config):
-        super().__init__(**config)
-
-        self.host = config["host"]
-        self.user = config.get("user")
-        self.port = config.get("port")
-        self.hdfscli_config = config.get("hdfscli_config")
-        self.token = config.get("webhdfs_token")
-        self.alias = config.get("webhdfs_alias")
+    def _with_bucket(self, path):
+        if isinstance(path, self.PATH_CLS):
+            return f"/{path.path.rstrip('/')}"
+        return path
 
     @staticmethod
     def _get_kwargs_from_urls(urlpath):
@@ -67,120 +31,23 @@ def _get_kwargs_from_urls(urlpath):
             )
         )
 
-    @wrap_prop(threading.Lock())
-    @cached_property
-    def hdfs_client(self):
-        import hdfs
-
-        logger.debug("HDFSConfig: %s", self.hdfscli_config)
-
-        try:
-            return hdfs.config.Config(self.hdfscli_config).get_client(
-                self.alias
-            )
-        except hdfs.util.HdfsError as exc:
-            exc_msg = str(exc)
-            errors = (
-                "No alias specified",
-                "Invalid configuration file",
-                f"Alias {self.alias} not found",
-            )
-            if not any(err in exc_msg for err in errors):
-                raise
-
-            http_url = f"http://{self.host}:{self.port}"
-            logger.debug("URL: %s", http_url)
-
-            if self.token is not None:
-                client = hdfs.TokenClient(http_url, token=self.token, root="/")
-            else:
-                client = hdfs.InsecureClient(
-                    http_url, user=self.user, root="/"
-                )
-
-            return client
-
-    @contextmanager
-    def open(self, path_info, mode="r", encoding=None, **kwargs):
-        assert mode in {"r", "rt", "rb"}
+    def _prepare_credentials(self, **config):
+        if "webhdfs_token" in config:
+            config["token"] = config.pop("webhdfs_token")
 
-        with self.hdfs_client.read(
-            path_info.path, encoding=encoding
-        ) as reader:
-            yield reader
+        return config
 
-    def walk_files(self, path_info, **kwargs):
-        if not self.exists(path_info):
-            return
-
-        root = path_info.path
-        for path, _, fnames in self.hdfs_client.walk(root):
-            for fname in fnames:
-                yield path_info.replace(path=posixpath.join(path, fname))
-
-    def remove(self, path_info):
-        if path_info.scheme != self.scheme:
-            raise NotImplementedError
-
-        self.hdfs_client.delete(path_info.path)
-
-    def exists(self, path_info) -> bool:
-        assert not isinstance(path_info, list)
-        assert path_info.scheme == "webhdfs"
-
-        status = self.hdfs_client.status(path_info.path, strict=False)
-        return status is not None
+    @wrap_prop(threading.Lock())
+    @cached_property
+    def fs(self):
+        from fsspec.implementations.webhdfs import WebHDFS
 
-    def info(self, path_info):
-        st = self.hdfs_client.status(path_info.path, strict=False)
-        if not st:
-            raise FileNotFoundError(
-                errno.ENOENT, os.strerror(errno.ENOENT), path_info.path
-            )
-        return {"size": st["length"], "type": "file"}
+        return WebHDFS(**self.fs_args)
 
     def checksum(self, path_info):
-        size = self.info(path_info)["size"]
-        return HashInfo(
-            "checksum",
-            self.hdfs_client.checksum(path_info.path)["bytes"],
-            size=size,
-        )
-
-    def copy(self, from_info, to_info, **_kwargs):
-        with self.hdfs_client.read(from_info.path) as reader:
-            with self.hdfs_client.write(to_info.path) as writer:
-                shutil.copyfileobj(reader, writer)
-
-    def move(self, from_info, to_info):
-        self.hdfs_client.makedirs(to_info.parent.path)
-        self.hdfs_client.rename(from_info.path, to_info.path)
-
-    def upload_fobj(self, fobj, to_info, **kwargs):
-        with self.hdfs_client.write(to_info.path) as fdest:
-            shutil.copyfileobj(fobj, fdest)
-
-    def put_file(
-        self, from_file, to_info, callback=DEFAULT_CALLBACK, **kwargs
-    ):
-        total = os.path.getsize(from_file)
-        callback.set_size(total)
+        path = self._with_bucket(path_info)
+        ukey = self.fs.ukey(path)
 
-        self.hdfs_client.makedirs(to_info.parent.path)
-        return self.hdfs_client.upload(
-            to_info.path,
-            from_file,
-            overwrite=True,
-            progress=update_callback(callback, total),
-        )
-
-    def get_file(
-        self, from_info, to_file, callback=DEFAULT_CALLBACK, **kwargs
-    ):
-        total = self.getsize(from_info)
-        if total:
-            callback.set_size(total)
-
-        self.hdfs_client.download(
-            from_info.path, to_file, progress=update_callback(callback, total)
+        return HashInfo(
+            "checksum", ukey["bytes"], size=self.getsize(path_info)
         )
diff --git a/requirements/tests.txt b/requirements/tests.txt
index 3aff8bc560..6ec36844ef 100644
--- a/requirements/tests.txt
+++ b/requirements/tests.txt
@@ -21,6 +21,7 @@ google-compute-engine==2.8.13
 google-cloud-storage==1.42.3
 dvclive==0.3.0
 pywin32>=225; sys_platform == 'win32' and python_version < '3.10'
+hdfs==2.6.0
 
 # required by collective.checkdocs
 Pygments==2.10.0
diff --git a/requirements/webhdfs.txt b/requirements/webhdfs.txt
index d920bad6a2..e69de29bb2 100644
--- a/requirements/webhdfs.txt
+++ b/requirements/webhdfs.txt
@@ -1 +0,0 @@
-hdfs==2.6.0
diff --git a/tests/remotes/__init__.py b/tests/remotes/__init__.py
index 388d67a342..9457ff055a 100644
--- a/tests/remotes/__init__.py
+++ b/tests/remotes/__init__.py
@@ -34,7 +34,7 @@
 )
 from .ssh import SSH, ssh, ssh_connection, ssh_server  # noqa: F401; noqa: F401
 from .webdav import Webdav, webdav, webdav_server  # noqa: F401
-from .webhdfs import WebHDFS, real_webhdfs, webhdfs  # noqa: F401
+from .webhdfs import WebHDFS, webhdfs  # noqa: F401
 
 TEST_REMOTE = "upstream"
 TEST_CONFIG = {
diff --git a/tests/remotes/hdfs.py b/tests/remotes/hdfs.py
index 20d7cfbce6..468cb8cc57 100644
--- a/tests/remotes/hdfs.py
+++ b/tests/remotes/hdfs.py
@@ -143,9 +143,17 @@ def _check():
             fobj.write(b"test")
             return True
         except (pyarrow.ArrowException, OSError):
+            import traceback
+
+            traceback.print_exc()
             return False
 
-    docker_services.wait_until_responsive(timeout=30.0, pause=5, check=_check)
+    try:
+        docker_services.wait_until_responsive(
+            timeout=30.0, pause=5, check=_check
+        )
+    except Exception:  # pylint: disable=broad-except
+        pytest.skip("couldn't start hdfs server")
 
     return {"hdfs": port, "webhdfs": web_port}
diff --git a/tests/remotes/webhdfs.py b/tests/remotes/webhdfs.py
index cdb2d7c8a2..3a48929538 100644
--- a/tests/remotes/webhdfs.py
+++ b/tests/remotes/webhdfs.py
@@ -1,15 +1,12 @@
 import locale
-import os
 import uuid
 from contextlib import contextmanager
-from pathlib import Path
 
 import pytest
 
 from dvc.path_info import CloudURLInfo
 
 from .base import Base
-from .hdfs import _hdfs_root, md5md5crc32c
 
 
 class WebHDFS(Base, CloudURLInfo):  # pylint: disable=abstract-method
@@ -17,7 +14,9 @@ class WebHDFS(Base, CloudURLInfo):  # pylint: disable=abstract-method
     def _webhdfs(self):
         from hdfs import InsecureClient
 
-        client = InsecureClient(f"http://{self.host}:{self.port}", self.user)
+        client = InsecureClient(
+            f"http://{self.host}:{self.port}", self.user, root="/"
+        )
         yield client
 
     def is_file(self):
@@ -58,155 +57,7 @@ def read_text(self, encoding=None, errors=None):
 
 
 @pytest.fixture
-def real_webhdfs(hdfs_server):
+def webhdfs(hdfs_server):
     port = hdfs_server["webhdfs"]
     url = f"webhdfs://127.0.0.1:{port}/{uuid.uuid4()}"
     yield WebHDFS(url)
-
-
-class FakeClient:
-    def __init__(self, *args, **kwargs):
-        self._root = Path(_hdfs_root.name)
-
-    def _path(self, path):
-        return self._root / path.lstrip("/")
-
-    def makedirs(self, path, permission=None):
-        self._path(path).mkdir(
-            mode=permission or 0o777, exist_ok=True, parents=True
-        )
-
-    def write(self, hdfs_path, overwrite=False):
-        from hdfs.util import HdfsError
-
-        path = self._path(hdfs_path)
-
-        if not overwrite and path.exists():
-            raise HdfsError(f"Remote path {hdfs_path} already exists.")
-
-        path.parent.mkdir(parents=True, exist_ok=True)
-        return path.open("wb")
-
-    @contextmanager
-    def read(
-        self,
-        hdfs_path,
-        encoding=None,
-        chunk_size=0,
-        delimiter=None,
-        progress=None,
-    ):
-        pobj = self._path(hdfs_path)
-        if not chunk_size and not delimiter:
-            if encoding:
-                yield pobj.open("r", encoding=encoding)
-            else:
-                yield pobj.open("rb")
-        else:
-            if delimiter:
-                data = pobj.open("r", encoding=encoding, newline=delimiter)
-            else:
-
-                def read_chunks(fobj, _chunk_size):
-                    while True:
-                        chunk = fobj.read(_chunk_size)
-                        if not chunk:
-                            break
-                        yield chunk
-
-                data = read_chunks(
-                    pobj.open("rb", encoding=encoding), chunk_size
-                )
-
-            if progress:
-
-                def reader(_hdfs_path, _progress):
-                    nbytes = 0
-                    for chunk in data:
-                        nbytes += len(chunk)
-                        _progress(_hdfs_path, nbytes)
-                        yield chunk
-                    _progress(_hdfs_path, -1)
-
-                yield reader(hdfs_path, progress)
-            else:
-                yield data
-
-    def walk(self, hdfs_path):
-        import posixpath
-
-        local_path = self._path(hdfs_path)
-        for local_root, dnames, fnames in os.walk(local_path):
-            if local_root == os.fspath(local_path):
-                root = hdfs_path
-            else:
-                root = posixpath.join(
-                    hdfs_path, os.path.relpath(local_root, local_path)
-                )
-            yield (root, dnames, fnames)
-
-    def delete(self, hdfs_path):
-        return self._path(hdfs_path).unlink()
-
-    def status(self, hdfs_path, strict=True):
-        from hdfs.util import HdfsError
-
-        try:
-            return {"length": self._path(hdfs_path).stat().st_size}
-        except FileNotFoundError:
-            if not strict:
-                return None
-            raise HdfsError(
-                f"File does not exist: {hdfs_path}",
-                exception="FileNotFoundException",
-            )
-
-    def checksum(self, hdfs_path):
-        return {
-            "algorithm": "MD5-of-0MD5-of-512CRC32",
-            "bytes": md5md5crc32c(self._path(hdfs_path)) + "00000000",
-            "size": 28,
-        }
-
-    def rename(self, from_path, to_path):
-        from dvc.utils.fs import move
-
-        move(self._path(from_path), self._path(to_path))
-
-    def upload(
-        self,
-        hdfs_path,
-        local_path,
-        chunk_size=2 ** 16,
-        progress=None,
-        **kwargs,
-    ):
-        with open(local_path, "rb") as from_fobj:
-            with self.write(hdfs_path, **kwargs) as to_fobj:
-                nbytes = 0
-                while True:
-                    chunk = from_fobj.read(chunk_size)
-                    if not chunk:
-                        break
-                    if progress:
-                        nbytes += len(chunk)
-                        progress(local_path, nbytes)
-                    to_fobj.write(chunk)
-
-    def download(self, hdfs_path, local_path, **kwargs):
-        from dvc.utils.fs import makedirs
-
-        kwargs.setdefault("chunk_size", 2 ** 16)
-
-        makedirs(os.path.dirname(local_path), exist_ok=True)
-        with open(local_path, "wb") as writer:
-            with self.read(hdfs_path, **kwargs) as reader:
-                for chunk in reader:
-                    writer.write(chunk)
-
-
-@pytest.fixture
-def webhdfs(mocker):
-    mocker.patch("hdfs.InsecureClient", FakeClient)
-    url = f"webhdfs://example.com:12345/{uuid.uuid4()}"
-    yield WebHDFS(url)
diff --git a/tests/unit/remote/test_webhdfs.py b/tests/unit/remote/test_webhdfs.py
index 0e2bbca489..b267c32fae 100644
--- a/tests/unit/remote/test_webhdfs.py
+++ b/tests/unit/remote/test_webhdfs.py
@@ -17,7 +17,5 @@ def test_init(dvc):
     }
 
     fs = WebHDFSFileSystem(**config)
-    assert fs.token == webhdfs_token
-    assert fs.alias == webhdfs_alias
-    assert fs.user == user
-    assert fs.hdfscli_config == hdfscli_config
+    assert fs.fs_args["token"] == webhdfs_token
+    assert fs.fs_args["user"] == user
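
Reviewer note (not part of the patch): after this change WebHDFSFileSystem delegates all transfer logic to fsspec's WebHDFS implementation, so the DVC-specific surface shrinks to the "webhdfs_token" -> "token" rename in _prepare_credentials and the ukey()-based checksum. The sketch below illustrates roughly what the new code path amounts to; it assumes fs_args is assembled by the FSSpecWrapper base class (not shown in this diff), and the host, token, and path values are made-up placeholders.

    from fsspec.implementations.webhdfs import WebHDFS

    # DVC-style remote config; "webhdfs_token" is the DVC option name.
    config = {
        "host": "namenode.example.com",  # placeholder
        "port": 50070,
        "webhdfs_token": "my-delegation-token",  # placeholder
    }

    # Same rename that _prepare_credentials performs before the args reach fsspec.
    if "webhdfs_token" in config:
        config["token"] = config.pop("webhdfs_token")

    fs = WebHDFS(**config)  # equivalent of the new cached `fs` property

    # checksum() now reads the WebHDFS file checksum through fsspec:
    ukey = fs.ukey("/data/file.txt")  # placeholder path
    print(ukey["bytes"])  # the value stored under PARAM_CHECKSUM

Note that the old hdfscli_config and webhdfs_alias options have no equivalent in this path, which is why the unit test above drops those assertions.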