diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 13491e780e21b..8c6d49f92e606 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -51,6 +52,8 @@ static Status CheckReadResult(int ret) { return Status::OK(); } +static constexpr int kDefaultHdfsBufferSize = 1 << 16; + // ---------------------------------------------------------------------- // File reading @@ -124,9 +127,16 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { } Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { - tSize ret = hdfsRead(fs_, file_, reinterpret_cast(buffer), nbytes); - RETURN_NOT_OK(CheckReadResult(ret)); - *bytes_read = ret; + int64_t total_bytes = 0; + while (total_bytes < nbytes) { + tSize ret = hdfsRead(fs_, file_, reinterpret_cast(buffer + total_bytes), + std::min(buffer_size_, nbytes - total_bytes)); + RETURN_NOT_OK(CheckReadResult(ret)); + total_bytes += ret; + if (ret == 0) { break; } + } + + *bytes_read = total_bytes; return Status::OK(); } @@ -136,7 +146,6 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { int64_t bytes_read = 0; RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); - if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } *out = buffer; @@ -154,8 +163,11 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { void set_memory_pool(MemoryPool* pool) { pool_ = pool; } + void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; } + private: MemoryPool* pool_; + int32_t buffer_size_; }; HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) { @@ -384,8 +396,9 @@ class HdfsClient::HdfsClientImpl { return Status::OK(); } - Status OpenReadable(const std::string& path, std::shared_ptr* file) { - hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, 0, 0, 0); + Status OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr* file) { + hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0); if (handle == nullptr) { // TODO(wesm): determine cause of failure @@ -397,6 +410,7 @@ class HdfsClient::HdfsClientImpl { // std::make_shared does not work with private ctors *file = std::shared_ptr(new HdfsReadableFile()); (*file)->impl_->set_members(path, fs_, handle); + (*file)->impl_->set_buffer_size(buffer_size); return Status::OK(); } @@ -490,9 +504,14 @@ Status HdfsClient::ListDirectory( return impl_->ListDirectory(path, listing); } +Status HdfsClient::OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr* file) { + return impl_->OpenReadable(path, buffer_size, file); +} + Status HdfsClient::OpenReadable( const std::string& path, std::shared_ptr* file) { - return impl_->OpenReadable(path, file); + return OpenReadable(path, kDefaultHdfsBufferSize, file); } Status HdfsClient::OpenWriteable(const std::string& path, bool append, diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 48699c914503e..1c76f15c397ce 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -128,6 +128,9 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // status if the file is not found. // // @param path complete file path + Status OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr* file); + Status OpenReadable(const std::string& path, std::shared_ptr* file); // FileMode::WRITE options diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc index 7901932dee676..8338de6d96a55 100644 --- a/cpp/src/arrow/io/io-hdfs-test.cc +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -293,6 +293,39 @@ TEST_F(TestHdfsClient, ReadableMethods) { ASSERT_EQ(60, position); } +TEST_F(TestHdfsClient, LargeFile) { + SKIP_IF_NO_LIBHDFS(); + + ASSERT_OK(MakeScratchDir()); + + auto path = ScratchPath("test-large-file"); + const int size = 1000000; + + std::vector data = RandomData(size); + ASSERT_OK(WriteDummyFile(path, data.data(), size)); + + std::shared_ptr file; + ASSERT_OK(client_->OpenReadable(path, &file)); + + auto buffer = std::make_shared(); + ASSERT_OK(buffer->Resize(size)); + int64_t bytes_read = 0; + + ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data())); + ASSERT_EQ(0, std::memcmp(buffer->data(), data.data(), size)); + ASSERT_EQ(size, bytes_read); + + // explicit buffer size + std::shared_ptr file2; + ASSERT_OK(client_->OpenReadable(path, 1 << 18, &file2)); + + auto buffer2 = std::make_shared(); + ASSERT_OK(buffer2->Resize(size)); + ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data())); + ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size)); + ASSERT_EQ(size, bytes_read); +} + TEST_F(TestHdfsClient, RenameFile) { SKIP_IF_NO_LIBHDFS();