Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-406: [C++] Set explicit 64K HDFS buffer size, test large reads #226

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions cpp/src/arrow/io/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <hdfs.h>

#include <algorithm>
#include <cstdint>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -51,6 +52,8 @@ static Status CheckReadResult(int ret) {
return Status::OK();
}

// Buffer size in bytes (64 KiB) passed along when the caller of OpenReadable
// does not supply an explicit buffer size.
static constexpr int kDefaultHdfsBufferSize = 64 * 1024;

// ----------------------------------------------------------------------
// File reading

Expand Down Expand Up @@ -124,9 +127,16 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
}

// Reads up to nbytes from the current file position into buffer by issuing
// hdfsRead requests of at most buffer_size_ bytes each, until nbytes have been
// read or hdfsRead returns 0 (no more data).
//
// @param nbytes number of bytes requested
// @param bytes_read set to the number of bytes actually placed in buffer; this
//   is smaller than nbytes only when hdfsRead returned 0 before nbytes were read
// @param buffer destination; must have room for at least nbytes bytes
// @return the failing Status from CheckReadResult if an hdfsRead call fails
//   (the function returns early and bytes_read is not written), otherwise
//   Status::OK()
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
  // buffer_size_ may be 0: hdfsOpenFile accepts 0 to mean "use the configured
  // default". Capping each request at 0 bytes would make hdfsRead return 0 and
  // be mistaken for end of data, so fall back to the largest request that fits
  // in tSize (a 32-bit signed integer in libhdfs).
  const int64_t max_request = buffer_size_ > 0 ? buffer_size_ : INT32_MAX;

  int64_t total_bytes = 0;
  while (total_bytes < nbytes) {
    const int64_t request = std::min<int64_t>(max_request, nbytes - total_bytes);
    // A single hdfsRead may return fewer bytes than requested, so keep reading
    // at the advancing offset until the request is satisfied.
    tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes),
        static_cast<tSize>(request));
    RETURN_NOT_OK(CheckReadResult(ret));
    if (ret == 0) { break; }
    total_bytes += ret;
  }

  *bytes_read = total_bytes;
  return Status::OK();
}

Expand All @@ -136,7 +146,6 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {

int64_t bytes_read = 0;
RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));

if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); }

*out = buffer;
Expand All @@ -154,8 +163,11 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {

// Stores the given memory pool pointer in pool_.
void set_memory_pool(MemoryPool* pool) {
  pool_ = pool;
}

// Stores the buffer size, which Read() uses as the upper bound on the number
// of bytes requested from hdfsRead in a single call.
void set_buffer_size(int32_t buffer_size) {
  buffer_size_ = buffer_size;
}

private:
// Memory pool assigned through set_memory_pool().
MemoryPool* pool_;
// Upper bound on the number of bytes requested per hdfsRead call in Read();
// assigned through set_buffer_size().
int32_t buffer_size_;
};

HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) {
Expand Down Expand Up @@ -384,8 +396,9 @@ class HdfsClient::HdfsClientImpl {
return Status::OK();
}

Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, 0, 0, 0);
Status OpenReadable(const std::string& path, int32_t buffer_size,
std::shared_ptr<HdfsReadableFile>* file) {
hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);

if (handle == nullptr) {
// TODO(wesm): determine cause of failure
Expand All @@ -397,6 +410,7 @@ class HdfsClient::HdfsClientImpl {
// std::make_shared does not work with private ctors
*file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile());
(*file)->impl_->set_members(path, fs_, handle);
(*file)->impl_->set_buffer_size(buffer_size);

return Status::OK();
}
Expand Down Expand Up @@ -490,9 +504,14 @@ Status HdfsClient::ListDirectory(
return impl_->ListDirectory(path, listing);
}

// Opens the file at `path` for reading, handing the request (including the
// caller-supplied buffer_size) to the client implementation. The opened file
// is returned through `file`.
Status HdfsClient::OpenReadable(const std::string& path, int32_t buffer_size,
    std::shared_ptr<HdfsReadableFile>* file) {
  Status result = impl_->OpenReadable(path, buffer_size, file);
  return result;
}

// Opens the file at `path` for reading using the default buffer size
// (kDefaultHdfsBufferSize) by delegating to the overload that takes an
// explicit buffer_size. The opened file is returned through `file`.
Status HdfsClient::OpenReadable(
    const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, file);
}

Status HdfsClient::OpenWriteable(const std::string& path, bool append,
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/io/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
// status if the file is not found.
//
// @param path complete file path
// @param buffer_size buffer size in bytes passed to hdfsOpenFile; it also
//   bounds how many bytes each individual hdfsRead request asks for
// @param file on success, set to the opened readable file
Status OpenReadable(const std::string& path, int32_t buffer_size,
std::shared_ptr<HdfsReadableFile>* file);

// Overload of the declaration above that uses the default buffer size
// (kDefaultHdfsBufferSize, defined in hdfs.cc).
Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file);

// FileMode::WRITE options
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/io/io-hdfs-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,39 @@ TEST_F(TestHdfsClient, ReadableMethods) {
ASSERT_EQ(60, position);
}

// Writes a 1,000,000-byte file of random data and checks that a single Read
// call returns the full contents, first through a file opened with the default
// buffer size and then through one opened with an explicit buffer size.
TEST_F(TestHdfsClient, LargeFile) {
  SKIP_IF_NO_LIBHDFS();

  ASSERT_OK(MakeScratchDir());

  auto path = ScratchPath("test-large-file");
  const int size = 1000000;

  std::vector<uint8_t> expected = RandomData(size);
  ASSERT_OK(WriteDummyFile(path, expected.data(), size));

  int64_t nread = 0;

  // Default buffer size.
  std::shared_ptr<HdfsReadableFile> default_file;
  ASSERT_OK(client_->OpenReadable(path, &default_file));

  auto default_out = std::make_shared<PoolBuffer>();
  ASSERT_OK(default_out->Resize(size));

  ASSERT_OK(default_file->Read(size, &nread, default_out->mutable_data()));
  ASSERT_EQ(0, std::memcmp(default_out->data(), expected.data(), size));
  ASSERT_EQ(size, nread);

  // Explicit buffer size.
  std::shared_ptr<HdfsReadableFile> sized_file;
  ASSERT_OK(client_->OpenReadable(path, 1 << 18, &sized_file));

  auto sized_out = std::make_shared<PoolBuffer>();
  ASSERT_OK(sized_out->Resize(size));
  ASSERT_OK(sized_file->Read(size, &nread, sized_out->mutable_data()));
  ASSERT_EQ(0, std::memcmp(sized_out->data(), expected.data(), size));
  ASSERT_EQ(size, nread);
}

TEST_F(TestHdfsClient, RenameFile) {
SKIP_IF_NO_LIBHDFS();

Expand Down