webhdfs: migrate to fsspec (#6662)
* webhdfs: migrate to fsspec

* skip the tests that can't use hdfs
isidentical authored Oct 7, 2021
1 parent 32f435c commit 4f3b28e
Showing 7 changed files with 38 additions and 314 deletions.
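In short, this commit swaps DVC's hand-rolled wrapper around the `hdfs` client library for fsspec's WebHDFS implementation: `WebHDFSFileSystem` now inherits from `FSSpecWrapper`, so listing, transfers, and progress callbacks come from the shared fsspec code path instead of the bespoke methods deleted below. A minimal sketch (not from the commit) of the filesystem object the class now delegates to; the host, port, and user values are placeholders and assume a reachable namenode:

import fsspec

# fsspec registers "webhdfs" as a protocol; this is the same filesystem
# class that DVC's FSSpecWrapper drives after this commit.
fs = fsspec.filesystem(
    "webhdfs", host="namenode.example.com", port=50070, user="hadoop"
)
print(fs.ls("/"))  # standard fsspec calls replace the bespoke wrapper methods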
175 changes: 21 additions & 154 deletions dvc/fs/webhdfs.py
@@ -1,61 +1,25 @@
-import errno
-import logging
-import os
-import posixpath
-import shutil
 import threading
-from contextlib import contextmanager
 
 from funcy import cached_property, wrap_prop
 
 from dvc.hash_info import HashInfo
 from dvc.path_info import CloudURLInfo
-from dvc.progress import DEFAULT_CALLBACK
 from dvc.scheme import Schemes
 
-from .base import BaseFileSystem
+# pylint:disable=abstract-method
+from .fsspec_wrapper import CallbackMixin, FSSpecWrapper
 
-logger = logging.getLogger(__name__)
-
-
-def update_pbar(pbar, total):
-    """Update pbar to accept the two arguments passed by hdfs"""
-
-    def update(_, bytes_transfered):
-        if bytes_transfered == -1:
-            pbar.update_to(total)
-            return
-        pbar.update_to(bytes_transfered)
-
-    return update
-
-
-def update_callback(callback, total):
-    def update(_, bytes_transfered):
-        if bytes_transfered == -1:
-            return callback.absolute_update(total)
-        return callback.relative_update(bytes_transfered)
-
-    return update
 
 
-class WebHDFSFileSystem(BaseFileSystem):  # pylint:disable=abstract-method
+class WebHDFSFileSystem(CallbackMixin, FSSpecWrapper):
     scheme = Schemes.WEBHDFS
     PATH_CLS = CloudURLInfo
-    REQUIRES = {"hdfs": "hdfs"}
+    REQUIRES = {"fsspec": "fsspec"}
     PARAM_CHECKSUM = "checksum"
     TRAVERSE_PREFIX_LEN = 2
 
-    def __init__(self, **config):
-        super().__init__(**config)
-
-        self.host = config["host"]
-        self.user = config.get("user")
-        self.port = config.get("port")
-
-        self.hdfscli_config = config.get("hdfscli_config")
-        self.token = config.get("webhdfs_token")
-        self.alias = config.get("webhdfs_alias")
+    def _with_bucket(self, path):
+        if isinstance(path, self.PATH_CLS):
+            return f"/{path.path.rstrip('/')}"
+        return path
 
     @staticmethod
     def _get_kwargs_from_urls(urlpath):
@@ -67,120 +31,23 @@ def _get_kwargs_from_urls(urlpath):
             )
         )
 
-    @wrap_prop(threading.Lock())
-    @cached_property
-    def hdfs_client(self):
-        import hdfs
-
-        logger.debug("HDFSConfig: %s", self.hdfscli_config)
-
-        try:
-            return hdfs.config.Config(self.hdfscli_config).get_client(
-                self.alias
-            )
-        except hdfs.util.HdfsError as exc:
-            exc_msg = str(exc)
-            errors = (
-                "No alias specified",
-                "Invalid configuration file",
-                f"Alias {self.alias} not found",
-            )
-            if not any(err in exc_msg for err in errors):
-                raise
-
-            http_url = f"http://{self.host}:{self.port}"
-            logger.debug("URL: %s", http_url)
-
-            if self.token is not None:
-                client = hdfs.TokenClient(http_url, token=self.token, root="/")
-            else:
-                client = hdfs.InsecureClient(
-                    http_url, user=self.user, root="/"
-                )
-
-            return client
-
-    @contextmanager
-    def open(self, path_info, mode="r", encoding=None, **kwargs):
-        assert mode in {"r", "rt", "rb"}
+    def _prepare_credentials(self, **config):
+        if "webhdfs_token" in config:
+            config["token"] = config.pop("webhdfs_token")
 
-        with self.hdfs_client.read(
-            path_info.path, encoding=encoding
-        ) as reader:
-            yield reader
+        return config
 
-    def walk_files(self, path_info, **kwargs):
-        if not self.exists(path_info):
-            return
-
-        root = path_info.path
-        for path, _, fnames in self.hdfs_client.walk(root):
-            for fname in fnames:
-                yield path_info.replace(path=posixpath.join(path, fname))
-
-    def remove(self, path_info):
-        if path_info.scheme != self.scheme:
-            raise NotImplementedError
-
-        self.hdfs_client.delete(path_info.path)
-
-    def exists(self, path_info) -> bool:
-        assert not isinstance(path_info, list)
-        assert path_info.scheme == "webhdfs"
-
-        status = self.hdfs_client.status(path_info.path, strict=False)
-        return status is not None
+    @wrap_prop(threading.Lock())
+    @cached_property
+    def fs(self):
+        from fsspec.implementations.webhdfs import WebHDFS
 
-    def info(self, path_info):
-        st = self.hdfs_client.status(path_info.path, strict=False)
-        if not st:
-            raise FileNotFoundError(
-                errno.ENOENT, os.strerror(errno.ENOENT), path_info.path
-            )
-        return {"size": st["length"], "type": "file"}
+        return WebHDFS(**self.fs_args)
 
     def checksum(self, path_info):
-        size = self.info(path_info)["size"]
-        return HashInfo(
-            "checksum",
-            self.hdfs_client.checksum(path_info.path)["bytes"],
-            size=size,
-        )
-
-    def copy(self, from_info, to_info, **_kwargs):
-        with self.hdfs_client.read(from_info.path) as reader:
-            with self.hdfs_client.write(to_info.path) as writer:
-                shutil.copyfileobj(reader, writer)
-
-    def move(self, from_info, to_info):
-        self.hdfs_client.makedirs(to_info.parent.path)
-        self.hdfs_client.rename(from_info.path, to_info.path)
-
-    def upload_fobj(self, fobj, to_info, **kwargs):
-        with self.hdfs_client.write(to_info.path) as fdest:
-            shutil.copyfileobj(fobj, fdest)
-
-    def put_file(
-        self, from_file, to_info, callback=DEFAULT_CALLBACK, **kwargs
-    ):
-        total = os.path.getsize(from_file)
-        callback.set_size(total)
+        path = self._with_bucket(path_info)
+        ukey = self.fs.ukey(path)
 
-        self.hdfs_client.makedirs(to_info.parent.path)
-        return self.hdfs_client.upload(
-            to_info.path,
-            from_file,
-            overwrite=True,
-            progress=update_callback(callback, total),
-        )
-
-    def get_file(
-        self, from_info, to_file, callback=DEFAULT_CALLBACK, **kwargs
-    ):
-        total = self.getsize(from_info)
-        if total:
-            callback.set_size(total)
-
-        self.hdfs_client.download(
-            from_info.path, to_file, progress=update_callback(callback, total)
+        return HashInfo(
+            "checksum", ukey["bytes"], size=self.getsize(path_info)
        )
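Two details of the rewrite are worth noting. `_prepare_credentials` renames DVC's `webhdfs_token` config key to the `token` argument that fsspec's constructor expects (the prepared arguments surface as `fs_args` via `FSSpecWrapper`), and `checksum` is now computed through fsspec's `ukey`, which issues a GETFILECHECKSUM request against the namenode. A hedged sketch of that checksum path, not part of the diff; the server address and path are placeholders, and `ukey`'s exact payload may vary by fsspec version:

from fsspec.implementations.webhdfs import WebHDFS

fs = WebHDFS(host="namenode.example.com", port=50070, user="hadoop")
ukey = fs.ukey("/data/file.txt")  # GETFILECHECKSUM via the WebHDFS REST API
print(ukey["bytes"])  # the hex digest DVC stores under PARAM_CHECKSUM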
1 change: 1 addition & 0 deletions requirements/tests.txt
@@ -21,6 +21,7 @@ google-compute-engine==2.8.13
 google-cloud-storage==1.42.3
 dvclive==0.3.0
 pywin32>=225; sys_platform == 'win32' and python_version < '3.10'
+hdfs==2.6.0
 
 # required by collective.checkdocs
 Pygments==2.10.0
1 change: 0 additions & 1 deletion requirements/webhdfs.txt
@@ -1 +0,0 @@
-hdfs==2.6.0
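Net effect on dependencies: the `hdfs` client is no longer a runtime requirement (requirements/webhdfs.txt is emptied, and `REQUIRES` in the class above now asks only for `fsspec`), while the same pin moves into requirements/tests.txt so the test suite can keep exercising a real HDFS setup.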
2 changes: 1 addition & 1 deletion tests/remotes/__init__.py
@@ -34,7 +34,7 @@
 )
 from .ssh import SSH, ssh, ssh_connection, ssh_server  # noqa: F401
 from .webdav import Webdav, webdav, webdav_server  # noqa: F401
-from .webhdfs import WebHDFS, real_webhdfs, webhdfs  # noqa: F401
+from .webhdfs import WebHDFS, webhdfs  # noqa: F401
 
 TEST_REMOTE = "upstream"
 TEST_CONFIG = {
10 changes: 9 additions & 1 deletion tests/remotes/hdfs.py
@@ -143,9 +143,17 @@ def _check():
                 fobj.write(b"test")
             return True
         except (pyarrow.ArrowException, OSError):
+            import traceback
+
+            traceback.print_exc()
             return False
 
-    docker_services.wait_until_responsive(timeout=30.0, pause=5, check=_check)
+    try:
+        docker_services.wait_until_responsive(
+            timeout=30.0, pause=5, check=_check
+        )
+    except Exception:  # pylint: disable=broad-except
+        pytest.skip("couldn't start hdfs server")
 
     return {"hdfs": port, "webhdfs": web_port}
 
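The fixture change above turns an unreachable HDFS container into a skip rather than a hard failure: `wait_until_responsive` (a pytest-docker-style fixture method) raises once the 30-second timeout elapses, and the broad `except` now converts that into `pytest.skip`, matching the commit message's "skip the tests that can't use hdfs".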