From 2952cfb48a08491debac3b46a0c2c6ec2b84a816 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 21 Aug 2017 22:20:45 -0400 Subject: [PATCH 1/2] Add SerializeRecordBatch API, various API scrubbing, make some integer arguments const Change-Id: I550d70f4fe30e9f3e8050889c409ca5293708477 --- cpp/src/arrow/buffer.cc | 25 ++++----- cpp/src/arrow/buffer.h | 53 +++++++++++------- cpp/src/arrow/ipc/ipc-read-write-test.cc | 29 +++------- cpp/src/arrow/ipc/metadata.cc | 11 ++-- cpp/src/arrow/ipc/metadata.h | 23 ++++---- cpp/src/arrow/ipc/reader.h | 32 +++++------ cpp/src/arrow/ipc/writer.cc | 28 ++++++++++ cpp/src/arrow/ipc/writer.h | 68 +++++++++++++++++------- cpp/src/arrow/python/python_to_arrow.cc | 10 ++-- 9 files changed, 167 insertions(+), 112 deletions(-) diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc index b9c5897f8a2..9e8eb8bd4b6 100644 --- a/cpp/src/arrow/buffer.cc +++ b/cpp/src/arrow/buffer.cc @@ -27,7 +27,7 @@ namespace arrow { -Status Buffer::Copy(int64_t start, int64_t nbytes, MemoryPool* pool, +Status Buffer::Copy(const int64_t start, const int64_t nbytes, MemoryPool* pool, std::shared_ptr* out) const { // Sanity checks DCHECK_LT(start, size_); @@ -42,11 +42,12 @@ Status Buffer::Copy(int64_t start, int64_t nbytes, MemoryPool* pool, return Status::OK(); } -Status Buffer::Copy(int64_t start, int64_t nbytes, std::shared_ptr* out) const { +Status Buffer::Copy(const int64_t start, const int64_t nbytes, + std::shared_ptr* out) const { return Copy(start, nbytes, default_memory_pool(), out); } -bool Buffer::Equals(const Buffer& other, int64_t nbytes) const { +bool Buffer::Equals(const Buffer& other, const int64_t nbytes) const { return this == &other || (size_ >= nbytes && other.size_ >= nbytes && (data_ == other.data_ || !memcmp(data_, other.data_, static_cast(nbytes)))); @@ -71,10 +72,10 @@ PoolBuffer::~PoolBuffer() { } } -Status PoolBuffer::Reserve(int64_t new_capacity) { - if (!mutable_data_ || new_capacity > capacity_) { +Status PoolBuffer::Reserve(const int64_t capacity) { + if (!mutable_data_ || capacity > capacity_) { uint8_t* new_data; - new_capacity = BitUtil::RoundUpToMultipleOf64(new_capacity); + int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(capacity); if (mutable_data_) { RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &mutable_data_)); } else { @@ -87,7 +88,7 @@ Status PoolBuffer::Reserve(int64_t new_capacity) { return Status::OK(); } -Status PoolBuffer::Resize(int64_t new_size, bool shrink_to_fit) { +Status PoolBuffer::Resize(const int64_t new_size, bool shrink_to_fit) { if (!shrink_to_fit || (new_size > size_)) { RETURN_NOT_OK(Reserve(new_size)); } else { @@ -113,18 +114,18 @@ Status PoolBuffer::Resize(int64_t new_size, bool shrink_to_fit) { } std::shared_ptr SliceMutableBuffer(const std::shared_ptr& buffer, - int64_t offset, int64_t length) { + const int64_t offset, const int64_t length) { return std::make_shared(buffer, offset, length); } -MutableBuffer::MutableBuffer(const std::shared_ptr& parent, int64_t offset, - int64_t size) +MutableBuffer::MutableBuffer(const std::shared_ptr& parent, const int64_t offset, + const int64_t size) : MutableBuffer(parent->mutable_data() + offset, size) { DCHECK(parent->is_mutable()) << "Must pass mutable buffer"; parent_ = parent; } -Status AllocateBuffer(MemoryPool* pool, int64_t size, +Status AllocateBuffer(MemoryPool* pool, const int64_t size, std::shared_ptr* out) { auto buffer = std::make_shared(pool); RETURN_NOT_OK(buffer->Resize(size)); @@ -132,7 +133,7 @@ Status AllocateBuffer(MemoryPool* pool, int64_t size, return Status::OK(); } -Status AllocateResizableBuffer(MemoryPool* pool, int64_t size, +Status AllocateResizableBuffer(MemoryPool* pool, const int64_t size, std::shared_ptr* out) { auto buffer = std::make_shared(pool); RETURN_NOT_OK(buffer->Resize(size)); diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 5d050b77f77..f8f0b3df475 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -59,7 +59,7 @@ class ARROW_EXPORT Buffer { /// This method makes no assertions about alignment or padding of the buffer but /// in general we expected buffers to be aligned and padded to 64 bytes. In the future /// we might add utility methods to help determine if a buffer satisfies this contract. - Buffer(const std::shared_ptr& parent, int64_t offset, int64_t size) + Buffer(const std::shared_ptr& parent, const int64_t offset, const int64_t size) : Buffer(parent->data() + offset, size) { parent_ = parent; } @@ -72,11 +72,12 @@ class ARROW_EXPORT Buffer { bool Equals(const Buffer& other) const; /// Copy a section of the buffer into a new Buffer. - Status Copy(int64_t start, int64_t nbytes, MemoryPool* pool, + Status Copy(const int64_t start, const int64_t nbytes, MemoryPool* pool, std::shared_ptr* out) const; /// Copy a section of the buffer using the default memory pool into a new Buffer. - Status Copy(int64_t start, int64_t nbytes, std::shared_ptr* out) const; + Status Copy(const int64_t start, const int64_t nbytes, + std::shared_ptr* out) const; int64_t capacity() const { return capacity_; } const uint8_t* data() const { return data_; } @@ -114,24 +115,27 @@ static inline std::shared_ptr GetBufferFromString(const std::string& str /// Construct a view on passed buffer at the indicated offset and length. This /// function cannot fail and does not error checking (except in debug builds) static inline std::shared_ptr SliceBuffer(const std::shared_ptr& buffer, - int64_t offset, int64_t length) { + const int64_t offset, + const int64_t length) { return std::make_shared(buffer, offset, length); } /// Construct a mutable buffer slice. If the parent buffer is not mutable, this /// will abort in debug builds -std::shared_ptr ARROW_EXPORT -SliceMutableBuffer(const std::shared_ptr& buffer, int64_t offset, int64_t length); +ARROW_EXPORT +std::shared_ptr SliceMutableBuffer(const std::shared_ptr& buffer, + const int64_t offset, const int64_t length); /// A Buffer whose contents can be mutated. May or may not own its data. class ARROW_EXPORT MutableBuffer : public Buffer { public: - MutableBuffer(uint8_t* data, int64_t size) : Buffer(data, size) { + MutableBuffer(uint8_t* data, const int64_t size) : Buffer(data, size) { mutable_data_ = data; is_mutable_ = true; } - MutableBuffer(const std::shared_ptr& parent, int64_t offset, int64_t size); + MutableBuffer(const std::shared_ptr& parent, const int64_t offset, + const int64_t size); protected: MutableBuffer() : Buffer(nullptr, 0) {} @@ -145,20 +149,20 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer { /// /// @param shrink_to_fit On deactivating this option, the capacity of the Buffer won't /// decrease. - virtual Status Resize(int64_t new_size, bool shrink_to_fit = true) = 0; + virtual Status Resize(const int64_t new_size, bool shrink_to_fit = true) = 0; /// Ensure that buffer has enough memory allocated to fit the indicated /// capacity (and meets the 64 byte padding requirement in Layout.md). /// It does not change buffer's reported size. - virtual Status Reserve(int64_t new_capacity) = 0; + virtual Status Reserve(const int64_t new_capacity) = 0; template - Status TypedResize(int64_t new_nb_elements, bool shrink_to_fit = true) { + Status TypedResize(const int64_t new_nb_elements, bool shrink_to_fit = true) { return Resize(sizeof(T) * new_nb_elements, shrink_to_fit); } template - Status TypedReserve(int64_t new_nb_elements) { + Status TypedReserve(const int64_t new_nb_elements) { return Reserve(sizeof(T) * new_nb_elements); } @@ -172,8 +176,8 @@ class ARROW_EXPORT PoolBuffer : public ResizableBuffer { explicit PoolBuffer(MemoryPool* pool = nullptr); virtual ~PoolBuffer(); - Status Resize(int64_t new_size, bool shrink_to_fit = true) override; - Status Reserve(int64_t new_capacity) override; + Status Resize(const int64_t new_size, bool shrink_to_fit = true) override; + Status Reserve(const int64_t new_capacity) override; private: MemoryPool* pool_; @@ -185,7 +189,7 @@ class ARROW_EXPORT BufferBuilder { : pool_(pool), data_(nullptr), capacity_(0), size_(0) {} /// Resizes the buffer to the nearest multiple of 64 bytes per Layout.md - Status Resize(int64_t elements) { + Status Resize(const int64_t elements) { // Resize(0) is a no-op if (elements == 0) { return Status::OK(); @@ -213,7 +217,7 @@ class ARROW_EXPORT BufferBuilder { } // Advance pointer and zero out memory - Status Advance(int64_t length) { + Status Advance(const int64_t length) { if (capacity_ < length + size_) { int64_t new_capacity = BitUtil::NextPower2(length + size_); RETURN_NOT_OK(Resize(new_capacity)); @@ -299,11 +303,20 @@ class ARROW_EXPORT TypedBufferBuilder : public BufferBuilder { /// \param[out] out the allocated buffer with padding /// /// \return Status message -Status ARROW_EXPORT AllocateBuffer(MemoryPool* pool, int64_t size, - std::shared_ptr* out); +ARROW_EXPORT +Status AllocateBuffer(MemoryPool* pool, const int64_t size, + std::shared_ptr* out); -Status ARROW_EXPORT AllocateResizableBuffer(MemoryPool* pool, int64_t size, - std::shared_ptr* out); +/// Allocate resizeable buffer from a memory pool +/// +/// \param[in] pool a memory pool +/// \param[in] size size of buffer to allocate +/// \param[out] out the allocated buffer +/// +/// \return Status message +ARROW_EXPORT +Status AllocateResizableBuffer(MemoryPool* pool, const int64_t size, + std::shared_ptr* out); } // namespace arrow diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index 045296163ea..5aaff5f5691 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -137,27 +137,13 @@ static int g_file_number = 0; class IpcTestFixture : public io::MemoryMapFixture { public: - Status DoStandardRoundTrip(const RecordBatch& batch, bool zero_data, + Status DoStandardRoundTrip(const RecordBatch& batch, std::shared_ptr* batch_result) { - int32_t metadata_length; - int64_t body_length; - - const int64_t buffer_offset = 0; - - if (zero_data) { - RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); - } - RETURN_NOT_OK(mmap_->Seek(0)); - - RETURN_NOT_OK(WriteRecordBatch(batch, buffer_offset, mmap_.get(), &metadata_length, - &body_length, pool_)); + std::shared_ptr serialized_batch; + RETURN_NOT_OK(SerializeRecordBatch(batch, pool_, &serialized_batch)); - std::unique_ptr message; - RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); - - io::BufferReader buffer_reader(message->body()); - return ReadRecordBatch(*message->metadata(), batch.schema(), &buffer_reader, - batch_result); + io::BufferReader buf_reader(serialized_batch); + return ReadRecordBatch(batch.schema(), 0, &buf_reader, batch_result); } Status DoLargeRoundTrip(const RecordBatch& batch, bool zero_data, @@ -197,7 +183,7 @@ class IpcTestFixture : public io::MemoryMapFixture { ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(buffer_size, ss.str(), &mmap_)); std::shared_ptr result; - ASSERT_OK(DoStandardRoundTrip(batch, true, &result)); + ASSERT_OK(DoStandardRoundTrip(batch, &result)); CheckReadResult(*result, batch); ASSERT_OK(DoLargeRoundTrip(batch, true, &result)); @@ -657,9 +643,6 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) { CheckReadResult(*result, batch); ASSERT_EQ(length, result->num_rows()); - - // Fails if we try to write this with the normal code path - ASSERT_RAISES(Invalid, DoStandardRoundTrip(batch, false, &result)); } void CheckBatchDictionaries(const RecordBatch& batch) { diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index c9534217e92..a31b9668da3 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -703,7 +703,7 @@ static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length, return Status::OK(); } -Status WriteRecordBatchMessage(int64_t length, int64_t body_length, +Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length, const std::vector& nodes, const std::vector& buffers, std::shared_ptr* out) { @@ -714,7 +714,7 @@ Status WriteRecordBatchMessage(int64_t length, int64_t body_length, body_length, out); } -Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, +Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset, std::shared_ptr* out) { using TensorDimOffset = flatbuffers::Offset; using TensorOffset = flatbuffers::Offset; @@ -743,7 +743,8 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, body_length, out); } -Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length, +Status WriteDictionaryMessage(const int64_t id, const int64_t length, + const int64_t body_length, const std::vector& nodes, const std::vector& buffers, std::shared_ptr* out) { @@ -1106,8 +1107,8 @@ static Status ReadFullMessage(const std::shared_ptr& metadata, return Message::Open(metadata, body, message); } -Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, - std::unique_ptr* message) { +Status ReadMessage(const int64_t offset, const int32_t metadata_length, + io::RandomAccessFile* file, std::unique_ptr* message) { std::shared_ptr buffer; RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 90e4defd6a3..81716ae467b 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -244,9 +244,9 @@ class ARROW_EXPORT InputStreamMessageReader : public MessageReader { /// \param[in] file the seekable file interface to read from /// \param[out] message the message read /// \return Status success or failure -Status ARROW_EXPORT ReadMessage(int64_t offset, int32_t metadata_length, - io::RandomAccessFile* file, - std::unique_ptr* message); +ARROW_EXPORT +Status ReadMessage(const int64_t offset, const int32_t metadata_length, + io::RandomAccessFile* file, std::unique_ptr* message); /// \brief Read encapulated RPC message (metadata and body) from InputStream /// @@ -274,15 +274,18 @@ Status ARROW_EXPORT WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr* out); -Status ARROW_EXPORT WriteRecordBatchMessage(int64_t length, int64_t body_length, - const std::vector& nodes, - const std::vector& buffers, - std::shared_ptr* out); +ARROW_EXPORT +Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length, + const std::vector& nodes, + const std::vector& buffers, + std::shared_ptr* out); -Status ARROW_EXPORT WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, - std::shared_ptr* out); +ARROW_EXPORT +Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset, + std::shared_ptr* out); -Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length, +Status WriteDictionaryMessage(const int64_t id, const int64_t length, + const int64_t body_length, const std::vector& nodes, const std::vector& buffers, std::shared_ptr* out); diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index c0d3fb1f185..6f191ba437c 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -148,10 +148,9 @@ class ARROW_EXPORT RecordBatchFileReader { /// \param(in) schema the record batch schema /// \param(in) file a random access file /// \param(out) out the read record batch -Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, - const std::shared_ptr& schema, - io::RandomAccessFile* file, - std::shared_ptr* out); +ARROW_EXPORT +Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr& schema, + io::RandomAccessFile* file, std::shared_ptr* out); /// \brief Read record batch from fully encapulated Message /// @@ -159,9 +158,9 @@ Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, /// \param[in] schema /// \param[out] out the resulting RecordBatch /// \return Status -Status ARROW_EXPORT ReadRecordBatch(const Message& message, - const std::shared_ptr& schema, - std::shared_ptr* out); +ARROW_EXPORT +Status ReadRecordBatch(const Message& message, const std::shared_ptr& schema, + std::shared_ptr* out); /// Read record batch from file given metadata and schema /// @@ -170,10 +169,10 @@ Status ARROW_EXPORT ReadRecordBatch(const Message& message, /// \param(in) file a random access file /// \param(in) max_recursion_depth the maximum permitted nesting depth /// \param(out) out the read record batch -Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, - const std::shared_ptr& schema, - int max_recursion_depth, io::RandomAccessFile* file, - std::shared_ptr* out); +ARROW_EXPORT +Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr& schema, + int max_recursion_depth, io::RandomAccessFile* file, + std::shared_ptr* out); /// Read record batch as encapsulated IPC message with metadata size prefix and /// header @@ -182,17 +181,18 @@ Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, /// \param(in) offset the file location of the start of the message /// \param(in) file the file where the batch is located /// \param(out) out the read record batch -Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr& schema, int64_t offset, - io::RandomAccessFile* file, - std::shared_ptr* out); +ARROW_EXPORT +Status ReadRecordBatch(const std::shared_ptr& schema, int64_t offset, + io::RandomAccessFile* file, std::shared_ptr* out); /// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file /// /// \param(in) offset the file location of the start of the message /// \param(in) file the file where the batch is located /// \param(out) out the read tensor -Status ARROW_EXPORT ReadTensor(int64_t offset, io::RandomAccessFile* file, - std::shared_ptr* out); +ARROW_EXPORT +Status ReadTensor(int64_t offset, io::RandomAccessFile* file, + std::shared_ptr* out); /// Backwards-compatibility for Arrow < 0.4.0 /// diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index bc07dc659f6..2fd83461e36 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -542,6 +542,34 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, return writer.Write(batch, dst, metadata_length, body_length); } +Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, + std::shared_ptr* out) { + int64_t size = 0; + RETURN_NOT_OK(GetRecordBatchSize(batch, &size)); + std::shared_ptr buffer; + RETURN_NOT_OK(AllocateBuffer(pool, size, &buffer)); + + io::FixedSizeBufferWriter stream(buffer); + int32_t metadata_length = 0; + int64_t body_length = 0; + RETURN_NOT_OK(WriteRecordBatch(batch, 0, &stream, &metadata_length, &body_length, pool, + kMaxNestingDepth, true)); + *out = buffer; + return Status::OK(); +} + +Status WriteRecordBatchStream(const std::vector>& batches, + io::OutputStream* dst) { + std::shared_ptr writer; + RETURN_NOT_OK(RecordBatchStreamWriter::Open(dst, batches[0]->schema(), &writer)); + for (const auto& batch : batches) { + // allow sizes > INT32_MAX + RETURN_NOT_OK(writer->WriteRecordBatch(*batch, true)); + } + RETURN_NOT_OK(writer->Close()); + return Status::OK(); +} + Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) { diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index c28dfe0afbb..1afc4f49d2c 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -21,6 +21,7 @@ #define ARROW_IPC_WRITER_H #include +#include #include #include @@ -144,38 +145,67 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { /// including padding to a 64-byte boundary /// \param(out) body_length: the size of the contiguous buffer block plus /// padding bytes -Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch, - int64_t buffer_start_offset, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool, - int max_recursion_depth = kMaxNestingDepth, - bool allow_64bit = false); - -// Write Array as a DictionaryBatch message +/// \return Status +/// +/// Low-level API +ARROW_EXPORT +Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, + io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, MemoryPool* pool, + int max_recursion_depth = kMaxNestingDepth, + bool allow_64bit = false); + +/// \brief Write dictionary message to output stream +/// +/// Low-level API +ARROW_EXPORT Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr& dictionary, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool); +/// \brief Serialize record batch as encapsulated IPC message in a new buffer +/// +/// \param[in] batch the record batch +/// \param[in] pool a MemoryPool to allocate memory from +/// \param[out] out the serialized message +/// \return Status +ARROW_EXPORT +Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, + std::shared_ptr* out); + +/// \brief Write multiple record batches to OutputStream +/// \param[in] batches a vector of batches. Must all have same schema +/// \param[out] dst an OutputStream +ARROW_EXPORT +Status WriteRecordBatchStream(const std::vector>& batches, + io::OutputStream* dst); + // Compute the precise number of bytes needed in a contiguous memory segment to // write the record batch. This involves generating the complete serialized // Flatbuffers metadata. -Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size); +ARROW_EXPORT +Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size); // Compute the precise number of bytes needed in a contiguous memory segment to // write the tensor including metadata, padding, and data -Status ARROW_EXPORT GetTensorSize(const Tensor& tensor, int64_t* size); - -/// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data -/// may not be readable by all Arrow implementations -Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch, - int64_t buffer_start_offset, - io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length, MemoryPool* pool); +ARROW_EXPORT +Status GetTensorSize(const Tensor& tensor, int64_t* size); /// EXPERIMENTAL: Write arrow::Tensor as a contiguous message /// -Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length); +ARROW_EXPORT +Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length); + +#ifndef ARROW_NO_DEPRECATED_API +/// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data +/// may not be readable by all Arrow implementations +/// \deprecated Since 0.7.0 +ARROW_EXPORT +Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, + io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, MemoryPool* pool); +#endif /// Backwards-compatibility for Arrow < 0.4.0 /// diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index 47d8ef60c4b..a1ccd994e6c 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -634,15 +634,11 @@ Status SerializeObject(PyObject* sequence, SerializedPyObject* out) { Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* dst) { int32_t num_tensors = static_cast(obj.tensors.size()); - std::shared_ptr writer; - int32_t metadata_length; - int64_t body_length; - RETURN_NOT_OK(dst->Write(reinterpret_cast(&num_tensors), sizeof(int32_t))); - RETURN_NOT_OK(ipc::RecordBatchStreamWriter::Open(dst, obj.batch->schema(), &writer)); - RETURN_NOT_OK(writer->WriteRecordBatch(*obj.batch)); - RETURN_NOT_OK(writer->Close()); + RETURN_NOT_OK(ipc::WriteRecordBatchStream({obj.batch}, dst)); + int32_t metadata_length; + int64_t body_length; for (const auto& tensor : obj.tensors) { RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length)); } From a3996fed5333bfe1be1974b5f4e23d6bc8396cca Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 21 Aug 2017 23:00:17 -0400 Subject: [PATCH 2/2] Add DCHECK to catch unequal schemas Change-Id: If097de5f2823a81b7d0f04dfe3712d2b9686c795 --- cpp/src/arrow/ipc/writer.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 2fd83461e36..9492364413e 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -564,6 +564,7 @@ Status WriteRecordBatchStream(const std::vector>& b RETURN_NOT_OK(RecordBatchStreamWriter::Open(dst, batches[0]->schema(), &writer)); for (const auto& batch : batches) { // allow sizes > INT32_MAX + DCHECK(batch->schema()->Equals(*batches[0]->schema())) << "Schemas unequal"; RETURN_NOT_OK(writer->WriteRecordBatch(*batch, true)); } RETURN_NOT_OK(writer->Close());