Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hdfs): Use available port for HdfsMiniCluster #11996

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
#include <gmock/gmock-matchers.h>
#include <atomic>
#include <random>
#include "HdfsMiniCluster.h"
#include "gtest/gtest.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h"
#include "velox/core/QueryConfig.h"
#include "velox/exec/tests/utils/TempFilePath.h"
#include "velox/external/hdfs/ArrowHdfsInternal.h"
Expand All @@ -36,14 +36,9 @@ using filesystems::arrow::io::internal::LibHdfsShim;

constexpr int kOneMB = 1 << 20;
static const std::string destinationPath = "/test_file.txt";
static const std::string hdfsPort = "7878";
static const std::string localhost = "localhost";
static const std::string fullDestinationPath =
"hdfs://" + localhost + ":" + hdfsPort + destinationPath;
static const std::string simpleDestinationPath = "hdfs://" + destinationPath;
static const std::string viewfsDestinationPath = "viewfs://" + destinationPath;
static const std::unordered_map<std::string, std::string> configurationValues(
{{"hive.hdfs.host", localhost}, {"hive.hdfs.port", hdfsPort}});
std::unordered_map<std::string, std::string> configurationValues;

class HdfsFileSystemTest : public testing::Test {
public:
Expand All @@ -55,6 +50,12 @@ class HdfsFileSystemTest : public testing::Test {
auto tempFile = createFile();
miniCluster->addFile(tempFile->getPath(), destinationPath);
}
configurationValues.insert(
{"hive.hdfs.host", std::string(miniCluster->host())});
configurationValues.insert(
{"hive.hdfs.port", std::string(miniCluster->nameNodePort())});
fullDestinationPath_ =
fmt::format("{}{}", miniCluster->url(), destinationPath);
}

void SetUp() override {
Expand All @@ -67,8 +68,18 @@ class HdfsFileSystemTest : public testing::Test {
static void TearDownTestSuite() {
miniCluster->stop();
}

static std::unique_ptr<WriteFile> openFileForWrite(std::string_view path) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFilePath = fmt::format("{}{}", miniCluster->url(), path);
auto hdfsFileSystem = filesystems::getFileSystem(hdfsFilePath, config);
return hdfsFileSystem->openFileForWrite(path);
}

static std::atomic<bool> startThreads;
static std::shared_ptr<filesystems::test::HdfsMiniCluster> miniCluster;
static std::string fullDestinationPath_;

private:
static std::shared_ptr<::exec::test::TempFilePath> createFile() {
Expand All @@ -84,6 +95,7 @@ class HdfsFileSystemTest : public testing::Test {
std::shared_ptr<filesystems::test::HdfsMiniCluster>
HdfsFileSystemTest::miniCluster = nullptr;
std::atomic<bool> HdfsFileSystemTest::startThreads = false;
std::string HdfsFileSystemTest::fullDestinationPath_;

void readData(ReadFile* readFile) {
ASSERT_EQ(readFile->size(), 15 + kOneMB);
Expand All @@ -107,15 +119,6 @@ void readData(ReadFile* readFile) {
ASSERT_EQ(warfFromBuf, "abbbbbcc");
}

std::unique_ptr<WriteFile> openFileForWrite(std::string_view path) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
std::string hdfsFilePath =
"hdfs://" + localhost + ":" + hdfsPort + std::string(path);
auto hdfsFileSystem = filesystems::getFileSystem(hdfsFilePath, config);
return hdfsFileSystem->openFileForWrite(path);
}

void checkReadErrorMessages(
ReadFile* readFile,
std::string errorMessage,
Expand Down Expand Up @@ -184,54 +187,61 @@ void verifyFailures(LibHdfsShim* driver, hdfsFS hdfs) {
}

hdfsFS connectHdfsDriver(
filesystems::arrow::io::internal::LibHdfsShim** driver) {
filesystems::arrow::io::internal::LibHdfsShim** driver,
const std::string host,
const std::string port) {
filesystems::arrow::io::internal::LibHdfsShim* libhdfs_shim;
auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&libhdfs_shim);
if (!status.ok()) {
LOG(ERROR) << "ConnectLibHdfs failed ";
}
VELOX_CHECK(status.ok(), "ConnectLibHdfs failed.");

// Connect to HDFS with the builder object
hdfsBuilder* builder = libhdfs_shim->NewBuilder();
libhdfs_shim->BuilderSetNameNode(builder, localhost.c_str());
libhdfs_shim->BuilderSetNameNodePort(builder, 7878);
libhdfs_shim->BuilderSetNameNode(builder, host.c_str());
libhdfs_shim->BuilderSetNameNodePort(builder, std::stoi(port));
libhdfs_shim->BuilderSetForceNewInstance(builder);

auto hdfs = libhdfs_shim->BuilderConnect(builder);
VELOX_CHECK_NOT_NULL(
hdfs,
"Unable to connect to HDFS: {}, got error",
std::string(localhost.c_str()) + ":7878");
"Unable to connect to HDFS at {}:{}, got error",
host.c_str(),
port);
*driver = libhdfs_shim;
return hdfs;
}

TEST_F(HdfsFileSystemTest, read) {
filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);
auto hdfs = connectHdfsDriver(
&driver,
std::string(miniCluster->host()),
std::string(miniCluster->nameNodePort()));
HdfsReadFile readFile(driver, hdfs, destinationPath);
readData(&readFile);
}

TEST_F(HdfsFileSystemTest, viaFileSystem) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());
}

TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) {
// Without host/port configured.
auto config = std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>());
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());

// Wrong endpoint info specified in hdfs file path.
const std::string wrongFullDestinationPath =
"hdfs://not_exist_host:" + hdfsPort + destinationPath;
"hdfs://not_exist_host:" + std::string(miniCluster->nameNodePort()) +
destinationPath;
VELOX_ASSERT_THROW(
filesystems::getFileSystem(wrongFullDestinationPath, config),
"Unable to connect to HDFS");
Expand All @@ -240,23 +250,25 @@ TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) {
TEST_F(HdfsFileSystemTest, fallbackToUseConfig) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());
}

TEST_F(HdfsFileSystemTest, oneFsInstanceForOneEndpoint) {
auto hdfsFileSystem1 =
filesystems::getFileSystem(fullDestinationPath, nullptr);
filesystems::getFileSystem(fullDestinationPath_, nullptr);
auto hdfsFileSystem2 =
filesystems::getFileSystem(fullDestinationPath, nullptr);
filesystems::getFileSystem(fullDestinationPath_, nullptr);
ASSERT_TRUE(hdfsFileSystem1 == hdfsFileSystem2);
}

TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);

VELOX_ASSERT_RUNTIME_THROW_CODE(
hdfsFileSystem->openFileForRead(
Expand All @@ -268,7 +280,7 @@ TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
TEST_F(HdfsFileSystemTest, missingHost) {
try {
std::unordered_map<std::string, std::string> missingHostConfiguration(
{{"hive.hdfs.port", hdfsPort}});
{{"hive.hdfs.port", std::string(miniCluster->nameNodePort())}});
auto config = std::make_shared<const config::ConfigBase>(
std::move(missingHostConfiguration));
filesystems::HdfsFileSystem hdfsFileSystem(
Expand All @@ -287,7 +299,7 @@ TEST_F(HdfsFileSystemTest, missingHost) {
TEST_F(HdfsFileSystemTest, missingPort) {
try {
std::unordered_map<std::string, std::string> missingPortConfiguration(
{{"hive.hdfs.host", localhost}});
{{"hive.hdfs.host", std::string(miniCluster->host())}});
auto config = std::make_shared<const config::ConfigBase>(
std::move(missingPortConfiguration));
filesystems::HdfsFileSystem hdfsFileSystem(
Expand All @@ -306,7 +318,10 @@ TEST_F(HdfsFileSystemTest, missingPort) {
TEST_F(HdfsFileSystemTest, missingFileViaReadFile) {
try {
filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);
auto hdfs = connectHdfsDriver(
&driver,
std::string(miniCluster->host()),
std::string(miniCluster->nameNodePort()));
HdfsReadFile readFile(driver, hdfs, "/path/that/does/not/exist");
FAIL() << "expected VeloxException";
} catch (VeloxException const& error) {
Expand All @@ -329,8 +344,8 @@ TEST_F(HdfsFileSystemTest, schemeMatching) {
"No registered file system matched with file path 'file://'"));
}
auto fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
filesystems::getFileSystem(fullDestinationPath, nullptr));
ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath));
filesystems::getFileSystem(fullDestinationPath_, nullptr));
ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath_));

fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
filesystems::getFileSystem(viewfsDestinationPath, nullptr));
Expand All @@ -342,7 +357,7 @@ TEST_F(HdfsFileSystemTest, writeNotSupported) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath, config);
filesystems::getFileSystem(fullDestinationPath_, config);
hdfsFileSystem->openFileForWrite("/path");
} catch (VeloxException const& error) {
EXPECT_EQ(error.message(), "Write to HDFS is unsupported");
Expand All @@ -354,7 +369,7 @@ TEST_F(HdfsFileSystemTest, removeNotSupported) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath, config);
filesystems::getFileSystem(fullDestinationPath_, config);
hdfsFileSystem->remove("/path");
} catch (VeloxException const& error) {
EXPECT_EQ(error.message(), "Does not support removing files from hdfs");
Expand All @@ -365,8 +380,10 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) {
startThreads = false;

filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);

auto hdfs = connectHdfsDriver(
&driver,
std::string(miniCluster->host()),
std::string(miniCluster->nameNodePort()));
std::vector<std::thread> threads;
std::mt19937 generator(std::random_device{}());
std::vector<int> sleepTimesInMicroseconds = {0, 500, 50000};
Expand Down Expand Up @@ -396,7 +413,8 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithFileSystem) {
startThreads = false;
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);

std::vector<std::thread> threads;
std::mt19937 generator(std::random_device{}());
Expand All @@ -414,7 +432,7 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithFileSystem) {
}
std::this_thread::sleep_for(
std::chrono::microseconds(sleepTimesInMicroseconds[index]));
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());
});
threads.emplace_back(std::move(thread));
Expand Down Expand Up @@ -478,6 +496,9 @@ TEST_F(HdfsFileSystemTest, writeWithParentDirNotExist) {

TEST_F(HdfsFileSystemTest, readFailures) {
filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);
auto hdfs = connectHdfsDriver(
&driver,
std::string(miniCluster->host()),
std::string(miniCluster->nameNodePort()));
verifyFailures(driver, hdfs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/

#include "HdfsMiniCluster.h"
#include "velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h"

#include "velox/exec/tests/utils/PortUtil.h"

namespace facebook::velox::filesystems::test {
void HdfsMiniCluster::start() {
Expand All @@ -28,16 +30,16 @@ void HdfsMiniCluster::start() {
noMapReduceOption,
formatNameNodeOption,
httpPortOption,
httpPort,
httpPort_,
nameNodePortOption,
nameNodePort,
nameNodePort_,
configurationOption,
turnOffPermissions);
serverProcess_->wait_for(std::chrono::duration<int, std::milli>(60000));
VELOX_CHECK_EQ(
serverProcess_->exit_code(),
383,
"Minicluster process exited, code: ",
"Minicluster process exited, code: {}",
serverProcess_->exit_code());
} catch (const std::exception& e) {
VELOX_FAIL("Failed to launch Minicluster server: {}", e.what());
Expand Down Expand Up @@ -71,6 +73,11 @@ HdfsMiniCluster::HdfsMiniCluster() {
VELOX_FAIL(
"Failed to find minicluster executable {}'", miniClusterExecutableName);
}
constexpr auto kHostAddressTemplate = "hdfs://{}:{}";
auto ports = facebook::velox::exec::test::getFreePorts(2);
nameNodePort_ = fmt::format("{}", ports[0]);
httpPort_ = fmt::format("{}", ports[1]);
filesystemUrl_ = fmt::format(kHostAddressTemplate, host(), nameNodePort_);
boost::filesystem::path hadoopHomeDirectory = exePath_;
hadoopHomeDirectory.remove_leaf().remove_leaf();
setupEnvironment(hadoopHomeDirectory.string());
Expand All @@ -82,12 +89,12 @@ void HdfsMiniCluster::addFile(std::string source, std::string destination) {
exePath_,
filesystemCommand,
filesystemUrlOption,
filesystemUrl,
filesystemUrl_,
filePutOption,
source,
destination);
bool isExited =
filePutProcess->wait_for(std::chrono::duration<int, std::milli>(5000));
filePutProcess->wait_for(std::chrono::duration<int, std::milli>(15000));
if (!isExited) {
VELOX_FAIL(
"Failed to add file to hdfs, exit code: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ static const std::string miniclusterCommand{"minicluster"};
static const std::string noMapReduceOption{"-nomr"};
static const std::string formatNameNodeOption{"-format"};
static const std::string httpPortOption{"-nnhttpport"};
static const std::string httpPort{"7676"};
static const std::string nameNodePortOption{"-nnport"};
static const std::string nameNodePort{"7878"};
static const std::string configurationOption{"-D"};
static const std::string turnOffPermissions{"dfs.permissions=false"};
static const std::string filesystemCommand{"fs"};
static const std::string filesystemUrlOption{"-fs"};
static const std::string filesystemUrl{"hdfs://localhost:" + nameNodePort};
static const std::string filePutOption{"-put"};

class HdfsMiniCluster {
Expand All @@ -54,11 +51,27 @@ class HdfsMiniCluster {
void addFile(std::string source, std::string destination);
virtual ~HdfsMiniCluster();

std::string_view nameNodePort() const {
return nameNodePort_;
}

std::string_view url() const {
return filesystemUrl_;
}

std::string_view host() const {
static const std::string_view kLocalhost = "localhost";
return kLocalhost;
}

private:
void setupEnvironment(const std::string& homeDirectory);

std::unique_ptr<::boost::process::child> serverProcess_;
boost::filesystem::path exePath_;
boost::process::environment env_;
std::string nameNodePort_;
std::string httpPort_;
std::string filesystemUrl_;
};
} // namespace facebook::velox::filesystems::test
Loading
Loading