diff --git a/luigi/contrib/hdfs/__init__.py b/luigi/contrib/hdfs/__init__.py index cf9d8b7558..ecf354eb40 100644 --- a/luigi/contrib/hdfs/__init__.py +++ b/luigi/contrib/hdfs/__init__.py @@ -18,7 +18,7 @@ """ Provides access to HDFS using the :py:class:`HdfsTarget`, a subclass of :py:class:`~luigi.target.Target`. You can configure what client by setting the "client" config under the "hdfs" section in the configuration, or using the ``--hdfs-client`` command line option. -"hadoopcli" is the slowest, but should work out of the box. "snakebite" is the fastest, but requires Snakebite to be installed. +"hadoopcli" is the slowest, but should work out of the box. Since the hdfs functionality is quite big in luigi, it's split into smaller files under ``luigi/contrib/hdfs/*.py``. But for the sake of convenience and @@ -29,7 +29,6 @@ from luigi.contrib.hdfs import config as hdfs_config from luigi.contrib.hdfs import clients as hdfs_clients from luigi.contrib.hdfs import error as hdfs_error -from luigi.contrib.hdfs import snakebite_client as hdfs_snakebite_client from luigi.contrib.hdfs import hadoopcli_clients as hdfs_hadoopcli_clients from luigi.contrib.hdfs import webhdfs_client as hdfs_webhdfs_client from luigi.contrib.hdfs import format as hdfs_format @@ -47,9 +46,7 @@ # clients HDFSCliError = hdfs_error.HDFSCliError call_check = hdfs_hadoopcli_clients.HdfsClient.call_check -list_path = hdfs_snakebite_client.SnakebiteHdfsClient.list_path HdfsClient = hdfs_hadoopcli_clients.HdfsClient -SnakebiteHdfsClient = hdfs_snakebite_client.SnakebiteHdfsClient WebHdfsClient = hdfs_webhdfs_client.WebHdfsClient HdfsClientCdh3 = hdfs_hadoopcli_clients.HdfsClientCdh3 HdfsClientApache1 = hdfs_hadoopcli_clients.HdfsClientApache1 diff --git a/luigi/contrib/hdfs/clients.py b/luigi/contrib/hdfs/clients.py index a45a3b8f86..d3c113a9b9 100644 --- a/luigi/contrib/hdfs/clients.py +++ b/luigi/contrib/hdfs/clients.py @@ -16,17 +16,14 @@ # """ -The implementations of the hdfs clients. The hadoop cli client and the -snakebite client. +The implementations of the hdfs clients. """ import logging import threading from luigi.contrib.hdfs import config as hdfs_config -from luigi.contrib.hdfs import snakebite_client as hdfs_snakebite_client from luigi.contrib.hdfs import webhdfs_client as hdfs_webhdfs_client from luigi.contrib.hdfs import hadoopcli_clients as hdfs_hadoopcli_clients -import luigi.contrib.target logger = logging.getLogger('luigi-interface') @@ -43,13 +40,6 @@ def get_autoconfig_client(client_cache=_AUTOCONFIG_CLIENT): configured_client = hdfs_config.get_configured_hdfs_client() if configured_client == "webhdfs": client_cache.client = hdfs_webhdfs_client.WebHdfsClient() - elif configured_client == "snakebite": - client_cache.client = hdfs_snakebite_client.SnakebiteHdfsClient() - elif configured_client == "snakebite_with_hadoopcli_fallback": - client_cache.client = luigi.contrib.target.CascadingClient([ - hdfs_snakebite_client.SnakebiteHdfsClient(), - hdfs_hadoopcli_clients.create_hadoopcli_client(), - ]) elif configured_client == "hadoopcli": client_cache.client = hdfs_hadoopcli_clients.create_hadoopcli_client() else: diff --git a/luigi/contrib/hdfs/config.py b/luigi/contrib/hdfs/config.py index e79f48593e..e80ca12fa9 100644 --- a/luigi/contrib/hdfs/config.py +++ b/luigi/contrib/hdfs/config.py @@ -17,14 +17,12 @@ """ You can configure what client by setting the "client" config under the "hdfs" section in the configuration, or using the ``--hdfs-client`` command line option. -"hadoopcli" is the slowest, but should work out of the box. "snakebite" is the fastest, but requires Snakebite to be installed. +"hadoopcli" is the slowest, but should work out of the box. """ import random import luigi import luigi.configuration -from luigi import six -import warnings import os import getpass @@ -33,12 +31,6 @@ class hdfs(luigi.Config): client_version = luigi.IntParameter(default=None) - effective_user = luigi.OptionalParameter( - default=os.getenv('HADOOP_USER_NAME'), - description="Optionally specifies the effective user for snakebite. " - "If not set the environment variable HADOOP_USER_NAME is " - "used, else USER") - snakebite_autoconfig = luigi.BoolParameter(default=False) namenode_host = luigi.OptionalParameter(default=None) namenode_port = luigi.IntParameter(default=None) client = luigi.Parameter(default='hadoopcli') @@ -52,7 +44,7 @@ class hadoopcli(luigi.Config): command = luigi.Parameter(default="hadoop", config_path=dict(section="hadoop", name="command"), description='The hadoop command, will run split() on it, ' - 'so you can pass something like "hadoop --param"') + 'so you can pass something like "hadoop --param"') version = luigi.Parameter(default="cdh4", config_path=dict(section="hadoop", name="version"), description='Can also be cdh3 or apache1') @@ -80,20 +72,7 @@ def get_configured_hdfs_client(): the [hdfs] section. It will return the client that retains backwards compatibility when 'client' isn't configured. """ - config = hdfs() - custom = config.client - conf_usinf_snakebite = [ - "snakebite_with_hadoopcli_fallback", - "snakebite", - ] - if six.PY3 and (custom in conf_usinf_snakebite): - warnings.warn( - "snakebite client not compatible with python3 at the moment" - "falling back on hadoopcli", - stacklevel=2 - ) - return "hadoopcli" - return custom + return hdfs().client def tmppath(path=None, include_unix_username=True): diff --git a/luigi/contrib/hdfs/error.py b/luigi/contrib/hdfs/error.py index 59a726cfe6..e89f2891f2 100644 --- a/luigi/contrib/hdfs/error.py +++ b/luigi/contrib/hdfs/error.py @@ -16,8 +16,7 @@ # """ -The implementations of the hdfs clients. The hadoop cli client and the -snakebite client. +The implementations of the hdfs clients. """ diff --git a/luigi/contrib/hdfs/hadoopcli_clients.py b/luigi/contrib/hdfs/hadoopcli_clients.py index e70622eafb..3e2e1dff49 100644 --- a/luigi/contrib/hdfs/hadoopcli_clients.py +++ b/luigi/contrib/hdfs/hadoopcli_clients.py @@ -16,8 +16,7 @@ # """ -The implementations of the hdfs clients. The hadoop cli client and the -snakebite client. +The implementations of the hdfs clients. """ @@ -38,7 +37,7 @@ def create_hadoopcli_client(): """ - Given that we want one of the hadoop cli clients (unlike snakebite), + Given that we want one of the hadoop cli clients, this one will return the right one. """ version = hdfs_config.get_configured_hadoop_version() diff --git a/luigi/contrib/hdfs/snakebite_client.py b/luigi/contrib/hdfs/snakebite_client.py deleted file mode 100644 index b6cd7aedfc..0000000000 --- a/luigi/contrib/hdfs/snakebite_client.py +++ /dev/null @@ -1,299 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2012-2015 Spotify AB -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -A luigi file system client that wraps around snakebite - -Originally written by Alan Brenner github.com/alanbbr -""" - - -from luigi.contrib.hdfs import config as hdfs_config -from luigi.contrib.hdfs import abstract_client as hdfs_abstract_client -from luigi import six -import luigi.contrib.target -import logging -import datetime -import os - -logger = logging.getLogger('luigi-interface') - - -class SnakebiteHdfsClient(hdfs_abstract_client.HdfsFileSystem): - """ - A hdfs client using snakebite. Since Snakebite has a python API, it'll be - about 100 times faster than the hadoop cli client, which does shell out to - a java program on each file system operation. - """ - - def __init__(self): - super(SnakebiteHdfsClient, self).__init__() - self._bite = None - self.pid = -1 - - @staticmethod - def list_path(path): - if isinstance(path, list) or isinstance(path, tuple): - return path - # TODO: Should this be: - # isinstance(path, (six.text_type, six.binary_type))? - if isinstance(path, six.string_types): - return [path, ] - return [str(path), ] - - def get_bite(self): - """ - If Luigi has forked, we have a different PID, and need to reconnect. - """ - config = hdfs_config.hdfs() - if self.pid != os.getpid() or not self._bite: - client_kwargs = dict(filter( - lambda k_v: k_v[1] is not None and k_v[1] != '', six.iteritems({ - 'hadoop_version': config.client_version, - 'effective_user': config.effective_user, - }) - )) - if config.snakebite_autoconfig: - """ - This is fully backwards compatible with the vanilla Client and can be used for a non HA cluster as well. - This client tries to read ``${HADOOP_PATH}/conf/hdfs-site.xml`` to get the address of the namenode. - The behaviour is the same as Client. - """ - from snakebite.client import AutoConfigClient - self._bite = AutoConfigClient(**client_kwargs) - else: - from snakebite.client import Client - self._bite = Client(config.namenode_host, config.namenode_port, **client_kwargs) - return self._bite - - def exists(self, path): - """ - Use snakebite.test to check file existence. - - :param path: path to test - :type path: string - :return: boolean, True if path exists in HDFS - """ - return self.get_bite().test(path, exists=True) - - def move(self, path, dest): - """ - Use snakebite.rename, if available. - - :param path: source file(s) - :type path: either a string or sequence of strings - :param dest: destination file (single input) or directory (multiple) - :type dest: string - :return: list of renamed items - """ - parts = dest.rstrip('/').split('/') - if len(parts) > 1: - dir_path = '/'.join(parts[0:-1]) - if not self.exists(dir_path): - self.mkdir(dir_path, parents=True) - return list(self.get_bite().rename(self.list_path(path), dest)) - - def rename_dont_move(self, path, dest): - """ - Use snakebite.rename_dont_move, if available. - - :param path: source path (single input) - :type path: string - :param dest: destination path - :type dest: string - :return: True if succeeded - :raises: snakebite.errors.FileAlreadyExistsException - """ - from snakebite.errors import FileAlreadyExistsException - try: - self.get_bite().rename2(path, dest, overwriteDest=False) - except FileAlreadyExistsException: - # Unfortunately python2 don't allow exception chaining. - raise luigi.target.FileAlreadyExists() - - def remove(self, path, recursive=True, skip_trash=False): - """ - Use snakebite.delete, if available. - - :param path: delete-able file(s) or directory(ies) - :type path: either a string or a sequence of strings - :param recursive: delete directories trees like \\*nix: rm -r - :type recursive: boolean, default is True - :param skip_trash: do or don't move deleted items into the trash first - :type skip_trash: boolean, default is False (use trash) - :return: list of deleted items - """ - return list(self.get_bite().delete(self.list_path(path), recurse=recursive)) - - def chmod(self, path, permissions, recursive=False): - """ - Use snakebite.chmod, if available. - - :param path: update-able file(s) - :type path: either a string or sequence of strings - :param permissions: \\*nix style permission number - :type permissions: octal - :param recursive: change just listed entry(ies) or all in directories - :type recursive: boolean, default is False - :return: list of all changed items - """ - if type(permissions) == str: - permissions = int(permissions, 8) - return list(self.get_bite().chmod(self.list_path(path), - permissions, recursive)) - - def chown(self, path, owner, group, recursive=False): - """ - Use snakebite.chown/chgrp, if available. - - One of owner or group must be set. Just setting group calls chgrp. - - :param path: update-able file(s) - :type path: either a string or sequence of strings - :param owner: new owner, can be blank - :type owner: string - :param group: new group, can be blank - :type group: string - :param recursive: change just listed entry(ies) or all in directories - :type recursive: boolean, default is False - :return: list of all changed items - """ - bite = self.get_bite() - if owner: - if group: - return all(bite.chown(self.list_path(path), "%s:%s" % (owner, group), - recurse=recursive)) - return all(bite.chown(self.list_path(path), owner, recurse=recursive)) - return list(bite.chgrp(self.list_path(path), group, recurse=recursive)) - - def count(self, path): - """ - Use snakebite.count, if available. - - :param path: directory to count the contents of - :type path: string - :return: dictionary with content_size, dir_count and file_count keys - """ - try: - res = self.get_bite().count(self.list_path(path)).next() - dir_count = res['directoryCount'] - file_count = res['fileCount'] - content_size = res['spaceConsumed'] - except StopIteration: - dir_count = file_count = content_size = 0 - return {'content_size': content_size, 'dir_count': dir_count, - 'file_count': file_count} - - def copy(self, path, destination): - """ - Raise a NotImplementedError exception. - """ - raise NotImplementedError("SnakebiteClient in luigi doesn't implement copy") - - def put(self, local_path, destination): - """ - Raise a NotImplementedError exception. - """ - raise NotImplementedError("Snakebite doesn't implement put") - - def get(self, path, local_destination): - """ - Use snakebite.copyToLocal, if available. - - :param path: HDFS file - :type path: string - :param local_destination: path on the system running Luigi - :type local_destination: string - """ - return list(self.get_bite().copyToLocal(self.list_path(path), - local_destination)) - - def get_merge(self, path, local_destination): - """ - Using snakebite getmerge to implement this. - :param path: HDFS directory - :param local_destination: path on the system running Luigi - :return: merge of the directory - """ - return list(self.get_bite().getmerge(path=path, dst=local_destination)) - - def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False): - """ - Use snakebite.mkdir, if available. - - Snakebite's mkdir method allows control over full path creation, so by - default, tell it to build a full path to work like ``hadoop fs -mkdir``. - - :param path: HDFS path to create - :type path: string - :param parents: create any missing parent directories - :type parents: boolean, default is True - :param mode: \\*nix style owner/group/other permissions - :type mode: octal, default 0755 - """ - result = list(self.get_bite().mkdir(self.list_path(path), - create_parent=parents, mode=mode)) - if raise_if_exists and "ile exists" in result[0].get('error', ''): - raise luigi.target.FileAlreadyExists("%s exists" % (path, )) - return result - - def listdir(self, path, ignore_directories=False, ignore_files=False, - include_size=False, include_type=False, include_time=False, - recursive=False): - """ - Use snakebite.ls to get the list of items in a directory. - - :param path: the directory to list - :type path: string - :param ignore_directories: if True, do not yield directory entries - :type ignore_directories: boolean, default is False - :param ignore_files: if True, do not yield file entries - :type ignore_files: boolean, default is False - :param include_size: include the size in bytes of the current item - :type include_size: boolean, default is False (do not include) - :param include_type: include the type (d or f) of the current item - :type include_type: boolean, default is False (do not include) - :param include_time: include the last modification time of the current item - :type include_time: boolean, default is False (do not include) - :param recursive: list subdirectory contents - :type recursive: boolean, default is False (do not recurse) - :return: yield with a string, or if any of the include_* settings are - true, a tuple starting with the path, and include_* items in order - """ - bite = self.get_bite() - for entry in bite.ls(self.list_path(path), recurse=recursive): - if ignore_directories and entry['file_type'] == 'd': - continue - if ignore_files and entry['file_type'] == 'f': - continue - rval = [entry['path'], ] - if include_size: - rval.append(entry['length']) - if include_type: - rval.append(entry['file_type']) - if include_time: - rval.append(datetime.datetime.fromtimestamp(entry['modification_time'] / 1000)) - if len(rval) > 1: - yield tuple(rval) - else: - yield rval[0] - - def touchz(self, path): - """ - Raise a NotImplementedError exception. - """ - raise NotImplementedError("SnakebiteClient in luigi doesn't implement touchz") diff --git a/luigi/contrib/hdfs/webhdfs_client.py b/luigi/contrib/hdfs/webhdfs_client.py index 546049fae9..38c70ecc2b 100644 --- a/luigi/contrib/hdfs/webhdfs_client.py +++ b/luigi/contrib/hdfs/webhdfs_client.py @@ -19,9 +19,6 @@ A luigi file system client that wraps around the hdfs-library (a webhdfs client) -This is a sensible fast alternative to snakebite. In particular for python3 -users, where snakebite is not supported at the time of writing (dec 2015). - Note. This wrapper client is not feature complete yet. As with most software the authors only implement the features they need. If you need to wrap more of the file system operations, please do and contribute back. diff --git a/test/contrib/hadoop_test.py b/test/contrib/hadoop_test.py index b4229c462e..177cbe101a 100644 --- a/test/contrib/hadoop_test.py +++ b/test/contrib/hadoop_test.py @@ -26,7 +26,6 @@ import luigi.contrib.hdfs import luigi.contrib.mrrunner import luigi.notifications -import minicluster import mock from luigi.mock import MockTarget from luigi.six import StringIO @@ -34,8 +33,6 @@ luigi.notifications.DEBUG = True -luigi.contrib.hadoop.attach(minicluster) - class OutputMixin(luigi.Task): use_hdfs = luigi.BoolParameter(default=False) @@ -50,10 +47,7 @@ def get_output(self, fn): class HadoopJobTask(luigi.contrib.hadoop.JobTask, OutputMixin): def job_runner(self): - if self.use_hdfs: - return minicluster.MiniClusterHadoopJobRunner() - else: - return luigi.contrib.hadoop.LocalJobRunner() + return luigi.contrib.hadoop.LocalJobRunner() class Words(OutputMixin): @@ -292,28 +286,6 @@ def setUp(self): MockTarget.fs.clear() -@attr('minicluster') -class MapreduceIntegrationTest(minicluster.MiniClusterTestCase): - - """ Uses the Minicluster functionality to test this against Hadoop """ - use_hdfs = True - - def test_run(self): - CommonTests.test_run(self) - - def test_run_2(self): - CommonTests.test_run_2(self) - - def test_map_only(self): - CommonTests.test_map_only(self) - - # TODO(erikbern): some really annoying issue with minicluster causes - # test_unicode_job to hang - - def test_failing_job(self): - CommonTests.test_failing_job(self) - - @attr('apache') class CreatePackagesArchive(unittest.TestCase): diff --git a/test/contrib/hdfs/webhdfs_client_test.py b/test/contrib/hdfs/webhdfs_client_test.py index 48a6cf4b4d..8aa89db37f 100644 --- a/test/contrib/hdfs/webhdfs_client_test.py +++ b/test/contrib/hdfs/webhdfs_client_test.py @@ -21,34 +21,9 @@ from nose.plugins.attrib import attr from helpers import with_config -from webhdfs_minicluster import WebHdfsMiniClusterTestCase -from contrib.hdfs_test import HdfsTargetTestMixin from luigi.contrib.hdfs import WebHdfsClient -@attr('minicluster') -class WebHdfsTargetTest(WebHdfsMiniClusterTestCase, HdfsTargetTestMixin): - - def run(self, result=None): - conf = {'hdfs': {'client': 'webhdfs'}, - 'webhdfs': {'port': str(self.cluster.webhdfs_port)}, - } - with_config(conf)(super(WebHdfsTargetTest, self).run)(result) - - def test_actually_using_webhdfs(self): - self.assertTrue(isinstance(self.create_target().fs, WebHdfsClient)) - - # Here is a bunch of tests that are currently failing. As should be - # mentioned in the WebHdfsClient docs, it is not yet feature complete. - test_slow_exists = None - test_glob_exists = None - test_with_close = None - test_with_exception = None - - # This one fails when run together with the whole test suite - test_write_cleanup_no_close = None - - @attr('apache') class TestWebHdfsClient(unittest.TestCase): diff --git a/test/contrib/hdfs_test.py b/test/contrib/hdfs_test.py index 9991d3c852..420bbd06d2 100644 --- a/test/contrib/hdfs_test.py +++ b/test/contrib/hdfs_test.py @@ -15,25 +15,14 @@ # limitations under the License. # -import functools import re -from helpers import unittest import random -import threading import pickle -import helpers import luigi -import mock import luigi.format from luigi.contrib import hdfs from luigi import six -from luigi.contrib.hdfs import SnakebiteHdfsClient -from luigi.contrib.hdfs.hadoopcli_clients import HdfsClient -from luigi.contrib.hdfs.format import HdfsAtomicWriteError, HdfsReadPipe -from luigi.contrib.target import CascadingClient -from minicluster import MiniClusterTestCase -from nose.plugins.attrib import attr import luigi.contrib.hdfs.clients from target_test import FileSystemTargetTestMixin @@ -57,327 +46,6 @@ class TestException(Exception): pass -@attr('minicluster') -class ConfigurationTest(MiniClusterTestCase): - - def tezt_rename_dont_move(self, client): - """ I happen to just want to test this, Since I know the codepaths will - be quite different for the three kinds of clients """ - if client.exists('d'): - client.remove('d') - client.mkdir('d/a') - client.mkdir('d/b') - self.assertEqual(2, len(list(client.listdir('d')))) - target = hdfs.HdfsTarget('d/a', fs=client) - self.assertRaises(luigi.target.FileSystemException, lambda: target.move_dir('d/b')) - self.assertEqual(2, len(list(client.listdir('d')))) - target.move_dir('d/c') - self.assertEqual(2, len(list(client.listdir('d')))) - - @helpers.with_config({"hdfs": {}}, replace_sections=True) - def test_when_not_specified(self): - self.assertEqual('hadoopcli', hdfs.config.get_configured_hdfs_client()) - - @helpers.with_config({"hdfs": {"client": "hadoopcli"}}) - def test_hadoopcli(self): - client = hdfs.get_autoconfig_client(threading.local()) - self.assertTrue(isinstance(client, HdfsClient)) - self.tezt_rename_dont_move(client) - - @unittest.skipIf(six.PY3, "snakebite doesn't work on Python 3 yet.") - @helpers.with_config({"hdfs": {"client": "snakebite"}}) - def test_snakebite(self): - client = hdfs.get_autoconfig_client(threading.local()) - self.assertTrue(isinstance(client, SnakebiteHdfsClient)) - self.tezt_rename_dont_move(client) - - @unittest.skipIf(six.PY3, "snakebite doesn't work on Python 3 yet.") - @helpers.with_config({"hdfs": {"client": "snakebite_with_hadoopcli_fallback"}}) - def test_snakebite_with_hadoopcli_fallback(self): - client = hdfs.get_autoconfig_client(threading.local()) - self.assertTrue(isinstance(client, CascadingClient)) - self.tezt_rename_dont_move(client) - - -@attr('minicluster') -class ErrorHandling(MiniClusterTestCase): - - def test_connection_refused(self): - """ The point of this test is to see if file existence checks - can distinguish file non-existence from errors - - this test would fail if hdfs would run locally on port 0 - """ - self.assertRaises( - hdfs.HDFSCliError, - self.fs.exists, - 'hdfs://127.0.0.1:0/foo' - ) - - def test_mkdir_exists(self): - path = self._test_dir() - if not self.fs.exists(path): - self.fs.mkdir(path) - self.assertTrue(self.fs.exists(path)) - self.assertRaises( - luigi.target.FileAlreadyExists, - functools.partial(self.fs.mkdir, parents=False, raise_if_exists=True), - path - ) - self.fs.remove(path, skip_trash=True) - - -@attr('minicluster') -class AtomicHdfsOutputPipeTests(MiniClusterTestCase): - - def test_atomicity(self): - testpath = self._test_dir() - if self.fs.exists(testpath): - self.fs.remove(testpath, skip_trash=True) - - pipe = hdfs.HdfsAtomicWritePipe(testpath) - self.assertFalse(self.fs.exists(testpath)) - pipe.close() - self.assertTrue(self.fs.exists(testpath)) - - def test_with_close(self): - testpath = self._test_file() - try: - if self.fs.exists(testpath): - self.fs.remove(testpath, skip_trash=True) - except BaseException: - if self.fs.exists(self._test_dir()): - self.fs.remove(self._test_dir(), skip_trash=True) - - with hdfs.HdfsAtomicWritePipe(testpath) as fobj: - fobj.write(b'hej') - - self.assertTrue(self.fs.exists(testpath)) - - def test_with_noclose(self): - testpath = self._test_file() - try: - if self.fs.exists(testpath): - self.fs.remove(testpath, skip_trash=True) - except BaseException: - if self.fs.exists(self._test_dir()): - self.fs.remove(self._test_dir(), skip_trash=True) - - def foo(): - with hdfs.HdfsAtomicWritePipe(testpath) as fobj: - fobj.write(b'hej') - raise TestException('Test triggered exception') - self.assertRaises(TestException, foo) - self.assertFalse(self.fs.exists(testpath)) - - def test_target_path_exists(self): - testpath = self._test_file() - try: - if self.fs.exists(testpath): - self.fs.remove(testpath, skip_trash=True) - except BaseException: - if self.fs.exists(self._test_dir()): - self.fs.remove(self._test_dir(), skip_trash=True) - - with hdfs.HdfsAtomicWritePipe(testpath) as fobj: - fobj.write(b'test1') - with hdfs.HdfsAtomicWritePipe(testpath) as fobj: - fobj.write(b'test2') - - with HdfsReadPipe(testpath) as read_pipe: - contents = read_pipe.read() - - self.assertEqual(b'test2', contents) - - @mock.patch('luigi.contrib.hdfs.format.remove') - def test_target_path_exists_rename_fails_hadoopcli(self, remove): - testpath = self._test_file() - try: - if self.fs.exists(testpath): - self.fs.remove(testpath, skip_trash=True) - except BaseException: - if self.fs.exists(self._test_dir()): - self.fs.remove(self._test_dir(), skip_trash=True) - - with hdfs.HdfsAtomicWritePipe(testpath) as fobj: - fobj.write(b'test1') - fobj = hdfs.HdfsAtomicWritePipe(testpath) - self.assertRaises(hdfs.HDFSCliError, fobj.close) - - @unittest.skipIf(six.PY3, "snakebite doesn't work on Python 3 yet.") - @helpers.with_config({"hdfs": {"client": "snakebite"}}) - @mock.patch('luigi.contrib.hdfs.format.rename') - @mock.patch('luigi.contrib.hdfs.format.remove') - def test_target_path_exists_rename_fails_snakebite(self, remove, rename): - rename.side_effect = hdfs.get_autoconfig_client(threading.local()).rename - testpath = self._test_file() - try: - if self.fs.exists(testpath): - self.fs.remove(testpath, skip_trash=True) - except BaseException: - if self.fs.exists(self._test_dir()): - self.fs.remove(self._test_dir(), skip_trash=True) - - with hdfs.HdfsAtomicWritePipe(testpath) as fobj: - fobj.write(b'test1') - fobj = hdfs.HdfsAtomicWritePipe(testpath) - self.assertRaises(HdfsAtomicWriteError, fobj.close) - - -@attr('minicluster') -class HdfsAtomicWriteDirPipeTests(MiniClusterTestCase): - - def setUp(self): - super(HdfsAtomicWriteDirPipeTests, self).setUp() - self.path = self._test_file() - if self.fs.exists(self.path): - self.fs.remove(self.path, skip_trash=True) - - def test_atomicity(self): - pipe = hdfs.HdfsAtomicWriteDirPipe(self.path) - self.assertFalse(self.fs.exists(self.path)) - pipe.close() - self.assertTrue(self.fs.exists(self.path)) - - def test_readback(self): - pipe = hdfs.HdfsAtomicWriteDirPipe(self.path) - self.assertFalse(self.fs.exists(self.path)) - pipe.write(b"foo\nbar") - pipe.close() - self.assertTrue(hdfs.exists(self.path)) - dirlist = hdfs.listdir(self.path) - datapath = '%s/data' % self.path - returnlist = [d for d in dirlist] - self.assertTrue(returnlist[0].endswith(datapath)) - pipe = hdfs.HdfsReadPipe(datapath) - self.assertEqual(pipe.read(), b"foo\nbar") - - def test_with_close(self): - with hdfs.HdfsAtomicWritePipe(self.path) as fobj: - fobj.write(b'hej') - - self.assertTrue(self.fs.exists(self.path)) - - def test_with_noclose(self): - def foo(): - with hdfs.HdfsAtomicWritePipe(self.path) as fobj: - fobj.write(b'hej') - raise TestException('Test triggered exception') - self.assertRaises(TestException, foo) - self.assertFalse(self.fs.exists(self.path)) - - def test_target_path_exists(self): - with hdfs.HdfsAtomicWriteDirPipe(self.path) as fobj: - fobj.write(b'test1') - with hdfs.HdfsAtomicWritePipe(self.path) as fobj: - fobj.write(b'test2') - - with HdfsReadPipe(self.path) as read_pipe: - contents = read_pipe.read() - - self.assertEqual(b'test2', contents) - - @mock.patch('luigi.contrib.hdfs.format.remove') - def test_rename_into_existing_subdir_after_failed_remove(self, remove): - with hdfs.HdfsAtomicWriteDirPipe(self.path) as fobj: - fobj.write(b'test1') - fobj = hdfs.HdfsAtomicWriteDirPipe(self.path) - self.assertRaises(HdfsAtomicWriteError, fobj.close) - - @mock.patch('luigi.contrib.hdfs.format.remove') - def test_target_path_exists_rename_fails_hadoopcli(self, remove): - with hdfs.HdfsAtomicWritePipe(self.path) as fobj: - fobj.write(b'test1') - fobj = hdfs.HdfsAtomicWriteDirPipe(self.path) - self.assertRaises(hdfs.HDFSCliError, fobj.close) - - @unittest.skipIf(six.PY3, "snakebite doesn't work on Python 3 yet.") - @helpers.with_config({"hdfs": {"client": "snakebite"}}) - @mock.patch('luigi.contrib.hdfs.format.rename') - @mock.patch('luigi.contrib.hdfs.format.remove') - def test_target_path_exists_rename_fails_snakebite(self, remove, rename): - rename.side_effect = hdfs.get_autoconfig_client(threading.local()).rename - with hdfs.HdfsAtomicWritePipe(self.path) as fobj: - fobj.write(b'test1') - fobj = hdfs.HdfsAtomicWriteDirPipe(self.path) - self.assertRaises(HdfsAtomicWriteError, fobj.close) - - -# This class is a mixin, and does not inherit from TestCase, in order to avoid running the base class as a test case. -@attr('minicluster') -class _HdfsFormatTest(object): - format = None # override with luigi.format.Format subclass - - def setUp(self): - super(_HdfsFormatTest, self).setUp() - self.target = hdfs.HdfsTarget(self._test_file(), format=self.format) - if self.target.exists(): - self.target.remove(skip_trash=True) - - def test_with_write_success(self): - with self.target.open('w') as fobj: - fobj.write(b'foo') - self.assertTrue(self.target.exists()) - - def test_with_write_failure(self): - def dummy(): - with self.target.open('w') as fobj: - fobj.write(b'foo') - raise TestException() - - self.assertRaises(TestException, dummy) - self.assertFalse(self.target.exists()) - - -@attr('minicluster') -class PlainFormatTest(_HdfsFormatTest, MiniClusterTestCase): - format = hdfs.Plain - - -@attr('minicluster') -class PlainDirFormatTest(_HdfsFormatTest, MiniClusterTestCase): - format = hdfs.PlainDir - - def test_multifile(self): - with self.target.open('w') as fobj: - fobj.write(b'foo\n') - second = hdfs.HdfsTarget(self.target.path + '/data2', format=hdfs.Plain) - - with second.open('w') as fobj: - fobj.write(b'bar\n') - invisible = hdfs.HdfsTarget(self.target.path + '/_SUCCESS', format=hdfs.Plain) - with invisible.open('w') as fobj: - fobj.write(b'b0rk\n') - self.assertTrue(second.exists()) - self.assertTrue(invisible.exists()) - self.assertTrue(self.target.exists()) - with self.target.open('r') as fobj: - parts = sorted(fobj.read().strip(b'\n').split(b'\n')) - self.assertEqual(tuple(parts), (b'bar', b'foo')) - - -@attr('minicluster') -class ComplexOldFormatTest(MiniClusterTestCase): - format = ComplexOldFormat() - - def setUp(self): - super(ComplexOldFormatTest, self).setUp() - self.target = hdfs.HdfsTarget(self._test_file(), format=self.format) - if self.target.exists(): - self.target.remove(skip_trash=True) - - def test_with_write_success(self): - with self.target.open('w') as fobj: - fobj.write(u'foo') - self.assertTrue(self.target.exists()) - - with self.target.open('r') as fobj: - a = fobj.read() - - self.assertFalse(isinstance(a, six.text_type)) - self.assertEqual(a, b'foo') - - class HdfsTargetTestMixin(FileSystemTargetTestMixin): def create_target(self, format=None): @@ -399,10 +67,12 @@ def test_slow_exists(self): def should_raise(): self.fs.exists("hdfs://doesnotexist/foo") + self.assertRaises(hdfs.HDFSCliError, should_raise) def should_raise_2(): self.fs.exists("hdfs://_doesnotexist_/foo") + self.assertRaises(hdfs.HDFSCliError, should_raise_2) def test_create_ancestors(self): @@ -572,302 +242,6 @@ def test_flag_target_fails_if_not_directory(self): hdfs.HdfsFlagTarget("/home/file.txt") -@attr('minicluster') -class HdfsTargetTest(MiniClusterTestCase, HdfsTargetTestMixin): - pass - - -@attr('minicluster') -class HdfsClientTest(MiniClusterTestCase): - - def create_file(self, target): - fobj = target.open("w") - fobj.close() - - def put_file(self, local_target, local_filename, target_path, delpath=True): - if local_target.exists(): - local_target.remove() - self.create_file(local_target) - - if delpath: - target = hdfs.HdfsTarget(target_path) - if target.exists(): - target.remove(skip_trash=True) - self.fs.mkdir(target.path) - - self.fs.put(local_target.path, target_path) - target_file_path = target_path + "/" + local_filename - return hdfs.HdfsTarget(target_file_path) - - def test_put(self): - local_dir = "test/data" - local_filename = "file1.dat" - local_path = "%s/%s" % (local_dir, local_filename) - target_path = self._test_dir() - - local_target = luigi.LocalTarget(local_path) - target = self.put_file(local_target, local_filename, target_path) - self.assertTrue(target.exists()) - local_target.remove() - - def test_get(self): - local_dir = "test/data" - local_filename = "file1.dat" - local_path = "%s/%s" % (local_dir, local_filename) - target_path = self._test_dir() - - local_target = luigi.LocalTarget(local_path) - target = self.put_file(local_target, local_filename, target_path) - self.assertTrue(target.exists()) - local_target.remove() - - local_copy_path = "%s/file1.dat.cp" % local_dir - local_copy = luigi.LocalTarget(local_copy_path) - if local_copy.exists(): - local_copy.remove() - self.fs.get(target.path, local_copy_path) - self.assertTrue(local_copy.exists()) - local_copy.remove() - - def test_getmerge(self): - local_dir = "test/data" - local_filename1 = "file1.dat" - local_path1 = "%s/%s" % (local_dir, local_filename1) - local_filename2 = "file2.dat" - local_path2 = "%s/%s" % (local_dir, local_filename2) - target_dir = self._test_dir() - - local_target1 = luigi.LocalTarget(local_path1) - target1 = self.put_file(local_target1, local_filename1, target_dir) - self.assertTrue(target1.exists()) - local_target1.remove() - - local_target2 = luigi.LocalTarget(local_path2) - target2 = self.put_file(local_target2, local_filename2, target_dir) - self.assertTrue(target2.exists()) - local_target2.remove() - - local_copy_path = "%s/file.dat.cp" % (local_dir) - local_copy = luigi.LocalTarget(local_copy_path) - if local_copy.exists(): - local_copy.remove() - self.fs.getmerge(target_dir, local_copy_path) - self.assertTrue(local_copy.exists()) - local_copy.remove() - - local_copy_crc_path = "%s/.file.dat.cp.crc" % (local_dir) - local_copy_crc = luigi.LocalTarget(local_copy_crc_path) - self.assertTrue(local_copy_crc.exists()) - local_copy_crc.remove() - - def _setup_listdir(self): - """Create the test directory, and things in it.""" - target_dir = self._test_dir() - local_dir = "test/data" - - local_filename1 = "file1.dat" - local_path1 = "%s/%s" % (local_dir, local_filename1) - local_target1 = luigi.LocalTarget(local_path1) - target1 = self.put_file(local_target1, local_filename1, target_dir) - self.assertTrue(target1.exists()) - - local_filename2 = "file2.dat" - local_path2 = "%s/%s" % (local_dir, local_filename2) - local_target2 = luigi.LocalTarget(local_path2) - target2 = self.put_file(local_target2, local_filename2, - target_dir, delpath=False) - self.assertTrue(target2.exists()) - - local_filename3 = "file3.dat" - local_path3 = "%s/%s" % (local_dir, local_filename3) - local_target3 = luigi.LocalTarget(local_path3) - target3 = self.put_file(local_target3, local_filename3, - target_dir + '/sub1') - self.assertTrue(target3.exists()) - - local_filename4 = "file4.dat" - local_path4 = "%s/%s" % (local_dir, local_filename4) - local_target4 = luigi.LocalTarget(local_path4) - target4 = self.put_file(local_target4, local_filename4, - target_dir + '/sub2') - self.assertTrue(target4.exists()) - - return target_dir - - def test_listdir_base_list(self): - """Verify we get the base four items created by _setup_listdir()""" - path = self._setup_listdir() - dirlist = self.fs.listdir(path, ignore_directories=False, - ignore_files=False, include_size=False, - include_type=False, include_time=False, - recursive=False) - entries = [dd for dd in dirlist] - self.assertEqual(4, len(entries), msg="%r" % entries) - self.assertEqual(path + '/file1.dat', entries[0], msg="%r" % entries) - self.assertEqual(path + '/file2.dat', entries[1], msg="%r" % entries) - self.assertEqual(path + '/sub1', entries[2], msg="%r" % entries) - self.assertEqual(path + '/sub2', entries[3], msg="%r" % entries) - - def test_listdir_base_list_files_only(self): - """Verify we get the base two files created by _setup_listdir()""" - path = self._setup_listdir() - dirlist = self.fs.listdir(path, ignore_directories=True, - ignore_files=False, include_size=False, - include_type=False, include_time=False, - recursive=False) - entries = [dd for dd in dirlist] - self.assertEqual(2, len(entries), msg="%r" % entries) - self.assertEqual(path + '/file1.dat', entries[0], msg="%r" % entries) - self.assertEqual(path + '/file2.dat', entries[1], msg="%r" % entries) - - def test_listdir_base_list_dirs_only(self): - """Verify we get the base two directories created by _setup_listdir()""" - path = self._setup_listdir() - dirlist = self.fs.listdir(path, ignore_directories=False, - ignore_files=True, include_size=False, - include_type=False, include_time=False, - recursive=False) - entries = [dd for dd in dirlist] - self.assertEqual(2, len(entries), msg="%r" % entries) - self.assertEqual(path + '/sub1', entries[0], msg="%r" % entries) - self.assertEqual(path + '/sub2', entries[1], msg="%r" % entries) - - def test_listdir_base_list_recusion(self): - """Verify we get the every item created by _setup_listdir()""" - path = self._setup_listdir() - dirlist = self.fs.listdir(path, ignore_directories=False, - ignore_files=False, include_size=False, - include_type=False, include_time=False, - recursive=True) - entries = [dd for dd in dirlist] - self.assertEqual(6, len(entries), msg="%r" % entries) - self.assertEqual(path + '/file1.dat', entries[0], msg="%r" % entries) - self.assertEqual(path + '/file2.dat', entries[1], msg="%r" % entries) - self.assertEqual(path + '/sub1', entries[2], msg="%r" % entries) - self.assertEqual(path + '/sub1/file3.dat', entries[3], msg="%r" % entries) - self.assertEqual(path + '/sub2', entries[4], msg="%r" % entries) - self.assertEqual(path + '/sub2/file4.dat', entries[5], msg="%r" % entries) - - def test_listdir_base_list_get_sizes(self): - """Verify we get sizes for the two base files.""" - path = self._setup_listdir() - dirlist = self.fs.listdir(path, ignore_directories=False, - ignore_files=False, include_size=True, - include_type=False, include_time=False, - recursive=False) - entries = [dd for dd in dirlist] - self.assertEqual(4, len(entries), msg="%r" % entries) - self.assertEqual(2, len(entries[0]), msg="%r" % entries) - self.assertEqual(path + '/file1.dat', entries[0][0], msg="%r" % entries) - self.assertEqual(0, entries[0][1], msg="%r" % entries) - self.assertEqual(2, len(entries[1]), msg="%r" % entries) - self.assertEqual(path + '/file2.dat', entries[1][0], msg="%r" % entries) - self.assertEqual(0, entries[1][1], msg="%r" % entries) - - def test_listdir_base_list_get_types(self): - """Verify we get the types for the four base items.""" - path = self._setup_listdir() - dirlist = self.fs.listdir(path, ignore_directories=False, - ignore_files=False, include_size=False, - include_type=True, include_time=False, - recursive=False) - entries = [dd for dd in dirlist] - self.assertEqual(4, len(entries), msg="%r" % entries) - self.assertEqual(2, len(entries[0]), msg="%r" % entries) - self.assertEqual(path + '/file1.dat', entries[0][0], msg="%r" % entries) - self.assertTrue(re.match(r'[-f]', entries[0][1]), msg="%r" % entries) - self.assertEqual(2, len(entries[1]), msg="%r" % entries) - self.assertEqual(path + '/file2.dat', entries[1][0], msg="%r" % entries) - self.assertTrue(re.match(r'[-f]', entries[1][1]), msg="%r" % entries) - self.assertEqual(2, len(entries[2]), msg="%r" % entries) - self.assertEqual(path + '/sub1', entries[2][0], msg="%r" % entries) - self.assertEqual('d', entries[2][1], msg="%r" % entries) - self.assertEqual(2, len(entries[3]), msg="%r" % entries) - self.assertEqual(path + '/sub2', entries[3][0], msg="%r" % entries) - self.assertEqual('d', entries[3][1], msg="%r" % entries) - - def test_listdir_base_list_get_times(self): - """Verify we get the times, even if we can't fully check them.""" - path = self._setup_listdir() - dirlist = self.fs.listdir(path, ignore_directories=False, - ignore_files=False, include_size=False, - include_type=False, include_time=True, - recursive=False) - entries = [dd for dd in dirlist] - self.assertEqual(4, len(entries), msg="%r" % entries) - self.assertEqual(2, len(entries[0]), msg="%r" % entries) - self.assertEqual(path + '/file1.dat', entries[0][0], msg="%r" % entries) - - def test_listdir_full_list_get_everything(self): - """Verify we get all the values, even if we can't fully check them.""" - path = self._setup_listdir() - dirlist = self.fs.listdir(path, ignore_directories=False, - ignore_files=False, include_size=True, - include_type=True, include_time=True, - recursive=True) - entries = [dd for dd in dirlist] - self.assertEqual(6, len(entries), msg="%r" % entries) - self.assertEqual(4, len(entries[0]), msg="%r" % entries) - self.assertEqual(path + '/file1.dat', entries[0][0], msg="%r" % entries) - self.assertEqual(0, entries[0][1], msg="%r" % entries) - self.assertTrue(re.match(r'[-f]', entries[0][2]), msg="%r" % entries) - self.assertEqual(4, len(entries[1]), msg="%r" % entries) - self.assertEqual(path + '/file2.dat', entries[1][0], msg="%r" % entries) - self.assertEqual(4, len(entries[2]), msg="%r" % entries) - self.assertEqual(path + '/sub1', entries[2][0], msg="%r" % entries) - self.assertEqual(4, len(entries[3]), msg="%r" % entries) - self.assertEqual(path + '/sub1/file3.dat', entries[3][0], msg="%r" % entries) - self.assertEqual(4, len(entries[4]), msg="%r" % entries) - self.assertEqual(path + '/sub2', entries[4][0], msg="%r" % entries) - self.assertEqual(4, len(entries[5]), msg="%r" % entries) - self.assertEqual(path + '/sub2/file4.dat', entries[5][0], msg="%r" % entries) - - @mock.patch('luigi.contrib.hdfs.hadoopcli_clients.HdfsClient.call_check') - def test_cdh3_client(self, call_check): - cdh3_client = luigi.contrib.hdfs.HdfsClientCdh3() - cdh3_client.remove("/some/path/here") - self.assertEqual(['fs', '-rmr', '/some/path/here'], call_check.call_args[0][0][-3:]) - - cdh3_client.remove("/some/path/here", recursive=False) - self.assertEqual(['fs', '-rm', '/some/path/here'], call_check.call_args[0][0][-3:]) - - @mock.patch('subprocess.Popen') - def test_apache1_client(self, popen): - comm = mock.Mock(name='communicate_mock') - comm.return_value = "some return stuff", "" - - preturn = mock.Mock(name='open_mock') - preturn.returncode = 0 - preturn.communicate = comm - popen.return_value = preturn - - apache_client = luigi.contrib.hdfs.HdfsClientApache1() - returned = apache_client.exists("/some/path/somewhere") - self.assertTrue(returned) - - preturn.returncode = 1 - returned = apache_client.exists("/some/path/somewhere") - self.assertFalse(returned) - - preturn.returncode = 13 - self.assertRaises(luigi.contrib.hdfs.HDFSCliError, apache_client.exists, "/some/path/somewhere") - - -@attr('apache') -class SnakebiteConfigTest(unittest.TestCase): - @helpers.with_config({"hdfs": {"snakebite_autoconfig": "true"}}) - def testBoolOverride(self): - # See #743 - self.assertEqual(hdfs.config.hdfs().snakebite_autoconfig, True) - - class DummyTestTask(luigi.Task): - pass - - luigi.run(['--local-scheduler', '--no-lock', 'DummyTestTask']) - - self.assertEqual(hdfs.config.hdfs().snakebite_autoconfig, True) - - class _MiscOperationsMixin(object): # TODO: chown/chmod/count should really be methods on HdfsTarget rather than the client! @@ -892,18 +266,3 @@ def test_chmod(self): def test_chown(self): t = self.get_target() self.get_client().chown(t.path, 'root', 'root') - - -@attr('minicluster') -class TestCliMisc(MiniClusterTestCase, _MiscOperationsMixin): - def get_client(self): - return luigi.contrib.hdfs.create_hadoopcli_client() - - -@attr('minicluster') -class TestSnakebiteMisc(MiniClusterTestCase, _MiscOperationsMixin): - def get_client(self): - if six.PY3: - raise unittest.SkipTest("snakebite doesn't work on Python yet.") - - return luigi.contrib.hdfs.SnakebiteHdfsClient() diff --git a/test/minicluster.py b/test/minicluster.py deleted file mode 100644 index dae74a94d3..0000000000 --- a/test/minicluster.py +++ /dev/null @@ -1,96 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2012-2015 Spotify AB -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import getpass -import os - -import luigi.contrib.hadoop -import luigi.contrib.hdfs -from nose.plugins.attrib import attr - -import unittest - -try: - from snakebite.minicluster import MiniCluster -except ImportError: - raise unittest.SkipTest('To use minicluster, snakebite must be installed.') - - -@attr('minicluster') -class MiniClusterTestCase(unittest.TestCase): - - """ Base class for test cases that rely on Hadoop's minicluster functionality. This - in turn depends on Snakebite's minicluster setup: - - http://hadoop.apache.org/docs/r2.5.1/hadoop-project-dist/hadoop-common/CLIMiniCluster.html - https://github.com/spotify/snakebite""" - cluster = None - - @classmethod - def instantiate_cluster(cls): - return MiniCluster(None, nnport=50030) - - @classmethod - def setupClass(cls): - if not cls.cluster: - cls.cluster = cls.instantiate_cluster() - cls.cluster.mkdir("/tmp") - - @classmethod - def tearDownClass(cls): - if cls.cluster: - cls.cluster.terminate() - - def setUp(self): - self.fs = luigi.contrib.hdfs.get_autoconfig_client() - cfg_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testconfig") - hadoop_bin = os.path.join(os.environ['HADOOP_HOME'], 'bin/hadoop') - cmd = "{} --config {}".format(hadoop_bin, cfg_path) - self.stashed_hdfs_client = luigi.configuration.get_config().get('hadoop', 'command', None) - luigi.configuration.get_config().set('hadoop', 'command', cmd) - - def tearDown(self): - if self.fs.exists(self._test_dir()): - self.fs.remove(self._test_dir(), skip_trash=True) - if self.stashed_hdfs_client: - luigi.configuration.get_config().set('hadoop', 'command', self.stashed_hdfs_client) - - @staticmethod - def _test_dir(): - return '/tmp/luigi_tmp_testdir_%s' % getpass.getuser() - - @staticmethod - def _test_file(suffix=""): - return '%s/luigi_tmp_testfile%s' % (MiniClusterTestCase._test_dir(), suffix) - - -class MiniClusterHadoopJobRunner(luigi.contrib.hadoop.HadoopJobRunner): - - ''' The default job runner just reads from config and sets stuff ''' - - def __init__(self): - # Locate the hadoop streaming jar in the hadoop directory - hadoop_tools_lib = os.path.join(os.environ['HADOOP_HOME'], 'share/hadoop/tools/lib') - - for path in os.listdir(hadoop_tools_lib): - if path.startswith('hadoop-streaming') and path.endswith('.jar'): - streaming_jar = os.path.join(hadoop_tools_lib, path) - break - else: - raise Exception('Could not locate streaming jar in ' + hadoop_tools_lib) - - super(MiniClusterHadoopJobRunner, self).__init__(streaming_jar=streaming_jar) diff --git a/test/snakebite_test.py b/test/snakebite_test.py deleted file mode 100644 index cdfea47583..0000000000 --- a/test/snakebite_test.py +++ /dev/null @@ -1,131 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2012-2015 Spotify AB -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import datetime -import getpass -import os -import posixpath -import time -import unittest - -import luigi.target -from luigi import six -from nose.plugins.attrib import attr - -if six.PY3: - raise unittest.SkipTest("snakebite doesn't work on Python 3 yet.") - -try: - from luigi.contrib.hdfs import SnakebiteHdfsClient - from minicluster import MiniClusterTestCase -except ImportError: - raise unittest.SkipTest('Snakebite not installed') - - -@attr('minicluster') -class TestSnakebiteClient(MiniClusterTestCase): - - """This test requires a snakebite -- it finds it from your - luigi.cfg""" - snakebite = None - - def get_client(self): - return SnakebiteHdfsClient() - - """ - This hdfs client is used for writing file to hdfs and - then getting merge of it using snakebite - """ - def get_hdfs_client(self): - return luigi.contrib.hdfs.create_hadoopcli_client() - - def setUp(self): - """ We override setUp because we want to also use snakebite for - creating the testing directory. """ - self.testDir = "/tmp/luigi-test-{0}-{1}".format( - os.environ["USER"], - time.mktime(datetime.datetime.now().timetuple()) - ) - self.snakebite = self.get_client() - self.assertTrue(self.snakebite.mkdir(self.testDir)) - - def tearDown(self): - if self.snakebite.exists(self.testDir): - self.snakebite.remove(self.testDir, True) - - def test_get_merge(self): - hdfs_client = self.get_hdfs_client() - local_filename = "file2.dat" - - target_dir = '/tmp/luigi_tmp_testdir_%s' % getpass.getuser() - local_target1 = luigi.LocalTarget(local_filename) - target1 = hdfs_client.put_file(local_target1, local_filename, target_dir) - self.assertTrue(target1.exists()) - - local_dir = "test/data" - local_copy_path = "%s/file.dat.cp" % (local_dir) - local_copy = luigi.LocalTarget(local_copy_path) - if local_copy.exists(): - local_copy.remove() - self.snakebite.get_merge(target_dir, local_copy_path) - self.assertTrue(local_copy.exists()) - local_copy.remove() - - def test_exists(self): - self.assertTrue(self.snakebite.exists(self.testDir)) - - def test_rename(self): - foo = posixpath.join(self.testDir, "foo") - bar = posixpath.join(self.testDir, "bar") - self.assertTrue(self.snakebite.mkdir(foo)) - self.assertTrue(self.snakebite.rename(foo, bar)) - self.assertTrue(self.snakebite.exists(bar)) - - def test_rename_trailing_slash(self): - foo = posixpath.join(self.testDir, "foo") - bar = posixpath.join(self.testDir, "bar/") - self.assertTrue(self.snakebite.mkdir(foo)) - self.assertTrue(self.snakebite.rename(foo, bar)) - self.assertTrue(self.snakebite.exists(bar)) - self.assertFalse(self.snakebite.exists(posixpath.join(bar, 'foo'))) - - def test_relativepath(self): - rel_test_dir = "." + os.path.split(self.testDir)[1] - try: - self.assertFalse(self.snakebite.exists(rel_test_dir)) - self.snakebite.mkdir(rel_test_dir) - self.assertTrue(self.snakebite.exists(rel_test_dir)) - finally: - if self.snakebite.exists(rel_test_dir): - self.snakebite.remove(rel_test_dir, True) - - def test_rename_dont_move(self): - foo = posixpath.join(self.testDir, "foo") - bar = posixpath.join(self.testDir, "bar") - self.assertTrue(self.snakebite.mkdir(foo)) - self.assertTrue(self.snakebite.mkdir(bar)) - self.assertTrue(self.snakebite.exists(foo)) # For sanity - self.assertTrue(self.snakebite.exists(bar)) # For sanity - - self.assertRaises(luigi.target.FileAlreadyExists, - lambda: self.snakebite.rename_dont_move(foo, bar)) - self.assertTrue(self.snakebite.exists(foo)) - self.assertTrue(self.snakebite.exists(bar)) - - self.snakebite.rename_dont_move(foo, foo + '2') - self.assertFalse(self.snakebite.exists(foo)) - self.assertTrue(self.snakebite.exists(foo + '2')) diff --git a/test/webhdfs_minicluster.py b/test/webhdfs_minicluster.py deleted file mode 100644 index 3f803030f9..0000000000 --- a/test/webhdfs_minicluster.py +++ /dev/null @@ -1,79 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2015 VNG Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from minicluster import MiniClusterTestCase -import unittest -import subprocess -import select -import re - -try: - from snakebite.minicluster import MiniCluster -except ImportError: - raise unittest.SkipTest('To use minicluster, snakebite must be installed.') - - -class WebHdfsMiniCluster(MiniCluster): - ''' - This is a unclean class overriding of the snakebite minicluster. - - But since it seemed pretty inflexible I had to override private methods - here. - ''' - @property - def webhdfs_port(self): - return self.port - - def _start_mini_cluster(self, nnport=None): - """ - Copied in an ugly manner from snakebite source code. - """ - if self._jobclient_jar: - hadoop_jar = self._jobclient_jar - else: - hadoop_jar = self._find_mini_cluster_jar(self._hadoop_home) - if not hadoop_jar: - raise Exception("No hadoop jobclient test jar found") - cmd = [self._hadoop_cmd, 'jar', hadoop_jar, - 'minicluster', '-nomr', '-format'] - if nnport: - cmd.extend(['-nnport', "%s" % nnport]) - if True: - # luigi webhdfs version - cmd.extend(['-Ddfs.webhdfs.enabled=true']) - self.hdfs = subprocess.Popen(cmd, bufsize=0, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, universal_newlines=True) - - def _get_namenode_port(self): - just_seen_webhdfs = False - while self.hdfs.poll() is None: - rlist, wlist, xlist = select.select([self.hdfs.stderr, self.hdfs.stdout], [], []) - for f in rlist: - line = f.readline() - print(line.rstrip()) - - m = re.match(".*Jetty bound to port (\\d+).*", line) - if just_seen_webhdfs and m: - return int(m.group(1)) - just_seen_webhdfs = re.match(".*namenode.*webhdfs.*", line) - - -class WebHdfsMiniClusterTestCase(MiniClusterTestCase): - - @classmethod - def instantiate_cluster(cls): - return WebHdfsMiniCluster(None, nnport=50030) diff --git a/tox.ini b/tox.ini index 592a10e3ac..c0c1cc7ac3 100644 --- a/tox.ini +++ b/tox.ini @@ -19,7 +19,6 @@ deps = elasticsearch<2.0.0 psutil<4.0 enum34>1.1.0 - cdh,hdp: snakebite>=2.5.2,<2.6.0 cdh,hdp: hdfs>=2.0.4,<3.0.0 postgres: psycopg2<3.0 mysql-connector-python>=8.0.12