Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
majetideepak committed Dec 31, 2024
1 parent 4b0c04d commit 792e047
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
#include <gmock/gmock-matchers.h>
#include <atomic>
#include <random>
#include "HdfsMiniCluster.h"
#include "gtest/gtest.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h"
#include "velox/core/QueryConfig.h"
#include "velox/exec/tests/utils/TempFilePath.h"
#include "velox/external/hdfs/ArrowHdfsInternal.h"
Expand All @@ -36,14 +36,11 @@ using filesystems::arrow::io::internal::LibHdfsShim;

// One megabyte; unit used for sizing test payloads.
constexpr int kOneMB = 1 << 20;
// Path of the file staged on the mini cluster for the read tests.
static const std::string destinationPath = "/test_file.txt";
static const std::string localhost = "localhost";
// Endpoint-less paths: exercise the fall-back-to-config and viewfs scheme
// handling respectively.
static const std::string simpleDestinationPath = "hdfs://" + destinationPath;
static const std::string viewfsDestinationPath = "viewfs://" + destinationPath;
// Hive connector configuration. Intentionally mutable (not const): the
// "hive.hdfs.port" entry is inserted in SetUpTestSuite once the mini
// cluster has chosen its name node port.
std::unordered_map<std::string, std::string> configurationValues(
    {{"hive.hdfs.host", localhost}});

class HdfsFileSystemTest : public testing::Test {
public:
Expand All @@ -55,6 +52,12 @@ class HdfsFileSystemTest : public testing::Test {
auto tempFile = createFile();
miniCluster->addFile(tempFile->getPath(), destinationPath);
}
configurationValues.insert({"hive.hdfs.port", miniCluster->nameNodePort()});
fullDestinationPath_ = fmt::format(
"hdfs://{}:{}{}",
localhost,
miniCluster->nameNodePort(),
destinationPath);
}

void SetUp() override {
Expand All @@ -67,8 +70,19 @@ class HdfsFileSystemTest : public testing::Test {
// Stops the shared HDFS mini cluster once every test in the suite has run.
static void TearDownTestSuite() {
miniCluster->stop();
}

// Opens `path` for writing on the mini cluster. Builds a full hdfs:// URI
// from the cluster's dynamically-assigned name node port, resolves the
// registered HDFS filesystem for it, and delegates to that filesystem.
static std::unique_ptr<WriteFile> openFileForWrite(std::string_view path) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
// URI carries host:port so getFileSystem picks the right endpoint; the
// bare `path` (no scheme) is what the returned filesystem expects.
std::string hdfsFilePath = "hdfs://" + localhost + ":" +
miniCluster->nameNodePort() + std::string(path);
auto hdfsFileSystem = filesystems::getFileSystem(hdfsFilePath, config);
return hdfsFileSystem->openFileForWrite(path);
}

static std::atomic<bool> startThreads;
static std::shared_ptr<filesystems::test::HdfsMiniCluster> miniCluster;
static std::string fullDestinationPath_;

private:
static std::shared_ptr<::exec::test::TempFilePath> createFile() {
Expand Down Expand Up @@ -107,15 +121,6 @@ void readData(ReadFile* readFile) {
ASSERT_EQ(warfFromBuf, "abbbbbcc");
}

std::unique_ptr<WriteFile> openFileForWrite(std::string_view path) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
std::string hdfsFilePath =
"hdfs://" + localhost + ":" + hdfsPort + std::string(path);
auto hdfsFileSystem = filesystems::getFileSystem(hdfsFilePath, config);
return hdfsFileSystem->openFileForWrite(path);
}

void checkReadErrorMessages(
ReadFile* readFile,
std::string errorMessage,
Expand Down Expand Up @@ -184,7 +189,8 @@ void verifyFailures(LibHdfsShim* driver, hdfsFS hdfs) {
}

hdfsFS connectHdfsDriver(
filesystems::arrow::io::internal::LibHdfsShim** driver) {
filesystems::arrow::io::internal::LibHdfsShim** driver,
const std::string& port) {
filesystems::arrow::io::internal::LibHdfsShim* libhdfs_shim;
auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&libhdfs_shim);
if (!status.ok()) {
Expand All @@ -194,44 +200,46 @@ hdfsFS connectHdfsDriver(
// Connect to HDFS with the builder object
hdfsBuilder* builder = libhdfs_shim->NewBuilder();
libhdfs_shim->BuilderSetNameNode(builder, localhost.c_str());
libhdfs_shim->BuilderSetNameNodePort(builder, 7878);
libhdfs_shim->BuilderSetNameNodePort(builder, std::stoi(port));
libhdfs_shim->BuilderSetForceNewInstance(builder);

auto hdfs = libhdfs_shim->BuilderConnect(builder);
VELOX_CHECK_NOT_NULL(
hdfs,
"Unable to connect to HDFS: {}, got error",
std::string(localhost.c_str()) + ":7878");
std::string(localhost.c_str()) + ":" + port);
*driver = libhdfs_shim;
return hdfs;
}

// Reads the staged test file directly through HdfsReadFile, connecting to
// the mini cluster's dynamically-assigned name node port.
// (Scrape artifact fixed: the span contained both the pre-change and
// post-change connectHdfsDriver call lines; only the post-change call with
// the explicit port argument is kept.)
TEST_F(HdfsFileSystemTest, read) {
  filesystems::arrow::io::internal::LibHdfsShim* driver;
  auto hdfs = connectHdfsDriver(&driver, miniCluster->nameNodePort());
  HdfsReadFile readFile(driver, hdfs, destinationPath);
  readData(&readFile);
}

// Reads the staged file through the registered filesystem abstraction using
// the runtime-built fullDestinationPath_ (host:port resolved at suite setup).
// (Scrape artifact fixed: the span interleaved the removed fullDestinationPath
// lines with their fullDestinationPath_ replacements; only the latter remain.)
TEST_F(HdfsFileSystemTest, viaFileSystem) {
  auto config = std::make_shared<const config::ConfigBase>(
      std::unordered_map<std::string, std::string>(configurationValues));
  auto hdfsFileSystem =
      filesystems::getFileSystem(fullDestinationPath_, config);
  auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
  readData(readFile.get());
}

TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) {
// Without host/port configured.
auto config = std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>());
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());

// Wrong endpoint info specified in hdfs file path.
const std::string wrongFullDestinationPath =
"hdfs://not_exist_host:" + hdfsPort + destinationPath;
"hdfs://not_exist_host:" + miniCluster->nameNodePort() + destinationPath;
VELOX_ASSERT_THROW(
filesystems::getFileSystem(wrongFullDestinationPath, config),
"Unable to connect to HDFS");
Expand All @@ -240,23 +248,25 @@ TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) {
// Verifies that host/port from the config map are used when resolving the
// filesystem (endpoint info also present in the path; config is the source
// exercised here).
// (Scrape artifact fixed: removed-diff lines using fullDestinationPath were
// interleaved with the fullDestinationPath_ replacements; only the
// post-change lines remain.)
TEST_F(HdfsFileSystemTest, fallbackToUseConfig) {
  auto config = std::make_shared<const config::ConfigBase>(
      std::unordered_map<std::string, std::string>(configurationValues));
  auto hdfsFileSystem =
      filesystems::getFileSystem(fullDestinationPath_, config);
  auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
  readData(readFile.get());
}

// Two lookups for the same endpoint must return the same cached filesystem
// instance.
// (Scrape artifact fixed: removed-diff continuation lines using
// fullDestinationPath were interleaved with the fullDestinationPath_
// replacements; only the post-change lines remain.)
TEST_F(HdfsFileSystemTest, oneFsInstanceForOneEndpoint) {
  auto hdfsFileSystem1 =
      filesystems::getFileSystem(fullDestinationPath_, nullptr);
  auto hdfsFileSystem2 =
      filesystems::getFileSystem(fullDestinationPath_, nullptr);
  ASSERT_TRUE(hdfsFileSystem1 == hdfsFileSystem2);
}

TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);

VELOX_ASSERT_RUNTIME_THROW_CODE(
hdfsFileSystem->openFileForRead(
Expand All @@ -268,7 +278,7 @@ TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
TEST_F(HdfsFileSystemTest, missingHost) {
try {
std::unordered_map<std::string, std::string> missingHostConfiguration(
{{"hive.hdfs.port", hdfsPort}});
{{"hive.hdfs.port", miniCluster->nameNodePort()}});
auto config = std::make_shared<const config::ConfigBase>(
std::move(missingHostConfiguration));
filesystems::HdfsFileSystem hdfsFileSystem(
Expand Down Expand Up @@ -306,7 +316,7 @@ TEST_F(HdfsFileSystemTest, missingPort) {
TEST_F(HdfsFileSystemTest, missingFileViaReadFile) {
try {
filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);
auto hdfs = connectHdfsDriver(&driver, miniCluster->nameNodePort());
HdfsReadFile readFile(driver, hdfs, "/path/that/does/not/exist");
FAIL() << "expected VeloxException";
} catch (VeloxException const& error) {
Expand All @@ -329,8 +339,8 @@ TEST_F(HdfsFileSystemTest, schemeMatching) {
"No registered file system matched with file path 'file://'"));
}
auto fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
filesystems::getFileSystem(fullDestinationPath, nullptr));
ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath));
filesystems::getFileSystem(fullDestinationPath_, nullptr));
ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath_));

fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
filesystems::getFileSystem(viewfsDestinationPath, nullptr));
Expand All @@ -342,7 +352,7 @@ TEST_F(HdfsFileSystemTest, writeNotSupported) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath, config);
filesystems::getFileSystem(fullDestinationPath_, config);
hdfsFileSystem->openFileForWrite("/path");
} catch (VeloxException const& error) {
EXPECT_EQ(error.message(), "Write to HDFS is unsupported");
Expand All @@ -354,7 +364,7 @@ TEST_F(HdfsFileSystemTest, removeNotSupported) {
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath, config);
filesystems::getFileSystem(fullDestinationPath_, config);
hdfsFileSystem->remove("/path");
} catch (VeloxException const& error) {
EXPECT_EQ(error.message(), "Does not support removing files from hdfs");
Expand All @@ -365,7 +375,7 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) {
startThreads = false;

filesystems::arrow::io::internal::LibHdfsShim* driver;
auto hdfs = connectHdfsDriver(&driver);
auto hdfs = connectHdfsDriver(&driver, miniCluster->nameNodePort());

std::vector<std::thread> threads;
std::mt19937 generator(std::random_device{}());
Expand Down Expand Up @@ -396,7 +406,8 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithFileSystem) {
startThreads = false;
auto config = std::make_shared<const config::ConfigBase>(
std::unordered_map<std::string, std::string>(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
auto hdfsFileSystem =
filesystems::getFileSystem(fullDestinationPath_, config);

std::vector<std::thread> threads;
std::mt19937 generator(std::random_device{}());
Expand All @@ -414,7 +425,7 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithFileSystem) {
}
std::this_thread::sleep_for(
std::chrono::microseconds(sleepTimesInMicroseconds[index]));
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath_);
readData(readFile.get());
});
threads.emplace_back(std::move(thread));
Expand Down Expand Up @@ -478,6 +489,6 @@ TEST_F(HdfsFileSystemTest, writeWithParentDirNotExist) {

// Exercises the read-error paths against a live connection to the mini
// cluster's name node.
// (Scrape artifact fixed: the span contained both the old no-port and the
// new port-taking connectHdfsDriver calls; only the post-change call is
// kept.)
TEST_F(HdfsFileSystemTest, readFailures) {
  filesystems::arrow::io::internal::LibHdfsShim* driver;
  auto hdfs = connectHdfsDriver(&driver, miniCluster->nameNodePort());
  verifyFailures(driver, hdfs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class HdfsMiniCluster {
void addFile(std::string source, std::string destination);
virtual ~HdfsMiniCluster();

// Returns the HDFS name node port as a string (suitable for URI building
// and config maps). NOTE(review): nameNodePort_ is presumably assigned when
// the cluster starts — the assignment is not visible here; confirm.
std::string nameNodePort() const {
return nameNodePort_;
}

private:
void setupEnvironment(const std::string& homeDirectory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ TEST_F(InsertIntoHdfsTest, insertIntoHdfsTest) {
makeFlatVector<int16_t>(expectedRows, [](auto row) { return row; }),
makeFlatVector<double>(expectedRows, [](auto row) { return row; })});

auto outputDirectory = "hdfs://localhost:7878/";
// Bug fix: the port belongs immediately after "localhost:" with the "/"
// path separator last. The previous concatenation produced
// "hdfs://localhost:/<port>" (empty port, port parsed as the path) instead
// of "hdfs://localhost:<port>/".
auto outputDirectory = "hdfs://localhost:" + miniCluster->nameNodePort() + "/";
// INSERT INTO hdfs with one writer
auto plan = PlanBuilder()
.values({input})
Expand Down

0 comments on commit 792e047

Please sign in to comment.