diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a39a7521231..be95dabf318 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -626,12 +626,6 @@ set(ARROW_SRCS src/arrow/table.cc src/arrow/type.cc - # IPC / Shared memory library; to be turned into an optional component - src/arrow/ipc/adapter.cc - src/arrow/ipc/memory.cc - src/arrow/ipc/metadata.cc - src/arrow/ipc/metadata-internal.cc - src/arrow/types/construct.cc src/arrow/types/decimal.cc src/arrow/types/json.cc diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index b8c0e138afb..87e227ef80d 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -20,6 +20,7 @@ set(ARROW_IO_LINK_LIBS arrow_shared + dl ) if (ARROW_BOOST_USE_SHARED) @@ -37,6 +38,7 @@ set(ARROW_IO_TEST_LINK_LIBS ${ARROW_IO_PRIVATE_LINK_LIBS}) set(ARROW_IO_SRCS + memory.cc ) if(ARROW_HDFS) @@ -71,8 +73,8 @@ if(ARROW_HDFS) ${ARROW_HDFS_SRCS} ${ARROW_IO_SRCS}) - ADD_ARROW_TEST(hdfs-io-test) - ARROW_TEST_LINK_LIBRARIES(hdfs-io-test + ADD_ARROW_TEST(io-hdfs-test) + ARROW_TEST_LINK_LIBRARIES(io-hdfs-test ${ARROW_IO_TEST_LINK_LIBS}) endif() @@ -101,10 +103,15 @@ if (APPLE) INSTALL_NAME_DIR "@rpath") endif() +ADD_ARROW_TEST(io-memory-test) +ARROW_TEST_LINK_LIBRARIES(io-memory-test + ${ARROW_IO_TEST_LINK_LIBS}) + # Headers: top level install(FILES hdfs.h interfaces.h + memory.h DESTINATION include/arrow/io) install(TARGETS arrow_io diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 800c3edf4f3..a6b4b2f3846 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -142,6 +142,15 @@ Status HdfsReadableFile::ReadAt( return impl_->ReadAt(position, nbytes, bytes_read, buffer); } +Status HdfsReadableFile::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) { + return Status::NotImplemented("Not yet implemented"); +} + +bool HdfsReadableFile::supports_zero_copy() const { + return false; +} + Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { return impl_->Read(nbytes, bytes_read, buffer); } @@ -162,9 +171,9 @@ Status HdfsReadableFile::Tell(int64_t* position) { // File writing // Private implementation for writeable-only files -class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl { +class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { public: - HdfsWriteableFileImpl() {} + HdfsOutputStreamImpl() {} Status Close() { if (is_open_) { @@ -185,29 +194,29 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl { } }; -HdfsWriteableFile::HdfsWriteableFile() { - impl_.reset(new HdfsWriteableFileImpl()); +HdfsOutputStream::HdfsOutputStream() { + impl_.reset(new HdfsOutputStreamImpl()); } -HdfsWriteableFile::~HdfsWriteableFile() { +HdfsOutputStream::~HdfsOutputStream() { impl_->Close(); } -Status HdfsWriteableFile::Close() { +Status HdfsOutputStream::Close() { return impl_->Close(); } -Status HdfsWriteableFile::Write( +Status HdfsOutputStream::Write( const uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) { return impl_->Write(buffer, nbytes, bytes_read); } -Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) { +Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes) { int64_t bytes_written_dummy = 0; return Write(buffer, nbytes, &bytes_written_dummy); } -Status HdfsWriteableFile::Tell(int64_t* position) { +Status HdfsOutputStream::Tell(int64_t* position) { return impl_->Tell(position); } @@ -347,7 +356,7 @@ class HdfsClient::HdfsClientImpl { Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, int16_t replication, int64_t default_block_size, - std::shared_ptr* file) { + std::shared_ptr* file) { int flags = O_WRONLY; if (append) flags |= O_APPEND; @@ -362,7 +371,7 @@ class HdfsClient::HdfsClientImpl { } // std::make_shared does not work with private ctors - *file = std::shared_ptr(new HdfsWriteableFile()); + *file = std::shared_ptr(new HdfsOutputStream()); (*file)->impl_->set_members(path, fs_, handle); return Status::OK(); @@ -440,13 +449,13 @@ Status HdfsClient::OpenReadable( Status HdfsClient::OpenWriteable(const std::string& path, bool append, int32_t buffer_size, int16_t replication, int64_t default_block_size, - std::shared_ptr* file) { + std::shared_ptr* file) { return impl_->OpenWriteable( path, append, buffer_size, replication, default_block_size, file); } Status HdfsClient::OpenWriteable( - const std::string& path, bool append, std::shared_ptr* file) { + const std::string& path, bool append, std::shared_ptr* file) { return OpenWriteable(path, append, 0, 0, 0, file); } diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index b6449fcb88a..39720cc17e4 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -29,13 +29,14 @@ namespace arrow { +class Buffer; class Status; namespace io { class HdfsClient; class HdfsReadableFile; -class HdfsWriteableFile; +class HdfsOutputStream; struct HdfsPathInfo { ObjectType::type kind; @@ -139,14 +140,14 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // @param default_block_size, 0 for default Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, int16_t replication, int64_t default_block_size, - std::shared_ptr* file); + std::shared_ptr* file); Status OpenWriteable( - const std::string& path, bool append, std::shared_ptr* file); + const std::string& path, bool append, std::shared_ptr* file); private: friend class HdfsReadableFile; - friend class HdfsWriteableFile; + friend class HdfsOutputStream; class ARROW_NO_EXPORT HdfsClientImpl; std::unique_ptr impl_; @@ -155,7 +156,7 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { DISALLOW_COPY_AND_ASSIGN(HdfsClient); }; -class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { +class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface { public: ~HdfsReadableFile(); @@ -166,6 +167,10 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { Status ReadAt( int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + bool supports_zero_copy() const override; + Status Seek(int64_t position) override; Status Tell(int64_t* position) override; @@ -183,9 +188,11 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); }; -class ARROW_EXPORT HdfsWriteableFile : public WriteableFile { +// Naming this file OutputStream because it does not support seeking (like the +// WriteableFile interface) +class ARROW_EXPORT HdfsOutputStream : public OutputStream { public: - ~HdfsWriteableFile(); + ~HdfsOutputStream(); Status Close() override; @@ -196,14 +203,14 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile { Status Tell(int64_t* position) override; private: - class ARROW_NO_EXPORT HdfsWriteableFileImpl; - std::unique_ptr impl_; + class ARROW_NO_EXPORT HdfsOutputStreamImpl; + std::unique_ptr impl_; friend class HdfsClient::HdfsClientImpl; - HdfsWriteableFile(); + HdfsOutputStream(); - DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile); + DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream); }; Status ARROW_EXPORT ConnectLibHdfs(); diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index c2128525371..fa34b43b2c9 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -21,8 +21,11 @@ #include #include +#include "arrow/util/macros.h" + namespace arrow { +class Buffer; class Status; namespace io { @@ -40,30 +43,78 @@ class FileSystemClient { virtual ~FileSystemClient() {} }; -class FileBase { +class FileInterface { public: + virtual ~FileInterface() {} virtual Status Close() = 0; virtual Status Tell(int64_t* position) = 0; + + FileMode::type mode() const { return mode_; } + + protected: + FileInterface() {} + FileMode::type mode_; + + void set_mode(FileMode::type mode) { mode_ = mode; } + + private: + DISALLOW_COPY_AND_ASSIGN(FileInterface); }; -class ReadableFile : public FileBase { +class Seekable { public: - virtual Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0; + virtual Status Seek(int64_t position) = 0; +}; - virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0; +class Writeable { + public: + virtual Status Write(const uint8_t* data, int64_t nbytes) = 0; +}; - virtual Status GetSize(int64_t* size) = 0; +class Readable { + public: + virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0; +}; + +class OutputStream : public FileInterface, public Writeable { + protected: + OutputStream() {} }; -class RandomAccessFile : public ReadableFile { +class InputStream : public FileInterface, public Readable { + protected: + InputStream() {} +}; + +class ReadableFileInterface : public InputStream, public Seekable { public: - virtual Status Seek(int64_t position) = 0; + virtual Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0; + + virtual Status GetSize(int64_t* size) = 0; + + // Does not copy if not necessary + virtual Status ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) = 0; + + virtual bool supports_zero_copy() const = 0; + + protected: + ReadableFileInterface() { set_mode(FileMode::READ); } }; -class WriteableFile : public FileBase { +class WriteableFileInterface : public OutputStream, public Seekable { public: - virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0; + virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0; + + protected: + WriteableFileInterface() { set_mode(FileMode::READ); } +}; + +class ReadWriteFileInterface : public ReadableFileInterface, + public WriteableFileInterface { + protected: + ReadWriteFileInterface() { ReadableFileInterface::set_mode(FileMode::READWRITE); } }; } // namespace io diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc similarity index 99% rename from cpp/src/arrow/io/hdfs-io-test.cc rename to cpp/src/arrow/io/io-hdfs-test.cc index e48a28142fa..7901932dee6 100644 --- a/cpp/src/arrow/io/hdfs-io-test.cc +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -49,7 +49,7 @@ class TestHdfsClient : public ::testing::Test { Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size, bool append = false, int buffer_size = 0, int replication = 0, int default_block_size = 0) { - std::shared_ptr file; + std::shared_ptr file; RETURN_NOT_OK(client_->OpenWriteable( path, append, buffer_size, replication, default_block_size, &file)); diff --git a/cpp/src/arrow/ipc/ipc-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc similarity index 66% rename from cpp/src/arrow/ipc/ipc-memory-test.cc rename to cpp/src/arrow/io/io-memory-test.cc index a2dbd35728c..6de35dab59b 100644 --- a/cpp/src/arrow/ipc/ipc-memory-test.cc +++ b/cpp/src/arrow/io/io-memory-test.cc @@ -24,20 +24,20 @@ #include "gtest/gtest.h" -#include "arrow/ipc/memory.h" -#include "arrow/ipc/test-common.h" +#include "arrow/io/memory.h" +#include "arrow/io/test-common.h" namespace arrow { -namespace ipc { +namespace io { -class TestMemoryMappedSource : public ::testing::Test, public MemoryMapFixture { +class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture { public: void TearDown() { MemoryMapFixture::TearDown(); } }; -TEST_F(TestMemoryMappedSource, InvalidUsages) {} +TEST_F(TestMemoryMappedFile, InvalidUsages) {} -TEST_F(TestMemoryMappedSource, WriteRead) { +TEST_F(TestMemoryMappedFile, WriteRead) { const int64_t buffer_size = 1024; std::vector buffer(buffer_size); @@ -48,14 +48,13 @@ TEST_F(TestMemoryMappedSource, WriteRead) { std::string path = "ipc-write-read-test"; CreateFile(path, reps * buffer_size); - std::shared_ptr result; - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &result)); + std::shared_ptr result; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &result)); int64_t position = 0; - std::shared_ptr out_buffer; for (int i = 0; i < reps; ++i) { - ASSERT_OK(result->Write(position, buffer.data(), buffer_size)); + ASSERT_OK(result->Write(buffer.data(), buffer_size)); ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer)); ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); @@ -64,7 +63,7 @@ TEST_F(TestMemoryMappedSource, WriteRead) { } } -TEST_F(TestMemoryMappedSource, ReadOnly) { +TEST_F(TestMemoryMappedFile, ReadOnly) { const int64_t buffer_size = 1024; std::vector buffer(buffer_size); @@ -75,19 +74,18 @@ TEST_F(TestMemoryMappedSource, ReadOnly) { std::string path = "ipc-read-only-test"; CreateFile(path, reps * buffer_size); - std::shared_ptr rwmmap; - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &rwmmap)); + std::shared_ptr rwmmap; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap)); int64_t position = 0; for (int i = 0; i < reps; ++i) { - ASSERT_OK(rwmmap->Write(position, buffer.data(), buffer_size)); - + ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size)); position += buffer_size; } rwmmap->Close(); - std::shared_ptr rommap; - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap)); + std::shared_ptr rommap; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); position = 0; std::shared_ptr out_buffer; @@ -100,7 +98,7 @@ TEST_F(TestMemoryMappedSource, ReadOnly) { rommap->Close(); } -TEST_F(TestMemoryMappedSource, InvalidMode) { +TEST_F(TestMemoryMappedFile, InvalidMode) { const int64_t buffer_size = 1024; std::vector buffer(buffer_size); @@ -109,19 +107,19 @@ TEST_F(TestMemoryMappedSource, InvalidMode) { std::string path = "ipc-invalid-mode-test"; CreateFile(path, buffer_size); - std::shared_ptr rommap; - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap)); + std::shared_ptr rommap; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); - ASSERT_RAISES(IOError, rommap->Write(0, buffer.data(), buffer_size)); + ASSERT_RAISES(IOError, rommap->Write(buffer.data(), buffer_size)); } -TEST_F(TestMemoryMappedSource, InvalidFile) { +TEST_F(TestMemoryMappedFile, InvalidFile) { std::string non_existent_path = "invalid-file-name-asfd"; - std::shared_ptr result; - ASSERT_RAISES(IOError, - MemoryMappedSource::Open(non_existent_path, MemorySource::READ_ONLY, &result)); + std::shared_ptr result; + ASSERT_RAISES( + IOError, MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result)); } -} // namespace ipc +} // namespace io } // namespace arrow diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc index 003570d4fde..0b805abf94c 100644 --- a/cpp/src/arrow/io/libhdfs_shim.cc +++ b/cpp/src/arrow/io/libhdfs_shim.cc @@ -51,8 +51,7 @@ extern "C" { #include #include -#include // NOLINT -#include // NOLINT +#include // NOLINT #include "arrow/util/status.h" #include "arrow/util/visibility.h" diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc new file mode 100644 index 00000000000..1dd6c3a0230 --- /dev/null +++ b/cpp/src/arrow/io/memory.cc @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/memory.h" + +#include // For memory-mapping + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/io/interfaces.h" + +#include "arrow/util/buffer.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace io { + +// Implement MemoryMappedFile + +class MemoryMappedFile::MemoryMappedFileImpl { + public: + MemoryMappedFileImpl() + : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {} + + ~MemoryMappedFileImpl() { + if (is_open_) { + munmap(data_, size_); + fclose(file_); + } + } + + Status Open(const std::string& path, FileMode::type mode) { + if (is_open_) { return Status::IOError("A file is already open"); } + + int prot_flags = PROT_READ; + + if (mode == FileMode::READWRITE) { + file_ = fopen(path.c_str(), "r+b"); + prot_flags |= PROT_WRITE; + is_writable_ = true; + } else { + file_ = fopen(path.c_str(), "rb"); + } + if (file_ == nullptr) { + std::stringstream ss; + ss << "Unable to open file, errno: " << errno; + return Status::IOError(ss.str()); + } + + fseek(file_, 0L, SEEK_END); + if (ferror(file_)) { return Status::IOError("Unable to seek to end of file"); } + size_ = ftell(file_); + + fseek(file_, 0L, SEEK_SET); + is_open_ = true; + position_ = 0; + + void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0); + if (result == MAP_FAILED) { + std::stringstream ss; + ss << "Memory mapping file failed, errno: " << errno; + return Status::IOError(ss.str()); + } + data_ = reinterpret_cast(result); + + return Status::OK(); + } + + int64_t size() const { return size_; } + + Status Seek(int64_t position) { + if (position < 0 || position >= size_) { + return Status::Invalid("position is out of bounds"); + } + position_ = position; + return Status::OK(); + } + + int64_t position() { return position_; } + + void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); } + + uint8_t* data() { return data_; } + + uint8_t* head() { return data_ + position_; } + + bool writable() { return is_writable_; } + + bool opened() { return is_open_; } + + private: + FILE* file_; + int64_t position_; + int64_t size_; + bool is_open_; + bool is_writable_; + + // The memory map + uint8_t* data_; +}; + +MemoryMappedFile::MemoryMappedFile(FileMode::type mode) { + ReadableFileInterface::set_mode(mode); +} + +Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, + std::shared_ptr* out) { + std::shared_ptr result(new MemoryMappedFile(mode)); + + result->impl_.reset(new MemoryMappedFileImpl()); + RETURN_NOT_OK(result->impl_->Open(path, mode)); + + *out = result; + return Status::OK(); +} + +Status MemoryMappedFile::GetSize(int64_t* size) { + *size = impl_->size(); + return Status::OK(); +} + +Status MemoryMappedFile::Tell(int64_t* position) { + *position = impl_->position(); + return Status::OK(); +} + +Status MemoryMappedFile::Seek(int64_t position) { + return impl_->Seek(position); +} + +Status MemoryMappedFile::Close() { + // munmap handled in pimpl dtor + return Status::OK(); +} + +Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + nbytes = std::min(nbytes, impl_->size() - impl_->position()); + std::memcpy(out, impl_->head(), nbytes); + *bytes_read = nbytes; + impl_->advance(nbytes); + return Status::OK(); +} + +Status MemoryMappedFile::ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + RETURN_NOT_OK(impl_->Seek(position)); + return Read(nbytes, bytes_read, out); +} + +Status MemoryMappedFile::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) { + nbytes = std::min(nbytes, impl_->size() - position); + RETURN_NOT_OK(impl_->Seek(position)); + *out = std::make_shared(impl_->head(), nbytes); + impl_->advance(nbytes); + return Status::OK(); +} + +bool MemoryMappedFile::supports_zero_copy() const { + return true; +} + +Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { + if (!impl_->opened() || !impl_->writable()) { + return Status::IOError("Unable to write"); + } + + RETURN_NOT_OK(impl_->Seek(position)); + return WriteInternal(data, nbytes); +} + +Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) { + if (!impl_->opened() || !impl_->writable()) { + return Status::IOError("Unable to write"); + } + if (nbytes + impl_->position() > impl_->size()) { + return Status::Invalid("Cannot write past end of memory map"); + } + + return WriteInternal(data, nbytes); +} + +Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) { + memcpy(impl_->head(), data, nbytes); + impl_->advance(nbytes); + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// In-memory buffer reader + +Status BufferReader::Close() { + // no-op + return Status::OK(); +} + +Status BufferReader::Tell(int64_t* position) { + *position = position_; + return Status::OK(); +} + +Status BufferReader::ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, buffer); +} + +Status BufferReader::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) { + int64_t size = std::min(nbytes, buffer_size_ - position_); + *out = std::make_shared(buffer_ + position, size); + position_ += nbytes; + return Status::OK(); +} + +bool BufferReader::supports_zero_copy() const { + return true; +} + +Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { + memcpy(buffer, buffer_ + position_, nbytes); + *bytes_read = std::min(nbytes, buffer_size_ - position_); + position_ += *bytes_read; + return Status::OK(); +} + +Status BufferReader::GetSize(int64_t* size) { + *size = buffer_size_; + return Status::OK(); +} + +Status BufferReader::Seek(int64_t position) { + if (position < 0 || position >= buffer_size_) { + return Status::IOError("position out of bounds"); + } + + position_ = position; + return Status::OK(); +} + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h new file mode 100644 index 00000000000..6fe47c3b515 --- /dev/null +++ b/cpp/src/arrow/io/memory.h @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Public API for different memory sharing / IO mechanisms + +#ifndef ARROW_IO_MEMORY_H +#define ARROW_IO_MEMORY_H + +#include +#include +#include + +#include "arrow/io/interfaces.h" + +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; +class MutableBuffer; +class Status; + +namespace io { + +// An output stream that writes to a MutableBuffer, such as one obtained from a +// memory map +// +// TODO(wesm): Implement this class +class ARROW_EXPORT BufferOutputStream : public OutputStream { + public: + explicit BufferOutputStream(const std::shared_ptr& buffer) + : buffer_(buffer) {} + + // Implement the OutputStream interface + Status Close() override; + Status Tell(int64_t* position) override; + Status Write(const uint8_t* data, int64_t length) override; + + // Returns the number of bytes remaining in the buffer + int64_t bytes_remaining() const; + + private: + std::shared_ptr buffer_; + int64_t capacity_; + int64_t position_; +}; + +// A memory source that uses memory-mapped files for memory interactions +class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { + public: + static Status Open(const std::string& path, FileMode::type mode, + std::shared_ptr* out); + + Status Close() override; + + Status Tell(int64_t* position) override; + + Status Seek(int64_t position) override; + + // Required by ReadableFileInterface, copies memory into out + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; + + Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; + + // Read into a buffer, zero copy if possible + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + bool supports_zero_copy() const override; + + Status Write(const uint8_t* data, int64_t nbytes) override; + + Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override; + + // @return: the size in bytes of the memory source + Status GetSize(int64_t* size) override; + + private: + explicit MemoryMappedFile(FileMode::type mode); + + Status WriteInternal(const uint8_t* data, int64_t nbytes); + + // Hide the internal details of this class for now + class MemoryMappedFileImpl; + std::unique_ptr impl_; +}; + +class ARROW_EXPORT BufferReader : public ReadableFileInterface { + public: + BufferReader(const uint8_t* buffer, int buffer_size) + : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} + + Status Close() override; + Status Tell(int64_t* position) override; + + Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status GetSize(int64_t* size) override; + Status Seek(int64_t position) override; + + bool supports_zero_copy() const override; + + private: + const uint8_t* buffer_; + int buffer_size_; + int64_t position_; +}; + +} // namespace io +} // namespace arrow + +#endif // ARROW_IO_MEMORY_H diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h new file mode 100644 index 00000000000..1954d479e39 --- /dev/null +++ b/cpp/src/arrow/io/test-common.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_IO_TEST_COMMON_H +#define ARROW_IO_TEST_COMMON_H + +#include +#include +#include +#include +#include + +#include "arrow/io/memory.h" +#include "arrow/test-util.h" +#include "arrow/util/buffer.h" +#include "arrow/util/memory-pool.h" + +namespace arrow { +namespace io { + +class MemoryMapFixture { + public: + void TearDown() { + for (auto path : tmp_files_) { + std::remove(path.c_str()); + } + } + + void CreateFile(const std::string path, int64_t size) { + FILE* file = fopen(path.c_str(), "w"); + if (file != nullptr) { tmp_files_.push_back(path); } + ftruncate(fileno(file), size); + fclose(file); + } + + Status InitMemoryMap( + int64_t size, const std::string& path, std::shared_ptr* mmap) { + CreateFile(path, size); + return MemoryMappedFile::Open(path, FileMode::READWRITE, mmap); + } + + private: + std::vector tmp_files_; +}; + +} // namespace io +} // namespace arrow + +#endif // ARROW_IO_TEST_COMMON_H diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index 82634169ed9..e5553a63581 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -19,16 +19,50 @@ # arrow_ipc ####################################### -# Headers: top level -install(FILES - adapter.h - metadata.h - memory.h - DESTINATION include/arrow/ipc) +set(ARROW_IPC_LINK_LIBS + arrow_io + arrow_shared +) + +set(ARROW_IPC_PRIVATE_LINK_LIBS + ) + +set(ARROW_IPC_TEST_LINK_LIBS + arrow_ipc + ${ARROW_IPC_PRIVATE_LINK_LIBS}) + +set(ARROW_IPC_SRCS + adapter.cc + metadata.cc + metadata-internal.cc +) + +# TODO(wesm): SHARED and STATIC targets +add_library(arrow_ipc SHARED + ${ARROW_IPC_SRCS} +) +target_link_libraries(arrow_ipc + LINK_PUBLIC ${ARROW_IPC_LINK_LIBS} + LINK_PRIVATE ${ARROW_IPC_PRIVATE_LINK_LIBS}) + +if(NOT APPLE) + # Localize thirdparty symbols using a linker version script. This hides them + # from the client application. The OS X linker does not support the + # version-script option. + set(ARROW_IPC_LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/symbols.map") +endif() + +SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES + LINKER_LANGUAGE CXX + LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}") ADD_ARROW_TEST(ipc-adapter-test) -ADD_ARROW_TEST(ipc-memory-test) +ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test + ${ARROW_IPC_TEST_LINK_LIBS}) + ADD_ARROW_TEST(ipc-metadata-test) +ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test + ${ARROW_IPC_TEST_LINK_LIBS}) # make clean will delete the generated file set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE) @@ -49,3 +83,13 @@ add_custom_command( add_custom_target(metadata_fbs DEPENDS ${FBS_OUTPUT_FILES}) add_dependencies(arrow_objlib metadata_fbs) + +# Headers: top level +install(FILES + adapter.h + metadata.h + DESTINATION include/arrow/ipc) + +install(TARGETS arrow_ipc + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib) diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 40d372bbd35..0e101c89303 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -24,9 +24,11 @@ #include "arrow/array.h" #include "arrow/ipc/Message_generated.h" -#include "arrow/ipc/memory.h" #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" +#include "arrow/ipc/util.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" #include "arrow/schema.h" #include "arrow/table.h" #include "arrow/type.h" @@ -144,10 +146,15 @@ class RowBatchWriter { return Status::OK(); } - Status Write(MemorySource* dst, int64_t position, int64_t* data_header_offset) { + Status Write(io::OutputStream* dst, int64_t* data_header_offset) { // Write out all the buffers contiguously and compute the total size of the // memory payload int64_t offset = 0; + + // Get the starting position + int64_t position; + RETURN_NOT_OK(dst->Tell(&position)); + for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); int64_t size = 0; @@ -171,7 +178,7 @@ class RowBatchWriter { buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size)); if (size > 0) { - RETURN_NOT_OK(dst->Write(position + offset, buffer->data(), size)); + RETURN_NOT_OK(dst->Write(buffer->data(), size)); offset += size; } } @@ -180,7 +187,7 @@ class RowBatchWriter { // memory, the data header can be converted to a flatbuffer and written out // // Note: The memory written here is prefixed by the size of the flatbuffer - // itself as an int32_t. On reading from a MemorySource, you will have to + // itself as an int32_t. On reading from a input, you will have to // determine the data header size then request a buffer such that you can // construct the flatbuffer data accessor object (see arrow::ipc::Message) std::shared_ptr data_header; @@ -188,8 +195,7 @@ class RowBatchWriter { batch_->num_rows(), offset, field_nodes_, buffer_meta_, &data_header)); // Write the data header at the end - RETURN_NOT_OK( - dst->Write(position + offset, data_header->data(), data_header->size())); + RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size())); *data_header_offset = position + offset; return Status::OK(); @@ -199,9 +205,9 @@ class RowBatchWriter { Status GetTotalSize(int64_t* size) { // emulates the behavior of Write without actually writing int64_t data_header_offset; - MockMemorySource source(0); - RETURN_NOT_OK(Write(&source, 0, &data_header_offset)); - *size = source.GetExtentBytesWritten(); + MockOutputStream dst; + RETURN_NOT_OK(Write(&dst, &data_header_offset)); + *size = dst.GetExtentBytesWritten(); return Status::OK(); } @@ -214,12 +220,12 @@ class RowBatchWriter { int max_recursion_depth_; }; -Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position, - int64_t* header_offset, int max_recursion_depth) { +Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, int64_t* header_offset, + int max_recursion_depth) { DCHECK_GT(max_recursion_depth, 0); RowBatchWriter serializer(batch, max_recursion_depth); RETURN_NOT_OK(serializer.AssemblePayload()); - return serializer.Write(dst, position, header_offset); + return serializer.Write(dst, header_offset); } Status GetRowBatchSize(const RowBatch* batch, int64_t* size) { @@ -234,11 +240,11 @@ Status GetRowBatchSize(const RowBatch* batch, int64_t* size) { static constexpr int64_t INIT_METADATA_SIZE = 4096; -class RowBatchReader::Impl { +class RowBatchReader::RowBatchReaderImpl { public: - Impl(MemorySource* source, const std::shared_ptr& metadata, - int max_recursion_depth) - : source_(source), metadata_(metadata), max_recursion_depth_(max_recursion_depth) { + RowBatchReaderImpl(io::ReadableFileInterface* file, + const std::shared_ptr& metadata, int max_recursion_depth) + : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) { num_buffers_ = metadata->num_buffers(); num_flattened_fields_ = metadata->num_fields(); } @@ -339,10 +345,11 @@ class RowBatchReader::Impl { Status GetBuffer(int buffer_index, std::shared_ptr* out) { BufferMetadata metadata = metadata_->buffer(buffer_index); RETURN_NOT_OK(CheckMultipleOf64(metadata.length)); - return source_->ReadAt(metadata.offset, metadata.length, out); + return file_->ReadAt(metadata.offset, metadata.length, out); } - MemorySource* source_; + private: + io::ReadableFileInterface* file_; std::shared_ptr metadata_; int field_index_; @@ -352,22 +359,22 @@ class RowBatchReader::Impl { int num_flattened_fields_; }; -Status RowBatchReader::Open( - MemorySource* source, int64_t position, std::shared_ptr* out) { - return Open(source, position, kMaxIpcRecursionDepth, out); +Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position, + std::shared_ptr* out) { + return Open(file, position, kMaxIpcRecursionDepth, out); } -Status RowBatchReader::Open(MemorySource* source, int64_t position, +Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position, int max_recursion_depth, std::shared_ptr* out) { std::shared_ptr metadata; - RETURN_NOT_OK(source->ReadAt(position, INIT_METADATA_SIZE, &metadata)); + RETURN_NOT_OK(file->ReadAt(position, INIT_METADATA_SIZE, &metadata)); int32_t metadata_size = *reinterpret_cast(metadata->data()); - // We may not need to call source->ReadAt again + // We may not need to call ReadAt again if (metadata_size > static_cast(INIT_METADATA_SIZE - sizeof(int32_t))) { // We don't have enough data, read the indicated metadata size. - RETURN_NOT_OK(source->ReadAt(position + sizeof(int32_t), metadata_size, &metadata)); + RETURN_NOT_OK(file->ReadAt(position + sizeof(int32_t), metadata_size, &metadata)); } // TODO(wesm): buffer slicing here would be better in case ReadAt returns @@ -383,14 +390,14 @@ Status RowBatchReader::Open(MemorySource* source, int64_t position, std::shared_ptr batch_meta = message->GetRecordBatch(); std::shared_ptr result(new RowBatchReader()); - result->impl_.reset(new Impl(source, batch_meta, max_recursion_depth)); + result->impl_.reset(new RowBatchReaderImpl(file, batch_meta, max_recursion_depth)); *out = result; return Status::OK(); } // Here the explicit destructor is required for compilers to be aware of -// the complete information of RowBatchReader::Impl class +// the complete information of RowBatchReader::RowBatchReaderImpl class RowBatchReader::~RowBatchReader() {} Status RowBatchReader::GetRowBatch( diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index 6231af66aa1..215b46f8f65 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -33,9 +33,15 @@ class RowBatch; class Schema; class Status; +namespace io { + +class ReadableFileInterface; +class OutputStream; + +} // namespace io + namespace ipc { -class MemorySource; class RecordBatchMessage; // ---------------------------------------------------------------------- @@ -43,22 +49,21 @@ class RecordBatchMessage; // We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice round number // TODO(emkornfield) investigate this more constexpr int kMaxIpcRecursionDepth = 64; -// Write the RowBatch (collection of equal-length Arrow arrays) to the memory -// source at the indicated position + +// Write the RowBatch (collection of equal-length Arrow arrays) to the output +// stream // -// First, each of the memory buffers are written out end-to-end in starting at -// the indicated position. +// First, each of the memory buffers are written out end-to-end // // Then, this function writes the batch metadata as a flatbuffer (see // format/Message.fbs -- the RecordBatch message type) like so: // // // -// Finally, the memory offset to the start of the metadata / data header is -// returned in an out-variable -ARROW_EXPORT Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, - int64_t position, int64_t* header_offset, - int max_recursion_depth = kMaxIpcRecursionDepth); +// Finally, the absolute offset (relative to the start of the output stream) to +// the start of the metadata / data header is returned in an out-variable +ARROW_EXPORT Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, + int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth); // int64_t GetRowBatchMetadata(const RowBatch* batch); @@ -68,16 +73,16 @@ ARROW_EXPORT Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, ARROW_EXPORT Status GetRowBatchSize(const RowBatch* batch, int64_t* size); // ---------------------------------------------------------------------- -// "Read" path; does not copy data if the MemorySource does not +// "Read" path; does not copy data if the input supports zero copy reads class ARROW_EXPORT RowBatchReader { public: - static Status Open( - MemorySource* source, int64_t position, std::shared_ptr* out); - - static Status Open(MemorySource* source, int64_t position, int max_recursion_depth, + static Status Open(io::ReadableFileInterface* file, int64_t position, std::shared_ptr* out); + static Status Open(io::ReadableFileInterface* file, int64_t position, + int max_recursion_depth, std::shared_ptr* out); + virtual ~RowBatchReader(); // Reassemble the row batch. A Schema is required to be able to construct the @@ -86,8 +91,8 @@ class ARROW_EXPORT RowBatchReader { const std::shared_ptr& schema, std::shared_ptr* out); private: - class Impl; - std::unique_ptr impl_; + class RowBatchReaderImpl; + std::unique_ptr impl_; }; } // namespace ipc diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 6740e0fc5ac..ca4d0152b90 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -24,9 +24,11 @@ #include "gtest/gtest.h" +#include "arrow/io/memory.h" +#include "arrow/io/test-common.h" #include "arrow/ipc/adapter.h" -#include "arrow/ipc/memory.h" #include "arrow/ipc/test-common.h" +#include "arrow/ipc/util.h" #include "arrow/test-util.h" #include "arrow/types/list.h" @@ -49,17 +51,18 @@ const auto LIST_LIST_INT32 = std::make_shared(LIST_INT32); typedef Status MakeRowBatch(std::shared_ptr* out); class TestWriteRowBatch : public ::testing::TestWithParam, - public MemoryMapFixture { + public io::MemoryMapFixture { public: void SetUp() { pool_ = default_memory_pool(); } - void TearDown() { MemoryMapFixture::TearDown(); } + void TearDown() { io::MemoryMapFixture::TearDown(); } Status RoundTripHelper(const RowBatch& batch, int memory_map_size, std::shared_ptr* batch_result) { std::string path = "test-write-row-batch"; - MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); + io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); int64_t header_location; - RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location)); + + RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, &header_location)); std::shared_ptr reader; RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader)); @@ -69,7 +72,7 @@ class TestWriteRowBatch : public ::testing::TestWithParam, } protected: - std::shared_ptr mmap_; + std::shared_ptr mmap_; MemoryPool* pool_; }; @@ -276,12 +279,12 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch, &MakeStringTypesRowBatch, &MakeStruct)); void TestGetRowBatchSize(std::shared_ptr batch) { - MockMemorySource mock_source(1 << 16); + ipc::MockOutputStream mock; int64_t mock_header_location = -1; int64_t size = -1; - ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location)); + ASSERT_OK(WriteRowBatch(&mock, batch.get(), &mock_header_location)); ASSERT_OK(GetRowBatchSize(batch.get(), &size)); - ASSERT_EQ(mock_source.GetExtentBytesWritten(), size); + ASSERT_EQ(mock.GetExtentBytesWritten(), size); } TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) { @@ -303,10 +306,10 @@ TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) { TestGetRowBatchSize(batch); } -class RecursionLimits : public ::testing::Test, public MemoryMapFixture { +class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { public: void SetUp() { pool_ = default_memory_pool(); } - void TearDown() { MemoryMapFixture::TearDown(); } + void TearDown() { io::MemoryMapFixture::TearDown(); } Status WriteToMmap(int recursion_level, bool override_level, int64_t* header_out = nullptr, std::shared_ptr* schema_out = nullptr) { @@ -329,19 +332,19 @@ class RecursionLimits : public ::testing::Test, public MemoryMapFixture { std::string path = "test-write-past-max-recursion"; const int memory_map_size = 1 << 16; - MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); + io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); int64_t header_location; int64_t* header_out_param = header_out == nullptr ? &header_location : header_out; if (override_level) { return WriteRowBatch( - mmap_.get(), batch.get(), 0, header_out_param, recursion_level + 1); + mmap_.get(), batch.get(), header_out_param, recursion_level + 1); } else { - return WriteRowBatch(mmap_.get(), batch.get(), 0, header_out_param); + return WriteRowBatch(mmap_.get(), batch.get(), header_out_param); } } protected: - std::shared_ptr mmap_; + std::shared_ptr mmap_; MemoryPool* pool_; }; diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc deleted file mode 100644 index a6c56d64f4a..00000000000 --- a/cpp/src/arrow/ipc/memory.cc +++ /dev/null @@ -1,182 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/ipc/memory.h" - -#include // For memory-mapping - -#include -#include -#include -#include -#include -#include -#include - -#include "arrow/util/buffer.h" -#include "arrow/util/status.h" - -namespace arrow { -namespace ipc { - -MemorySource::MemorySource(AccessMode access_mode) : access_mode_(access_mode) {} - -MemorySource::~MemorySource() {} - -// Implement MemoryMappedSource - -class MemoryMappedSource::Impl { - public: - Impl() : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {} - - ~Impl() { - if (is_open_) { - munmap(data_, size_); - fclose(file_); - } - } - - Status Open(const std::string& path, MemorySource::AccessMode mode) { - if (is_open_) { return Status::IOError("A file is already open"); } - - int prot_flags = PROT_READ; - - if (mode == MemorySource::READ_WRITE) { - file_ = fopen(path.c_str(), "r+b"); - prot_flags |= PROT_WRITE; - is_writable_ = true; - } else { - file_ = fopen(path.c_str(), "rb"); - } - if (file_ == nullptr) { - std::stringstream ss; - ss << "Unable to open file, errno: " << errno; - return Status::IOError(ss.str()); - } - - fseek(file_, 0L, SEEK_END); - if (ferror(file_)) { return Status::IOError("Unable to seek to end of file"); } - size_ = ftell(file_); - - fseek(file_, 0L, SEEK_SET); - is_open_ = true; - - void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0); - if (result == MAP_FAILED) { - std::stringstream ss; - ss << "Memory mapping file failed, errno: " << errno; - return Status::IOError(ss.str()); - } - data_ = reinterpret_cast(result); - - return Status::OK(); - } - - int64_t size() const { return size_; } - - uint8_t* data() { return data_; } - - bool writable() { return is_writable_; } - - bool opened() { return is_open_; } - - private: - FILE* file_; - int64_t size_; - bool is_open_; - bool is_writable_; - - // The memory map - uint8_t* data_; -}; - -MemoryMappedSource::MemoryMappedSource(AccessMode access_mode) - : MemorySource(access_mode) {} - -Status MemoryMappedSource::Open(const std::string& path, AccessMode access_mode, - std::shared_ptr* out) { - std::shared_ptr result(new MemoryMappedSource(access_mode)); - - result->impl_.reset(new Impl()); - RETURN_NOT_OK(result->impl_->Open(path, access_mode)); - - *out = result; - return Status::OK(); -} - -int64_t MemoryMappedSource::Size() const { - return impl_->size(); -} - -Status MemoryMappedSource::Close() { - // munmap handled in ::Impl dtor - return Status::OK(); -} - -Status MemoryMappedSource::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) { - if (position < 0 || position >= impl_->size()) { - return Status::Invalid("position is out of bounds"); - } - - nbytes = std::min(nbytes, impl_->size() - position); - *out = std::make_shared(impl_->data() + position, nbytes); - return Status::OK(); -} - -Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t nbytes) { - if (!impl_->opened() || !impl_->writable()) { - return Status::IOError("Unable to write"); - } - if (position < 0 || position >= impl_->size()) { - return Status::Invalid("position is out of bounds"); - } - - // TODO(wesm): verify we are not writing past the end of the buffer - uint8_t* dst = impl_->data() + position; - memcpy(dst, data, nbytes); - - return Status::OK(); -} - -MockMemorySource::MockMemorySource(int64_t size) - : size_(size), extent_bytes_written_(0) {} - -Status MockMemorySource::Close() { - return Status::OK(); -} - -Status MockMemorySource::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) { - return Status::OK(); -} - -Status MockMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) { - extent_bytes_written_ = std::max(extent_bytes_written_, position + nbytes); - return Status::OK(); -} - -int64_t MockMemorySource::Size() const { - return size_; -} - -int64_t MockMemorySource::GetExtentBytesWritten() const { - return extent_bytes_written_; -} - -} // namespace ipc -} // namespace arrow diff --git a/cpp/src/arrow/ipc/memory.h b/cpp/src/arrow/ipc/memory.h deleted file mode 100644 index 377401d85c0..00000000000 --- a/cpp/src/arrow/ipc/memory.h +++ /dev/null @@ -1,150 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Public API for different interprocess memory sharing mechanisms - -#ifndef ARROW_IPC_MEMORY_H -#define ARROW_IPC_MEMORY_H - -#include -#include -#include - -#include "arrow/util/macros.h" -#include "arrow/util/visibility.h" - -namespace arrow { - -class Buffer; -class MutableBuffer; -class Status; - -namespace ipc { - -// Abstract output stream -class OutputStream { - public: - virtual ~OutputStream() {} - // Close the output stream - virtual Status Close() = 0; - - // The current position in the output stream - virtual int64_t Tell() const = 0; - - // Write bytes to the stream - virtual Status Write(const uint8_t* data, int64_t length) = 0; -}; - -// An output stream that writes to a MutableBuffer, such as one obtained from a -// memory map -class BufferOutputStream : public OutputStream { - public: - explicit BufferOutputStream(const std::shared_ptr& buffer) - : buffer_(buffer) {} - - // Implement the OutputStream interface - Status Close() override; - int64_t Tell() const override; - Status Write(const uint8_t* data, int64_t length) override; - - // Returns the number of bytes remaining in the buffer - int64_t bytes_remaining() const; - - private: - std::shared_ptr buffer_; - int64_t capacity_; - int64_t position_; -}; - -class ARROW_EXPORT MemorySource { - public: - // Indicates the access permissions of the memory source - enum AccessMode { READ_ONLY, READ_WRITE }; - - virtual ~MemorySource(); - - // Retrieve a buffer of memory from the source of the indicates size and at - // the indicated location - // @returns: arrow::Status indicating success / failure. The buffer is set - // into the *out argument - virtual Status ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) = 0; - - virtual Status Close() = 0; - - virtual Status Write(int64_t position, const uint8_t* data, int64_t nbytes) = 0; - - // @return: the size in bytes of the memory source - virtual int64_t Size() const = 0; - - protected: - explicit MemorySource(AccessMode access_mode = AccessMode::READ_WRITE); - - AccessMode access_mode_; - - private: - DISALLOW_COPY_AND_ASSIGN(MemorySource); -}; - -// A memory source that uses memory-mapped files for memory interactions -class ARROW_EXPORT MemoryMappedSource : public MemorySource { - public: - static Status Open(const std::string& path, AccessMode access_mode, - std::shared_ptr* out); - - Status Close() override; - - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; - - Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override; - - // @return: the size in bytes of the memory source - int64_t Size() const override; - - private: - explicit MemoryMappedSource(AccessMode access_mode); - // Hide the internal details of this class for now - class Impl; - std::unique_ptr impl_; -}; - -// A MemorySource that tracks the size of allocations from a memory source -class MockMemorySource : public MemorySource { - public: - explicit MockMemorySource(int64_t size); - - Status Close() override; - - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; - - Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override; - - int64_t Size() const override; - - // @return: the smallest number of bytes containing the modified region of the - // MockMemorySource - int64_t GetExtentBytesWritten() const; - - private: - int64_t size_; - int64_t extent_bytes_written_; -}; - -} // namespace ipc -} // namespace arrow - -#endif // ARROW_IPC_MEMORY_H diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 8cc902c2967..05e9c7ad4d3 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -220,9 +220,8 @@ static Status FieldToFlatbuffer( auto fb_children = fbb.CreateVector(children); // TODO: produce the list of VectorTypes - *offset = flatbuf::CreateField( - fbb, fb_name, field->nullable, type_enum, type_data, field->dictionary, - fb_children); + *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data, + field->dictionary, fb_children); return Status::OK(); } @@ -295,8 +294,8 @@ Status WriteDataHeader(int32_t length, int64_t body_length, } Status MessageBuilder::Finish() { - auto message = flatbuf::CreateMessage(fbb_, kMetadataVersion, - header_type_, header_, body_length_); + auto message = + flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_); fbb_.Finish(message); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index db9a83f6a8d..d38df840ba0 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -38,7 +38,7 @@ class Status; namespace ipc { static constexpr flatbuf::MetadataVersion kMetadataVersion = - flatbuf::MetadataVersion_V1_SNAPSHOT; + flatbuf::MetadataVersion_V1_SNAPSHOT; Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr* out); diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 838a4a676ea..d5ec53317e6 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -23,6 +23,8 @@ #include #include +#include "arrow/util/visibility.h" + namespace arrow { class Buffer; @@ -36,6 +38,7 @@ namespace ipc { // Message read/write APIs // Serialize arrow::Schema as a Flatbuffer +ARROW_EXPORT Status WriteSchema(const Schema* schema, std::shared_ptr* out); //---------------------------------------------------------------------- @@ -47,7 +50,7 @@ Status WriteSchema(const Schema* schema, std::shared_ptr* out); class Message; // Container for serialized Schema metadata contained in an IPC message -class SchemaMessage { +class ARROW_EXPORT SchemaMessage { public: // Accepts an opaque flatbuffer pointer SchemaMessage(const std::shared_ptr& message, const void* schema); @@ -82,7 +85,7 @@ struct BufferMetadata { }; // Container for serialized record batch metadata contained in an IPC message -class RecordBatchMessage { +class ARROW_EXPORT RecordBatchMessage { public: // Accepts an opaque flatbuffer pointer RecordBatchMessage(const std::shared_ptr& message, const void* batch_meta); @@ -102,13 +105,13 @@ class RecordBatchMessage { std::unique_ptr impl_; }; -class DictionaryBatchMessage { +class ARROW_EXPORT DictionaryBatchMessage { public: int64_t id() const; std::unique_ptr data() const; }; -class Message : public std::enable_shared_from_this { +class ARROW_EXPORT Message : public std::enable_shared_from_this { public: enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH }; diff --git a/cpp/src/arrow/ipc/symbols.map b/cpp/src/arrow/ipc/symbols.map new file mode 100644 index 00000000000..b4ad98cd7f2 --- /dev/null +++ b/cpp/src/arrow/ipc/symbols.map @@ -0,0 +1,18 @@ +{ + # Symbols marked as 'local' are not exported by the DSO and thus may not + # be used by client applications. + local: + # devtoolset / static-libstdc++ symbols + __cxa_*; + + extern "C++" { + # boost + boost::*; + + # devtoolset or -static-libstdc++ - the Red Hat devtoolset statically + # links c++11 symbols into binaries so that the result may be executed on + # a system with an older libstdc++ which doesn't include the necessary + # c++11 symbols. + std::*; + }; +}; diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index e7dbb84d790..f6582fc883b 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -34,31 +34,6 @@ namespace arrow { namespace ipc { -class MemoryMapFixture { - public: - void TearDown() { - for (auto path : tmp_files_) { - std::remove(path.c_str()); - } - } - - void CreateFile(const std::string path, int64_t size) { - FILE* file = fopen(path.c_str(), "w"); - if (file != nullptr) { tmp_files_.push_back(path); } - ftruncate(fileno(file), size); - fclose(file); - } - - Status InitMemoryMap( - int64_t size, const std::string& path, std::shared_ptr* mmap) { - CreateFile(path, size); - return MemoryMappedSource::Open(path, MemorySource::READ_WRITE, mmap); - } - - private: - std::vector tmp_files_; -}; - Status MakeRandomInt32Array( int32_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr* array) { std::shared_ptr data; diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h new file mode 100644 index 00000000000..3f4001b21a9 --- /dev/null +++ b/cpp/src/arrow/ipc/util.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_IPC_UTIL_H +#define ARROW_IPC_UTIL_H + +#include + +#include "arrow/array.h" +#include "arrow/io/interfaces.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace ipc { + +// A helper class to tracks the size of allocations +class MockOutputStream : public io::OutputStream { + public: + MockOutputStream() : extent_bytes_written_(0) {} + + Status Close() override { return Status::OK(); } + + Status Write(const uint8_t* data, int64_t nbytes) override { + extent_bytes_written_ += nbytes; + return Status::OK(); + } + + Status Tell(int64_t* position) override { + *position = extent_bytes_written_; + return Status::OK(); + } + + int64_t GetExtentBytesWritten() const { return extent_bytes_written_; } + + private: + int64_t extent_bytes_written_; +}; + +} // namespace ipc +} // namespace arrow + +#endif // ARROW_IPC_UTIL_H diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt index f2a90b71a49..c400e14ea47 100644 --- a/cpp/src/arrow/parquet/CMakeLists.txt +++ b/cpp/src/arrow/parquet/CMakeLists.txt @@ -27,6 +27,7 @@ set(PARQUET_SRCS set(PARQUET_LIBS arrow_shared + arrow_io parquet_shared ) diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc index b6fdd67d15b..a50d753f305 100644 --- a/cpp/src/arrow/parquet/io.cc +++ b/cpp/src/arrow/parquet/io.cc @@ -27,7 +27,7 @@ #include "arrow/util/status.h" // To assist with readability -using ArrowROFile = arrow::io::RandomAccessFile; +using ArrowROFile = arrow::io::ReadableFileInterface; namespace arrow { namespace parquet { @@ -58,7 +58,7 @@ void ParquetAllocator::Free(uint8_t* buffer, int64_t size) { ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator) : file_(nullptr), allocator_(allocator) {} -Status ParquetReadSource::Open(const std::shared_ptr& file) { +Status ParquetReadSource::Open(const std::shared_ptr& file) { int64_t file_size; RETURN_NOT_OK(file->GetSize(&file_size)); diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h index 1c59695c6c1..1734863acf1 100644 --- a/cpp/src/arrow/parquet/io.h +++ b/cpp/src/arrow/parquet/io.h @@ -62,7 +62,7 @@ class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { explicit ParquetReadSource(ParquetAllocator* allocator); // We need to ask for the file size on opening the file, and this can fail - Status Open(const std::shared_ptr& file); + Status Open(const std::shared_ptr& file); void Close() override; int64_t Tell() const override; @@ -72,7 +72,7 @@ class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { private: // An Arrow readable file of some kind - std::shared_ptr file_; + std::shared_ptr file_; // The allocator is required for creating managed buffers ParquetAllocator* allocator_; diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 6615457c483..208b3e867d3 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -22,6 +22,7 @@ #include "gtest/gtest.h" +#include "arrow/io/memory.h" #include "arrow/parquet/io.h" #include "arrow/test-util.h" #include "arrow/util/memory-pool.h" @@ -96,61 +97,13 @@ TEST(TestParquetAllocator, CustomPool) { // ---------------------------------------------------------------------- // Read source tests -class BufferReader : public io::RandomAccessFile { - public: - BufferReader(const uint8_t* buffer, int buffer_size) - : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} - - Status Close() override { - // no-op - return Status::OK(); - } - - Status Tell(int64_t* position) override { - *position = position_; - return Status::OK(); - } - - Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override { - RETURN_NOT_OK(Seek(position)); - return Read(nbytes, bytes_read, buffer); - } - - Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override { - memcpy(buffer, buffer_ + position_, nbytes); - *bytes_read = std::min(nbytes, buffer_size_ - position_); - position_ += *bytes_read; - return Status::OK(); - } - - Status GetSize(int64_t* size) override { - *size = buffer_size_; - return Status::OK(); - } - - Status Seek(int64_t position) override { - if (position < 0 || position >= buffer_size_) { - return Status::IOError("position out of bounds"); - } - - position_ = position; - return Status::OK(); - } - - private: - const uint8_t* buffer_; - int buffer_size_; - int64_t position_; -}; - TEST(TestParquetReadSource, Basics) { std::string data = "this is the data"; auto data_buffer = reinterpret_cast(data.c_str()); ParquetAllocator allocator(default_memory_pool()); - auto file = std::make_shared(data_buffer, data.size()); + auto file = std::make_shared(data_buffer, data.size()); auto source = std::make_shared(&allocator); ASSERT_OK(source->Open(file)); diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc index a2bcd3e05c3..63ad8fba465 100644 --- a/cpp/src/arrow/parquet/parquet-schema-test.cc +++ b/cpp/src/arrow/parquet/parquet-schema-test.cc @@ -178,8 +178,7 @@ class TestConvertArrowSchema : public ::testing::Test { NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes); const GroupNode* expected_schema_node = static_cast(schema_node.get()); - const GroupNode* result_schema_node = - static_cast(result_schema_->schema().get()); + const GroupNode* result_schema_node = result_schema_->group_node(); ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count()); diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 440ec84e2c7..0c2fc6e8fc7 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -149,7 +149,7 @@ bool FileReader::Impl::CheckForFlatColumn(const ::parquet::ColumnDescriptor* des } Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr* out) { - const ::parquet::SchemaDescriptor* schema = reader_->metadata()->schema_descriptor(); + const ::parquet::SchemaDescriptor* schema = reader_->metadata()->schema(); if (!CheckForFlatColumn(schema->Column(i))) { return Status::Invalid("The requested column is not flat"); @@ -167,9 +167,9 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr* out) { } Status FileReader::Impl::ReadFlatTable(std::shared_ptr* table) { - auto descr = reader_->metadata()->schema_descriptor(); + auto descr = reader_->metadata()->schema(); - const std::string& name = descr->schema()->name(); + const std::string& name = descr->name(); std::shared_ptr schema; RETURN_NOT_OK(FromParquetSchema(descr, &schema)); @@ -193,7 +193,7 @@ FileReader::FileReader( FileReader::~FileReader() {} // Static ctor -Status OpenFile(const std::shared_ptr& file, +Status OpenFile(const std::shared_ptr& file, ParquetAllocator* allocator, std::unique_ptr* reader) { std::unique_ptr source(new ParquetReadSource(allocator)); RETURN_NOT_OK(source->Open(file)); diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index f1492f64521..a9c64eca997 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -137,7 +137,7 @@ class ARROW_EXPORT FlatColumnReader { // Helper function to create a file reader from an implementation of an Arrow // readable file ARROW_EXPORT -Status OpenFile(const std::shared_ptr& file, +Status OpenFile(const std::shared_ptr& file, ParquetAllocator* allocator, std::unique_ptr* reader); } // namespace parquet diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index cd91df32271..ff32e51bacd 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -202,7 +202,7 @@ Status FromParquetSchema( // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes // from the root Parquet node const GroupNode* schema_node = - static_cast(parquet_schema->schema().get()); + static_cast(parquet_schema->group_node()); std::vector> fields(schema_node->field_count()); for (int i = 0; i < schema_node->field_count(); i++) { diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc index ddee573fa1e..2b47f1461c9 100644 --- a/cpp/src/arrow/parquet/writer.cc +++ b/cpp/src/arrow/parquet/writer.cc @@ -334,7 +334,7 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool, std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema; RETURN_NOT_OK( ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema)); - auto schema_node = std::static_pointer_cast(parquet_schema->schema()); + auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); std::unique_ptr parquet_writer = ParquetFileWriter::Open(sink, schema_node, properties); FileWriter writer(pool, std::move(parquet_writer)); diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 02677d5e18b..b4c3721a728 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -149,7 +149,7 @@ struct ARROW_EXPORT Field { int64_t dictionary; Field(const std::string& name, const TypePtr& type, bool nullable = true, - int64_t dictionary = 0) + int64_t dictionary = 0) : name(name), type(type), nullable(nullable), dictionary(dictionary) {} bool operator==(const Field& other) const { return this->Equals(other); } @@ -159,7 +159,7 @@ struct ARROW_EXPORT Field { bool Equals(const Field& other) const { return (this == &other) || (this->name == other.name && this->nullable == other.nullable && - this->dictionary == dictionary && this->type->Equals(other.type.get())); + this->dictionary == dictionary && this->type->Equals(other.type.get())); } bool Equals(const std::shared_ptr& other) const { return Equals(*other.get()); } diff --git a/cpp/src/arrow/util/memory-pool-test.cc b/cpp/src/arrow/util/memory-pool-test.cc index e767e955524..5d60376f794 100644 --- a/cpp/src/arrow/util/memory-pool-test.cc +++ b/cpp/src/arrow/util/memory-pool-test.cc @@ -64,6 +64,6 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) { pool->Free(data, 100); } -#endif // ARROW_VALGRIND +#endif // ARROW_VALGRIND } // namespace arrow diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index 734ace6c923..f338a436814 100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -29,25 +29,41 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: ObjectType_FILE" arrow::io::ObjectType::FILE" ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY" - cdef cppclass FileBase: + cdef cppclass FileInterface: CStatus Close() CStatus Tell(int64_t* position) + FileMode mode() - cdef cppclass ReadableFile(FileBase): + cdef cppclass Readable: + CStatus Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) + + cdef cppclass Seekable: + CStatus Seek(int64_t position) + + cdef cppclass Writeable: + CStatus Write(const uint8_t* data, int64_t nbytes) + + cdef cppclass OutputStream(FileInterface, Writeable): + pass + + cdef cppclass InputStream(FileInterface, Readable): + pass + + cdef cppclass ReadableFileInterface(InputStream, Seekable): CStatus GetSize(int64_t* size) - CStatus Read(int64_t nbytes, int64_t* bytes_read, - uint8_t* buffer) CStatus ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) + CStatus ReadAt(int64_t position, int64_t nbytes, + int64_t* bytes_read, shared_ptr[Buffer]* out) - cdef cppclass RandomAccessFile(ReadableFile): - CStatus Seek(int64_t position) + cdef cppclass WriteableFileInterface(OutputStream, Seekable): + CStatus WriteAt(int64_t position, const uint8_t* data, + int64_t nbytes) - cdef cppclass WriteableFile(FileBase): - CStatus Write(const uint8_t* buffer, int64_t nbytes) - # CStatus Write(const uint8_t* buffer, int64_t nbytes, - # int64_t* bytes_written) + cdef cppclass ReadWriteFileInterface(ReadableFileInterface, + WriteableFileInterface): + pass cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: @@ -70,10 +86,10 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: int64_t block_size int16_t permissions - cdef cppclass HdfsReadableFile(RandomAccessFile): + cdef cppclass HdfsReadableFile(ReadableFileInterface): pass - cdef cppclass HdfsWriteableFile(WriteableFile): + cdef cppclass HdfsOutputStream(OutputStream): pass cdef cppclass CHdfsClient" arrow::io::HdfsClient": @@ -103,4 +119,4 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: CStatus OpenWriteable(const c_string& path, c_bool append, int32_t buffer_size, int16_t replication, int64_t default_block_size, - shared_ptr[HdfsWriteableFile]* handle) + shared_ptr[HdfsOutputStream]* handle) diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd index fe24f593e32..f932a931493 100644 --- a/python/pyarrow/includes/parquet.pxd +++ b/python/pyarrow/includes/parquet.pxd @@ -19,7 +19,7 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool -from pyarrow.includes.libarrow_io cimport RandomAccessFile +from pyarrow.includes.libarrow_io cimport ReadableFileInterface cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil: @@ -78,10 +78,10 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: unique_ptr[ParquetFileReader] OpenFile(const c_string& path) cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: - cdef cppclass OutputStream: + cdef cppclass ParquetOutputStream" parquet::OutputStream": pass - cdef cppclass LocalFileOutputStream(OutputStream): + cdef cppclass LocalFileOutputStream(ParquetOutputStream): LocalFileOutputStream(const c_string& path) void Close() @@ -100,11 +100,11 @@ cdef extern from "arrow/parquet/io.h" namespace "arrow::parquet" nogil: cdef cppclass ParquetReadSource: ParquetReadSource(ParquetAllocator* allocator) - Open(const shared_ptr[RandomAccessFile]& file) + Open(const shared_ptr[ReadableFileInterface]& file) cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil: - CStatus OpenFile(const shared_ptr[RandomAccessFile]& file, + CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file, ParquetAllocator* allocator, unique_ptr[FileReader]* reader) @@ -121,6 +121,8 @@ cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil: cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil: - cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, - const shared_ptr[OutputStream]& sink, int64_t chunk_size, - const shared_ptr[WriterProperties]& properties) + cdef CStatus WriteFlatTable( + const CTable* table, MemoryPool* pool, + const shared_ptr[ParquetOutputStream]& sink, + int64_t chunk_size, + const shared_ptr[WriterProperties]& properties) diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd index b92af72704a..f55fc0ab53a 100644 --- a/python/pyarrow/io.pxd +++ b/python/pyarrow/io.pxd @@ -19,7 +19,8 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * -from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile +from pyarrow.includes.libarrow_io cimport (ReadableFileInterface, + OutputStream) cdef class NativeFileInterface: @@ -28,5 +29,5 @@ cdef class NativeFileInterface: # extension classes are technically virtual in the C++ sense)m we can # expose the arrow::io abstract file interfaces to other components # throughout the suite of Arrow C++ libraries - cdef read_handle(self, shared_ptr[RandomAccessFile]* file) - cdef write_handle(self, shared_ptr[WriteableFile]* file) + cdef read_handle(self, shared_ptr[ReadableFileInterface]* file) + cdef write_handle(self, shared_ptr[OutputStream]* file) diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index b8bf8835620..f2eee260c33 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -316,16 +316,16 @@ cdef class HdfsClient: cdef class NativeFileInterface: - cdef read_handle(self, shared_ptr[RandomAccessFile]* file): + cdef read_handle(self, shared_ptr[ReadableFileInterface]* file): raise NotImplementedError - cdef write_handle(self, shared_ptr[WriteableFile]* file): + cdef write_handle(self, shared_ptr[OutputStream]* file): raise NotImplementedError cdef class HdfsFile(NativeFileInterface): cdef: shared_ptr[HdfsReadableFile] rd_file - shared_ptr[HdfsWriteableFile] wr_file + shared_ptr[HdfsOutputStream] wr_file bint is_readonly bint is_open object parent @@ -364,13 +364,13 @@ cdef class HdfsFile(NativeFileInterface): if self.is_readonly: raise IOError("only valid on writeonly files") - cdef read_handle(self, shared_ptr[RandomAccessFile]* file): + cdef read_handle(self, shared_ptr[ReadableFileInterface]* file): self._assert_readable() - file[0] = self.rd_file + file[0] = self.rd_file - cdef write_handle(self, shared_ptr[WriteableFile]* file): + cdef write_handle(self, shared_ptr[OutputStream]* file): self._assert_writeable() - file[0] = self.wr_file + file[0] = self.wr_file def size(self): cdef int64_t size diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx index ebba1a17ac7..fb36b2967c0 100644 --- a/python/pyarrow/parquet.pyx +++ b/python/pyarrow/parquet.pyx @@ -21,7 +21,7 @@ from pyarrow.includes.libarrow cimport * from pyarrow.includes.parquet cimport * -from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile +from pyarrow.includes.libarrow_io cimport ReadableFileInterface cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.compat import tobytes @@ -55,7 +55,7 @@ cdef class ParquetReader: ParquetFileReader.OpenFile(path))) cdef open_native_file(self, NativeFileInterface file): - cdef shared_ptr[RandomAccessFile] cpp_handle + cdef shared_ptr[ReadableFileInterface] cpp_handle file.read_handle(&cpp_handle) check_cstatus(OpenFile(cpp_handle, &self.allocator, &self.reader)) @@ -105,7 +105,7 @@ def write_table(table, filename, chunk_size=None, version=None): """ cdef Table table_ = table cdef CTable* ctable_ = table_.table - cdef shared_ptr[OutputStream] sink + cdef shared_ptr[ParquetOutputStream] sink cdef WriterProperties.Builder properties_builder cdef int64_t chunk_size_ = 0 if chunk_size is None: