Skip to content

Commit

Permalink
Remove libhdfs3 support
Browse files Browse the repository at this point in the history
It's no longer maintained.
  • Loading branch information
kou committed Feb 18, 2020
1 parent b54664c commit b311877
Show file tree
Hide file tree
Showing 18 changed files with 48 additions and 313 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ PANDAS=latest
DASK=latest
TURBODBC=latest
HDFS=2.9.2
LIBHDFS3=2.3
SPARK=master
DOTNET=2.1
R=3.6
Expand Down
2 changes: 0 additions & 2 deletions ci/docker/conda-python-hdfs.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ ARG python=3.6
FROM ${repo}:${arch}-conda-python-${python}

ARG jdk=8
ARG libhdfs3=2.3
ARG maven=3.5
RUN conda install -q \
libhdfs3=${libhdfs3} \
maven=${maven} \
openjdk=${jdk} \
pandas && \
Expand Down
9 changes: 0 additions & 9 deletions ci/scripts/integration_hdfs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,8 @@ function use_libhdfs_dir() {
export ARROW_LIBHDFS_DIR=$libhdfs_dir
}

# TODO: libhdfs3 backend may be broken
if [ -f $CONDA_PREFIX/lib/libhdfs3.so ]; then
mv $CONDA_PREFIX/lib/libhdfs3.so{,.disabled}
fi

# execute cpp tests
export ARROW_HDFS_TEST_LIBHDFS_REQUIRE=ON
# TODO: libhdfs3 backend may be broken
# export ARROW_HDFS_TEST_LIBHDFS3_REQUIRE=ON
pushd ${build_dir}

debug/arrow-io-hdfs-test
Expand All @@ -66,8 +59,6 @@ popd
export PYARROW_TEST_HDFS=ON

export PYARROW_HDFS_TEST_LIBHDFS_REQUIRE=ON
# TODO: libhdfs3 backend may be broken
# export PYARROW_HDFS_TEST_LIBHDFS3_REQUIRE=ON

pytest -v --pyargs pyarrow.tests.test_fs
pytest -v --pyargs pyarrow.tests.test_hdfs
Expand Down
42 changes: 3 additions & 39 deletions cpp/src/arrow/filesystem/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ class HadoopFileSystem::Impl {

Status Init() {
io::internal::LibHdfsShim* driver_shim;
if (options_.connection_config.driver == io::HdfsDriver::LIBHDFS3) {
RETURN_NOT_OK(ConnectLibHdfs3(&driver_shim));
} else {
RETURN_NOT_OK(ConnectLibHdfs(&driver_shim));
}
RETURN_NOT_OK(ConnectLibHdfs(&driver_shim));
RETURN_NOT_OK(io::HadoopFileSystem::Connect(&options_.connection_config, &client_));
return Status::OK();
}
Expand Down Expand Up @@ -266,14 +262,6 @@ void HdfsOptions::ConfigureEndPoint(const std::string& host, int port) {
connection_config.port = port;
}

/// Choose the HDFS driver recorded in connection_config.
///
/// \param use_hdfs3 when true the driver is set to HdfsDriver::LIBHDFS3,
///   otherwise it is set to HdfsDriver::LIBHDFS.
void HdfsOptions::ConfigureHdfsDriver(bool use_hdfs3) {
  connection_config.driver = use_hdfs3 ? ::arrow::io::HdfsDriver::LIBHDFS3
                                       : ::arrow::io::HdfsDriver::LIBHDFS;
}

/// Record the user name in connection_config.user.
///
/// \param user_name value copied into the connection configuration.
void HdfsOptions::ConfigureHdfsUser(const std::string& user_name) {
connection_config.user = user_name;
}
Expand All @@ -299,32 +287,8 @@ Result<HdfsOptions> HdfsOptions::FromUri(const Uri& uri) {
options_map.emplace(kv.first, kv.second);
}

auto useHdfs3 = false;
auto it = options_map.find("use_hdfs3");
if (it != options_map.end()) {
const auto& v = it->second;
if (v == "1") {
options.ConfigureHdfsDriver(true);
useHdfs3 = true;
} else if (v == "0") {
options.ConfigureHdfsDriver(false);
} else {
return Status::Invalid(
"Invalid value for option 'use_hdfs3' (allowed values are '0' and '1'): '", v,
"'");
}
}

std::string host;
if (useHdfs3) {
if (uri.scheme() == "viewfs") {
ARROW_LOG(WARNING)
<< "viewfs://namenode resolves to hdfs://namenode with hdfs3 driver.";
}
host = uri.host();
} else {
host = uri.scheme() + "://" + uri.host();
}
host = uri.scheme() + "://" + uri.host();

const auto port = uri.port();
if (port == -1) {
Expand All @@ -334,7 +298,7 @@ Result<HdfsOptions> HdfsOptions::FromUri(const Uri& uri) {
options.ConfigureEndPoint(host, port);
}

it = options_map.find("replication");
auto it = options_map.find("replication");
if (it != options_map.end()) {
const auto& v = it->second;
::arrow::internal::StringConverter<Int16Type> converter;
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/arrow/filesystem/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ struct ARROW_EXPORT HdfsOptions {
int64_t default_block_size = 0;

void ConfigureEndPoint(const std::string& host, int port);
/// Be cautious that libhdfs3 is an unmaintained project
void ConfigureHdfsDriver(bool use_hdfs3);
void ConfigureHdfsReplication(int16_t replication);
void ConfigureHdfsUser(const std::string& user_name);
void ConfigureHdfsBufferSize(int32_t buffer_size);
Expand Down
53 changes: 9 additions & 44 deletions cpp/src/arrow/filesystem/hdfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
namespace arrow {

using internal::Uri;
using io::HdfsDriver;

namespace fs {

Expand All @@ -44,41 +43,21 @@ TEST(TestHdfsOptions, FromUri) {
ASSERT_EQ(options.connection_config.host, "hdfs://localhost");
ASSERT_EQ(options.connection_config.port, 0);
ASSERT_EQ(options.connection_config.user, "");
ASSERT_EQ(options.connection_config.driver, HdfsDriver::LIBHDFS);

ASSERT_OK(uri.Parse("hdfs://otherhost:9999/?use_hdfs3=0&replication=2"));
ASSERT_OK(uri.Parse("hdfs://otherhost:9999/?replication=2"));
ASSERT_OK_AND_ASSIGN(options, HdfsOptions::FromUri(uri));
ASSERT_EQ(options.replication, 2);
ASSERT_EQ(options.connection_config.host, "hdfs://otherhost");
ASSERT_EQ(options.connection_config.port, 9999);
ASSERT_EQ(options.connection_config.user, "");
ASSERT_EQ(options.connection_config.driver, HdfsDriver::LIBHDFS);

ASSERT_OK(uri.Parse("hdfs://otherhost:9999/?use_hdfs3=1&user=stevereich"));
ASSERT_OK_AND_ASSIGN(options, HdfsOptions::FromUri(uri));
ASSERT_EQ(options.replication, 3);
ASSERT_EQ(options.connection_config.host, "otherhost");
ASSERT_EQ(options.connection_config.port, 9999);
ASSERT_EQ(options.connection_config.user, "stevereich");
ASSERT_EQ(options.connection_config.driver, HdfsDriver::LIBHDFS3);

ASSERT_OK(uri.Parse("viewfs://other-nn/mypath/myfile"));
ASSERT_OK_AND_ASSIGN(options, HdfsOptions::FromUri(uri));
ASSERT_EQ(options.connection_config.host, "viewfs://other-nn");
ASSERT_EQ(options.connection_config.port, 0);
ASSERT_EQ(options.connection_config.user, "");
ASSERT_EQ(options.connection_config.driver, HdfsDriver::LIBHDFS);
}

// Tag type used as a type parameter of the typed TestHadoopFileSystem
// fixture. The fixture reads the static `type` member to decide which HDFS
// driver to configure; the member is only declared here and receives its
// value at its out-of-class definition.
struct JNIDriver {
static HdfsDriver type;
};

// Tag type used as a type parameter of the typed TestHadoopFileSystem
// fixture. The fixture reads the static `type` member to decide which HDFS
// driver to configure; the member is only declared here and receives its
// value at its out-of-class definition.
struct PivotalDriver {
static HdfsDriver type;
};

template <typename DRIVER>
class TestHadoopFileSystem : public ::testing::Test {
public:
void SetUp() override {
Expand All @@ -90,15 +69,8 @@ class TestHadoopFileSystem : public ::testing::Test {
int hdfs_port = port == nullptr ? 20500 : atoi(port);
std::string hdfs_user = user == nullptr ? "root" : std::string(user);

if (DRIVER::type == HdfsDriver::LIBHDFS) {
use_hdfs3_ = false;
} else {
use_hdfs3_ = true;
}

options_.ConfigureEndPoint(hdfs_host, hdfs_port);
options_.ConfigureHdfsUser(hdfs_user);
options_.ConfigureHdfsDriver(use_hdfs3_);
options_.ConfigureHdfsReplication(0);

auto result = HadoopFileSystem::Make(options_);
Expand Down Expand Up @@ -207,20 +179,13 @@ class TestHadoopFileSystem : public ::testing::Test {
bool loaded_driver_ = false;
};

HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS;
HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3;

typedef ::testing::Types<JNIDriver, PivotalDriver> DriverTypes;

TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes);

// Leave the enclosing test body early when the fixture's loaded_driver_ flag
// is false, logging an INFO message first. The expansion contains a bare
// `return;` and reads `this->loaded_driver_`, so it is only usable inside a
// void member function of a type that has that member.
#define SKIP_IF_NO_DRIVER() \
if (!this->loaded_driver_) { \
ARROW_LOG(INFO) << "Driver not loaded, skipping"; \
return; \
}

TYPED_TEST(TestHadoopFileSystem, CreateDirDeleteDir) {
TEST_F(TestHadoopFileSystem, CreateDirDeleteDir) {
SKIP_IF_NO_DRIVER();

// recursive = true
Expand All @@ -243,7 +208,7 @@ TYPED_TEST(TestHadoopFileSystem, CreateDirDeleteDir) {
ASSERT_RAISES(IOError, this->fs_->DeleteDir("AB"));
}

TYPED_TEST(TestHadoopFileSystem, DeleteDirContents) {
TEST_F(TestHadoopFileSystem, DeleteDirContents) {
SKIP_IF_NO_DRIVER();

ASSERT_OK(this->fs_->CreateDir("AB/CD"));
Expand All @@ -262,7 +227,7 @@ TYPED_TEST(TestHadoopFileSystem, DeleteDirContents) {
ASSERT_OK(this->fs_->DeleteDir("AB"));
}

TYPED_TEST(TestHadoopFileSystem, WriteReadFile) {
TEST_F(TestHadoopFileSystem, WriteReadFile) {
SKIP_IF_NO_DRIVER();

ASSERT_OK(this->fs_->CreateDir("CD"));
Expand All @@ -285,21 +250,21 @@ TYPED_TEST(TestHadoopFileSystem, WriteReadFile) {
ASSERT_OK(this->fs_->DeleteDir("CD"));
}

TYPED_TEST(TestHadoopFileSystem, GetTargetStatsRelative) {
TEST_F(TestHadoopFileSystem, GetTargetStatsRelative) {
// Test GetTargetStats() with relative paths
SKIP_IF_NO_DRIVER();

this->TestGetTargetStats("");
}

TYPED_TEST(TestHadoopFileSystem, GetTargetStatsAbsolute) {
TEST_F(TestHadoopFileSystem, GetTargetStatsAbsolute) {
// Test GetTargetStats() with absolute paths
SKIP_IF_NO_DRIVER();

this->TestGetTargetStats("/");
}

TYPED_TEST(TestHadoopFileSystem, RelativeVsAbsolutePaths) {
TEST_F(TestHadoopFileSystem, RelativeVsAbsolutePaths) {
SKIP_IF_NO_DRIVER();

// XXX This test assumes the current working directory is not "/"
Expand All @@ -313,7 +278,7 @@ TYPED_TEST(TestHadoopFileSystem, RelativeVsAbsolutePaths) {
AssertFileStats(this->fs_.get(), "CD", FileType::NonExistent);
}

TYPED_TEST(TestHadoopFileSystem, MoveDir) {
TEST_F(TestHadoopFileSystem, MoveDir) {
SKIP_IF_NO_DRIVER();

FileStats stat;
Expand All @@ -333,7 +298,7 @@ TYPED_TEST(TestHadoopFileSystem, MoveDir) {
ASSERT_OK(this->fs_->DeleteDir(directory_name_dest));
}

TYPED_TEST(TestHadoopFileSystem, FileSystemFromUri) {
TEST_F(TestHadoopFileSystem, FileSystemFromUri) {
SKIP_IF_NO_DRIVER();

this->TestFileSystemFromUri();
Expand Down
11 changes: 1 addition & 10 deletions cpp/src/arrow/io/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,7 @@ class HadoopFileSystem::HadoopFileSystemImpl {
HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {}

Status Connect(const HdfsConnectionConfig* config) {
if (config->driver == HdfsDriver::LIBHDFS3) {
RETURN_NOT_OK(ConnectLibHdfs3(&driver_));
} else {
RETURN_NOT_OK(ConnectLibHdfs(&driver_));
}
RETURN_NOT_OK(ConnectLibHdfs(&driver_));

// connect to HDFS with the builder object
hdfsBuilder* builder = driver_->NewBuilder();
Expand Down Expand Up @@ -674,10 +670,5 @@ Status HaveLibHdfs() {
return internal::ConnectLibHdfs(&driver);
}

/// Probe for libhdfs3 by calling internal::ConnectLibHdfs3.
///
/// \return the Status reported by internal::ConnectLibHdfs3; the shim pointer
///   it writes is not used afterwards.
Status HaveLibHdfs3() {
  internal::LibHdfsShim* discarded_shim;
  Status probe_status = internal::ConnectLibHdfs3(&discarded_shim);
  return probe_status;
}

} // namespace io
} // namespace arrow
6 changes: 0 additions & 6 deletions cpp/src/arrow/io/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,12 @@ struct HdfsPathInfo {
int16_t permissions;
};

/// Identifies which HDFS client library a connection should use.
enum class HdfsDriver : char { LIBHDFS, LIBHDFS3 };

/// Connection settings passed to HadoopFileSystem::Connect.
struct HdfsConnectionConfig {
  std::string host;
  // Default member initializer: the constructor below only sets `driver`, so
  // without this a default-constructed config would carry an indeterminate
  // port value.
  int port = 0;
  std::string user;
  std::string kerb_ticket;
  std::unordered_map<std::string, std::string> extra_conf;
  HdfsDriver driver;

  /// Default-construct with the LIBHDFS driver selected.
  HdfsConnectionConfig() : driver(HdfsDriver::LIBHDFS) {}
};

class ARROW_EXPORT HadoopFileSystem : public FileSystem {
Expand Down Expand Up @@ -258,7 +253,6 @@ class ARROW_EXPORT HdfsOutputStream : public OutputStream {
};

Status ARROW_EXPORT HaveLibHdfs();
Status ARROW_EXPORT HaveLibHdfs3();

} // namespace io
} // namespace arrow
Expand Down
Loading

0 comments on commit b311877

Please sign in to comment.