diff --git a/NOTICE.txt b/NOTICE.txt index a85101617ce..ce6e567dcb5 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12,3 +12,9 @@ This product includes software from the Numpy project (BSD-new) https://github.com/numpy/numpy/blob/e1f191c46f2eebd6cb892a4bfe14d9dd43a06c4e/numpy/core/src/multiarray/multiarraymodule.c#L2910 * Copyright (c) 1995, 1996, 1997 Jim Hugunin, hugunin@mit.edu * Copyright (c) 2005 Travis E. Oliphant oliphant@ee.byu.edu Brigham Young University + +This product includes software from the Feather project (Apache 2.0) +https://github.com/wesm/feather + +This product includes software from the DyND project (BSD 2-clause) +https://github.com/libdynd diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 1dd6c3a0230..c168c91c5f8 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -206,6 +206,43 @@ Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) { return Status::OK(); } +// ---------------------------------------------------------------------- +// OutputStream that writes to resizable buffer + +static constexpr int64_t kBufferMinimumSize = 256; + +BufferOutputStream::BufferOutputStream(const std::shared_ptr& buffer) + : buffer_(buffer), + capacity_(buffer->size()), + position_(0), + mutable_data_(buffer->mutable_data()) {} + +Status BufferOutputStream::Close() { + return Status::OK(); +} + +Status BufferOutputStream::Tell(int64_t* position) { + *position = position_; + return Status::OK(); +} + +Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(Reserve(nbytes)); + std::memcpy(mutable_data_ + position_, data, nbytes); + position_ += nbytes; + return Status::OK(); +} + +Status BufferOutputStream::Reserve(int64_t nbytes) { + while (position_ + nbytes > capacity_) { + int64_t new_capacity = std::max(kBufferMinimumSize, capacity_ * 2); + RETURN_NOT_OK(buffer_->Resize(new_capacity)); + capacity_ = new_capacity; + } + mutable_data_ = buffer_->mutable_data(); + return Status::OK(); +} + // ---------------------------------------------------------------------- // In-memory buffer reader diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 6fe47c3b515..51601a0a626 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -32,32 +32,30 @@ namespace arrow { class Buffer; -class MutableBuffer; +class ResizableBuffer; class Status; namespace io { // An output stream that writes to a MutableBuffer, such as one obtained from a // memory map -// -// TODO(wesm): Implement this class class ARROW_EXPORT BufferOutputStream : public OutputStream { public: - explicit BufferOutputStream(const std::shared_ptr& buffer) - : buffer_(buffer) {} + explicit BufferOutputStream(const std::shared_ptr& buffer); // Implement the OutputStream interface Status Close() override; Status Tell(int64_t* position) override; - Status Write(const uint8_t* data, int64_t length) override; - - // Returns the number of bytes remaining in the buffer - int64_t bytes_remaining() const; + Status Write(const uint8_t* data, int64_t nbytes) override; private: - std::shared_ptr buffer_; + // Ensures there is sufficient space available to write nbytes + Status Reserve(int64_t nbytes); + + std::shared_ptr buffer_; int64_t capacity_; int64_t position_; + uint8_t* mutable_data_; }; // A memory source that uses memory-mapped files for memory interactions diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index e5553a63581..bde8c5bf738 100644 --- 
a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -33,6 +33,7 @@ set(ARROW_IPC_TEST_LINK_LIBS set(ARROW_IPC_SRCS adapter.cc + file.cc metadata.cc metadata-internal.cc ) @@ -60,6 +61,10 @@ ADD_ARROW_TEST(ipc-adapter-test) ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test ${ARROW_IPC_TEST_LINK_LIBS}) +ADD_ARROW_TEST(ipc-file-test) +ARROW_TEST_LINK_LIBRARIES(ipc-file-test + ${ARROW_IPC_TEST_LINK_LIBS}) + ADD_ARROW_TEST(ipc-metadata-test) ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test ${ARROW_IPC_TEST_LINK_LIBS}) @@ -70,14 +75,20 @@ set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE) set(OUTPUT_DIR ${CMAKE_SOURCE_DIR}/src/arrow/ipc) set(FBS_OUTPUT_FILES "${OUTPUT_DIR}/Message_generated.h") -set(FBS_SRC ${CMAKE_SOURCE_DIR}/../format/Message.fbs) -get_filename_component(ABS_FBS_SRC ${FBS_SRC} ABSOLUTE) +set(FBS_SRC + ${CMAKE_SOURCE_DIR}/../format/Message.fbs + ${CMAKE_SOURCE_DIR}/../format/File.fbs) + +foreach(FIL ${FBS_SRC}) + get_filename_component(ABS_FIL ${FIL} ABSOLUTE) + list(APPEND ABS_FBS_SRC ${ABS_FIL}) +endforeach() add_custom_command( OUTPUT ${FBS_OUTPUT_FILES} COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${ABS_FBS_SRC} DEPENDS ${ABS_FBS_SRC} - COMMENT "Running flatc compiler on ${FBS_SRC}" + COMMENT "Running flatc compiler on ${ABS_FBS_SRC}" VERBATIM ) @@ -87,6 +98,7 @@ add_dependencies(arrow_objlib metadata_fbs) # Headers: top level install(FILES adapter.h + file.h metadata.h DESTINATION include/arrow/ipc) diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 0e101c89303..89b7fb987c6 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -95,7 +95,7 @@ static bool IsListType(const DataType* type) { } // ---------------------------------------------------------------------- -// Row batch write path +// Record batch write path Status VisitArray(const Array* arr, std::vector* field_nodes, std::vector>* buffers, int max_recursion_depth) { @@ -132,28 +132,32 @@ Status VisitArray(const Array* arr, std::vector* field_nodes return Status::OK(); } -class RowBatchWriter { +class RecordBatchWriter { public: - RowBatchWriter(const RowBatch* batch, int max_recursion_depth) - : batch_(batch), max_recursion_depth_(max_recursion_depth) {} + RecordBatchWriter(const std::vector>& columns, int32_t num_rows, + int max_recursion_depth) + : columns_(&columns), + num_rows_(num_rows), + max_recursion_depth_(max_recursion_depth) {} Status AssemblePayload() { // Perform depth-first traversal of the row-batch - for (int i = 0; i < batch_->num_columns(); ++i) { - const Array* arr = batch_->column(i).get(); + for (size_t i = 0; i < columns_->size(); ++i) { + const Array* arr = (*columns_)[i].get(); RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_)); } return Status::OK(); } - Status Write(io::OutputStream* dst, int64_t* data_header_offset) { - // Write out all the buffers contiguously and compute the total size of the - // memory payload - int64_t offset = 0; - + Status Write( + io::OutputStream* dst, int64_t* body_end_offset, int64_t* header_end_offset) { // Get the starting position - int64_t position; - RETURN_NOT_OK(dst->Tell(&position)); + int64_t start_position; + RETURN_NOT_OK(dst->Tell(&start_position)); + + // Keep track of the current position so we can determine the size of the + // message body + int64_t position = start_position; for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); @@ -175,14 +179,16 @@ class RowBatchWriter { // are using 
from any OS-level shared memory. The thought is that systems // may (in the future) associate integer page id's with physical memory // pages (according to whatever is the desired shared memory mechanism) - buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size)); + buffer_meta_.push_back(flatbuf::Buffer(0, position, size)); if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); - offset += size; + position += size; } } + *body_end_offset = position; + // Now that we have computed the locations of all of the buffers in shared // memory, the data header can be converted to a flatbuffer and written out // @@ -192,27 +198,43 @@ class RowBatchWriter { // construct the flatbuffer data accessor object (see arrow::ipc::Message) std::shared_ptr data_header; RETURN_NOT_OK(WriteDataHeader( - batch_->num_rows(), offset, field_nodes_, buffer_meta_, &data_header)); + num_rows_, position - start_position, field_nodes_, buffer_meta_, &data_header)); // Write the data header at the end RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size())); - *data_header_offset = position + offset; + position += data_header->size(); + *header_end_offset = position; + + return Align(dst, &position); + } + + Status Align(io::OutputStream* dst, int64_t* position) { + // Write all buffers here on word boundaries + // TODO(wesm): Is there benefit to 64-byte padding in IPC? + int64_t remainder = PaddedLength(*position) - *position; + if (remainder > 0) { + RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder)); + *position += remainder; + } return Status::OK(); } // This must be called after invoking AssemblePayload Status GetTotalSize(int64_t* size) { // emulates the behavior of Write without actually writing + int64_t body_offset; int64_t data_header_offset; MockOutputStream dst; - RETURN_NOT_OK(Write(&dst, &data_header_offset)); + RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset)); *size = dst.GetExtentBytesWritten(); return Status::OK(); } private: - const RowBatch* batch_; + // Do not copy this vector. 
Ownership must be retained elsewhere + const std::vector>* columns_; + int32_t num_rows_; std::vector field_nodes_; std::vector buffer_meta_; @@ -220,29 +242,29 @@ class RowBatchWriter { int max_recursion_depth_; }; -Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, int64_t* header_offset, - int max_recursion_depth) { +Status WriteRecordBatch(const std::vector>& columns, + int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset, + int64_t* header_end_offset, int max_recursion_depth) { DCHECK_GT(max_recursion_depth, 0); - RowBatchWriter serializer(batch, max_recursion_depth); + RecordBatchWriter serializer(columns, num_rows, max_recursion_depth); RETURN_NOT_OK(serializer.AssemblePayload()); - return serializer.Write(dst, header_offset); + return serializer.Write(dst, body_end_offset, header_end_offset); } -Status GetRowBatchSize(const RowBatch* batch, int64_t* size) { - RowBatchWriter serializer(batch, kMaxIpcRecursionDepth); +Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) { + RecordBatchWriter serializer( + batch->columns(), batch->num_rows(), kMaxIpcRecursionDepth); RETURN_NOT_OK(serializer.AssemblePayload()); RETURN_NOT_OK(serializer.GetTotalSize(size)); return Status::OK(); } // ---------------------------------------------------------------------- -// Row batch read path +// Record batch read path -static constexpr int64_t INIT_METADATA_SIZE = 4096; - -class RowBatchReader::RowBatchReaderImpl { +class RecordBatchReader::RecordBatchReaderImpl { public: - RowBatchReaderImpl(io::ReadableFileInterface* file, + RecordBatchReaderImpl(io::ReadableFileInterface* file, const std::shared_ptr& metadata, int max_recursion_depth) : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) { num_buffers_ = metadata->num_buffers(); @@ -250,7 +272,7 @@ class RowBatchReader::RowBatchReaderImpl { } Status AssembleBatch( - const std::shared_ptr& schema, std::shared_ptr* out) { + const std::shared_ptr& schema, std::shared_ptr* out) { std::vector> arrays(schema->num_fields()); // The field_index and buffer_index are incremented in NextArray based on @@ -263,7 +285,7 @@ class RowBatchReader::RowBatchReaderImpl { RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i])); } - *out = std::make_shared(schema, metadata_->length(), arrays); + *out = std::make_shared(schema, metadata_->length(), arrays); return Status::OK(); } @@ -359,29 +381,31 @@ class RowBatchReader::RowBatchReaderImpl { int num_flattened_fields_; }; -Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position, - std::shared_ptr* out) { - return Open(file, position, kMaxIpcRecursionDepth, out); +Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset, + std::shared_ptr* out) { + return Open(file, offset, kMaxIpcRecursionDepth, out); } -Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position, - int max_recursion_depth, std::shared_ptr* out) { - std::shared_ptr metadata; - RETURN_NOT_OK(file->ReadAt(position, INIT_METADATA_SIZE, &metadata)); +Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset, + int max_recursion_depth, std::shared_ptr* out) { + std::shared_ptr buffer; + RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), &buffer)); - int32_t metadata_size = *reinterpret_cast(metadata->data()); + int32_t metadata_size = *reinterpret_cast(buffer->data()); - // We may not need to call ReadAt again - if (metadata_size > static_cast(INIT_METADATA_SIZE - sizeof(int32_t))) 
{ - // We don't have enough data, read the indicated metadata size. - RETURN_NOT_OK(file->ReadAt(position + sizeof(int32_t), metadata_size, &metadata)); + if (metadata_size + static_cast(sizeof(int32_t)) > offset) { + return Status::Invalid("metadata size invalid"); } + // Read the metadata + RETURN_NOT_OK( + file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, &buffer)); + // TODO(wesm): buffer slicing here would be better in case ReadAt returns // allocated memory std::shared_ptr message; - RETURN_NOT_OK(Message::Open(metadata, &message)); + RETURN_NOT_OK(Message::Open(buffer, &message)); if (message->type() != Message::RECORD_BATCH) { return Status::Invalid("Metadata message is not a record batch"); @@ -389,19 +413,19 @@ Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position, std::shared_ptr batch_meta = message->GetRecordBatch(); - std::shared_ptr result(new RowBatchReader()); - result->impl_.reset(new RowBatchReaderImpl(file, batch_meta, max_recursion_depth)); + std::shared_ptr result(new RecordBatchReader()); + result->impl_.reset(new RecordBatchReaderImpl(file, batch_meta, max_recursion_depth)); *out = result; return Status::OK(); } // Here the explicit destructor is required for compilers to be aware of -// the complete information of RowBatchReader::RowBatchReaderImpl class -RowBatchReader::~RowBatchReader() {} +// the complete information of RecordBatchReader::RecordBatchReaderImpl class +RecordBatchReader::~RecordBatchReader() {} -Status RowBatchReader::GetRowBatch( - const std::shared_ptr& schema, std::shared_ptr* out) { +Status RecordBatchReader::GetRecordBatch( + const std::shared_ptr& schema, std::shared_ptr* out) { return impl_->AssembleBatch(schema, out); } diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index 215b46f8f65..3fde18dde83 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -23,13 +23,14 @@ #include #include +#include #include "arrow/util/visibility.h" namespace arrow { class Array; -class RowBatch; +class RecordBatch; class Schema; class Status; @@ -50,7 +51,7 @@ class RecordBatchMessage; // TODO(emkornfield) investigate this more constexpr int kMaxIpcRecursionDepth = 64; -// Write the RowBatch (collection of equal-length Arrow arrays) to the output +// Write the RecordBatch (collection of equal-length Arrow arrays) to the output // stream // // First, each of the memory buffers are written out end-to-end // // // // -// Finally, the absolute offset (relative to the start of the output stream) to -// the start of the metadata / data header is returned in an out-variable -ARROW_EXPORT Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, - int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth); +// Finally, the absolute offsets (relative to the start of the output stream) +// to the end of the body and end of the metadata / data header (suffixed by +// the header size) are returned in out-variables +ARROW_EXPORT Status WriteRecordBatch(const std::vector>& columns, + int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset, + int64_t* header_end_offset, int max_recursion_depth = kMaxIpcRecursionDepth); -// int64_t GetRowBatchMetadata(const RowBatch* batch); // Compute the precise number of bytes needed in a contiguous memory segment to -// write the row batch.
This involves generating the complete serialized +// write the record batch. This involves generating the complete serialized // Flatbuffers metadata. -ARROW_EXPORT Status GetRowBatchSize(const RowBatch* batch, int64_t* size); +ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size); // ---------------------------------------------------------------------- // "Read" path; does not copy data if the input supports zero copy reads -class ARROW_EXPORT RowBatchReader { +class ARROW_EXPORT RecordBatchReader { public: - static Status Open(io::ReadableFileInterface* file, int64_t position, - std::shared_ptr* out); + // The offset is the absolute position to the *end* of the record batch data + // header + static Status Open(io::ReadableFileInterface* file, int64_t offset, + std::shared_ptr* out); - static Status Open(io::ReadableFileInterface* file, int64_t position, - int max_recursion_depth, std::shared_ptr* out); + static Status Open(io::ReadableFileInterface* file, int64_t offset, + int max_recursion_depth, std::shared_ptr* out); - virtual ~RowBatchReader(); + virtual ~RecordBatchReader(); - // Reassemble the row batch. A Schema is required to be able to construct the - // right array containers - Status GetRowBatch( - const std::shared_ptr& schema, std::shared_ptr* out); + // Reassemble the record batch. A Schema is required to be able to construct + // the right array containers + Status GetRecordBatch( + const std::shared_ptr& schema, std::shared_ptr* out); private: - class RowBatchReaderImpl; - std::unique_ptr impl_; + class RecordBatchReaderImpl; + std::unique_ptr impl_; }; } // namespace ipc diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc new file mode 100644 index 00000000000..2bf10dde266 --- /dev/null +++ b/cpp/src/arrow/ipc/file.cc @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/ipc/file.h" + +#include +#include +#include +#include + +#include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata.h" +#include "arrow/ipc/util.h" +#include "arrow/io/interfaces.h" +#include "arrow/util/buffer.h" +#include "arrow/util/logging.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace ipc { + +static constexpr const char* kArrowMagicBytes = "ARROW1"; + +// ---------------------------------------------------------------------- +// Writer implementation + +FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr& schema) + : sink_(sink), schema_(schema), position_(-1), started_(false) {} + +Status FileWriter::UpdatePosition() { + return sink_->Tell(&position_); +} + +Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out) { + *out = std::shared_ptr(new FileWriter(sink, schema)); // ctor is private + RETURN_NOT_OK((*out)->UpdatePosition()); + return Status::OK(); +} + +Status FileWriter::Write(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(sink_->Write(data, nbytes)); + position_ += nbytes; + return Status::OK(); +} + +Status FileWriter::Align() { + int64_t remainder = PaddedLength(position_) - position_; + if (remainder > 0) { return Write(kPaddingBytes, remainder); } + return Status::OK(); +} + +Status FileWriter::WriteAligned(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(Write(data, nbytes)); + return Align(); +} + +Status FileWriter::Start() { + RETURN_NOT_OK(WriteAligned( + reinterpret_cast(kArrowMagicBytes), strlen(kArrowMagicBytes))); + started_ = true; + return Status::OK(); +} + +Status FileWriter::CheckStarted() { + if (!started_) { return Start(); } + return Status::OK(); +} + +Status FileWriter::WriteRecordBatch( + const std::vector>& columns, int32_t num_rows) { + RETURN_NOT_OK(CheckStarted()); + + int64_t offset = position_; + + int64_t body_end_offset; + int64_t header_end_offset; + RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( + columns, num_rows, sink_, &body_end_offset, &header_end_offset)); + RETURN_NOT_OK(UpdatePosition()); + + DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes"; + + // There may be padding ever the end of the metadata, so we cannot rely on + // position_ + int32_t metadata_length = header_end_offset - body_end_offset; + int32_t body_length = body_end_offset - offset; + + // Append metadata, to be written in the footer later + record_batches_.emplace_back(offset, metadata_length, body_length); + + return Status::OK(); +} + +Status FileWriter::Close() { + // Write metadata + int64_t initial_position = position_; + RETURN_NOT_OK(WriteFileFooter(schema_.get(), dictionaries_, record_batches_, sink_)); + RETURN_NOT_OK(UpdatePosition()); + + // Write footer length + int32_t footer_length = position_ - initial_position; + + if (footer_length <= 0) { return Status::Invalid("Invalid file footer"); } + + RETURN_NOT_OK(Write(reinterpret_cast(&footer_length), sizeof(int32_t))); + + // Write magic bytes to end file + return Write( + reinterpret_cast(kArrowMagicBytes), strlen(kArrowMagicBytes)); +} + +// ---------------------------------------------------------------------- +// Reader implementation + +FileReader::FileReader( + const std::shared_ptr& file, int64_t footer_offset) + : file_(file), footer_offset_(footer_offset) {} + +FileReader::~FileReader() {} + +Status FileReader::Open(const std::shared_ptr& file, + std::shared_ptr* reader) { + int64_t footer_offset; + RETURN_NOT_OK(file->GetSize(&footer_offset)); + 
return Open(file, footer_offset, reader); +} + +Status FileReader::Open(const std::shared_ptr& file, + int64_t footer_offset, std::shared_ptr* reader) { + *reader = std::shared_ptr(new FileReader(file, footer_offset)); + return (*reader)->ReadFooter(); +} + +Status FileReader::ReadFooter() { + int magic_size = static_cast(strlen(kArrowMagicBytes)); + + if (footer_offset_ <= magic_size * 2 + 4) { + std::stringstream ss; + ss << "File is too small: " << footer_offset_; + return Status::Invalid(ss.str()); + } + + std::shared_ptr buffer; + int file_end_size = magic_size + sizeof(int32_t); + RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer)); + + if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) { + return Status::Invalid("Not an Arrow file"); + } + + int32_t footer_length = *reinterpret_cast(buffer->data()); + + if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) { + return Status::Invalid("File is smaller than indicated metadata size"); + } + + // Now read the footer + RETURN_NOT_OK(file_->ReadAt( + footer_offset_ - footer_length - file_end_size, footer_length, &buffer)); + RETURN_NOT_OK(FileFooter::Open(buffer, &footer_)); + + // Get the schema + return footer_->GetSchema(&schema_); +} + +const std::shared_ptr& FileReader::schema() const { + return schema_; +} + +int FileReader::num_dictionaries() const { + return footer_->num_dictionaries(); +} + +int FileReader::num_record_batches() const { + return footer_->num_record_batches(); +} + +MetadataVersion::type FileReader::version() const { + return footer_->version(); +} + +Status FileReader::GetRecordBatch(int i, std::shared_ptr* batch) { + DCHECK_GE(i, 0); + DCHECK_LT(i, num_record_batches()); + FileBlock block = footer_->record_batch(i); + int64_t metadata_end_offset = block.offset + block.body_length + block.metadata_length; + + std::shared_ptr reader; + RETURN_NOT_OK(RecordBatchReader::Open(file_.get(), metadata_end_offset, &reader)); + + return reader->GetRecordBatch(schema_, batch); +} + +} // namespace ipc +} // namespace arrow diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h new file mode 100644 index 00000000000..4b79c98281b --- /dev/null +++ b/cpp/src/arrow/ipc/file.h @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Implement Arrow file layout for IPC/RPC purposes and short-lived storage + +#ifndef ARROW_IPC_FILE_H +#define ARROW_IPC_FILE_H + +#include +#include +#include + +#include "arrow/ipc/metadata.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class Buffer; +struct Field; +class RecordBatch; +class Schema; +class Status; + +namespace io { + +class OutputStream; +class ReadableFileInterface; + +} // namespace io + +namespace ipc { + +class ARROW_EXPORT FileWriter { + public: + static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out); + + // TODO(wesm): Write dictionaries + + Status WriteRecordBatch( + const std::vector>& columns, int32_t num_rows); + + Status Close(); + + private: + FileWriter(io::OutputStream* sink, const std::shared_ptr& schema); + + Status CheckStarted(); + Status Start(); + + Status UpdatePosition(); + + // Adds padding bytes if necessary to ensure all memory blocks are written on + // 8-byte boundaries. + Status Align(); + + // Write data and update position + Status Write(const uint8_t* data, int64_t nbytes); + + // Write and align + Status WriteAligned(const uint8_t* data, int64_t nbytes); + + io::OutputStream* sink_; + std::shared_ptr schema_; + int64_t position_; + bool started_; + + std::vector dictionaries_; + std::vector record_batches_; +}; + +class ARROW_EXPORT FileReader { + public: + ~FileReader(); + + // Open a file-like object that is assumed to be self-contained; i.e., the + // end of the file interface is the end of the Arrow file. Note that there + // can be any amount of data preceding the Arrow-formatted data, because we + // need only locate the end of the Arrow file stream to discover the metadata + // and then proceed to read the data into memory. + static Status Open(const std::shared_ptr& file, + std::shared_ptr* reader); + + // If the file is embedded within some larger file or memory region, you can + // pass the absolute memory offset to the end of the file (which contains the + // metadata footer). The metadata must have been written with memory offsets + // relative to the start of the containing file. + // + // @param file: the data source + // @param footer_offset: the position of the end of the Arrow "file" + static Status Open(const std::shared_ptr& file, + int64_t footer_offset, std::shared_ptr* reader); + + const std::shared_ptr& schema() const; + + // Shared dictionaries for dictionary encoding across record batches + // TODO(wesm): Implement dictionary reading when we also have dictionary + // encoding + int num_dictionaries() const; + + int num_record_batches() const; + + MetadataVersion::type version() const; + + // Read a record batch from the file. Does not copy memory if the input + // source supports zero-copy. + // + // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an + // "always copy" option) + Status GetRecordBatch(int i, std::shared_ptr* batch); + + private: + FileReader( + const std::shared_ptr& file, int64_t footer_offset); + + Status ReadFooter(); + + std::shared_ptr file_; + + // The location where the Arrow file layout ends. May be the end of the file + // or some other location if embedded in a larger file.
+ int64_t footer_offset_; + + std::unique_ptr footer_; + std::shared_ptr schema_; +}; + +} // namespace ipc +} // namespace arrow + +#endif // ARROW_IPC_FILE_H diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index ca4d0152b90..f5611d4840c 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -43,31 +43,27 @@ namespace arrow { namespace ipc { -// TODO(emkornfield) convert to google style kInt32, etc? -const auto INT32 = std::make_shared(); -const auto LIST_INT32 = std::make_shared(INT32); -const auto LIST_LIST_INT32 = std::make_shared(LIST_INT32); - -typedef Status MakeRowBatch(std::shared_ptr* out); - -class TestWriteRowBatch : public ::testing::TestWithParam, - public io::MemoryMapFixture { +class TestWriteRecordBatch : public ::testing::TestWithParam, + public io::MemoryMapFixture { public: void SetUp() { pool_ = default_memory_pool(); } void TearDown() { io::MemoryMapFixture::TearDown(); } - Status RoundTripHelper(const RowBatch& batch, int memory_map_size, - std::shared_ptr* batch_result) { + Status RoundTripHelper(const RecordBatch& batch, int memory_map_size, + std::shared_ptr* batch_result) { std::string path = "test-write-row-batch"; io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); - int64_t header_location; - RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, &header_location)); + int64_t body_end_offset; + int64_t header_end_offset; - std::shared_ptr reader; - RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader)); + RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(), + &body_end_offset, &header_end_offset)); - RETURN_NOT_OK(reader->GetRowBatch(batch.schema(), batch_result)); + std::shared_ptr reader; + RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_end_offset, &reader)); + + RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result)); return Status::OK(); } @@ -76,10 +72,10 @@ class TestWriteRowBatch : public ::testing::TestWithParam, MemoryPool* pool_; }; -TEST_P(TestWriteRowBatch, RoundTrip) { - std::shared_ptr batch; +TEST_P(TestWriteRecordBatch, RoundTrip) { + std::shared_ptr batch; ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue - std::shared_ptr batch_result; + std::shared_ptr batch_result; ASSERT_OK(RoundTripHelper(*batch, 1 << 16, &batch_result)); // do checks @@ -93,217 +89,39 @@ TEST_P(TestWriteRowBatch, RoundTrip) { } } -Status MakeIntRowBatch(std::shared_ptr* out) { - const int length = 1000; - - // Make the schema - auto f0 = std::make_shared("f0", INT32); - auto f1 = std::make_shared("f1", INT32); - std::shared_ptr schema(new Schema({f0, f1})); - - // Example data - std::shared_ptr a0, a1; - MemoryPool* pool = default_memory_pool(); - RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0)); - RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1)); - out->reset(new RowBatch(schema, length, {a0, a1})); - return Status::OK(); -} +INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch, + ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, + &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, + &MakeStringTypesRecordBatch, &MakeStruct)); -template -Status MakeRandomBinaryArray( - const TypePtr& type, int32_t length, MemoryPool* pool, ArrayPtr* array) { - const std::vector values = { - "", "", "abc", "123", "efg", "456!@#!@#", "12312"}; - Builder builder(pool, type); - const auto values_len = values.size(); - for (int32_t i = 0; i < 
length; ++i) { - int values_index = i % values_len; - if (values_index == 0) { - RETURN_NOT_OK(builder.AppendNull()); - } else { - const std::string& value = values[values_index]; - RETURN_NOT_OK( - builder.Append(reinterpret_cast(value.data()), value.size())); - } - } - *array = builder.Finish(); - return Status::OK(); -} - -Status MakeStringTypesRowBatch(std::shared_ptr* out) { - const int32_t length = 500; - auto string_type = std::make_shared(); - auto binary_type = std::make_shared(); - auto f0 = std::make_shared("f0", string_type); - auto f1 = std::make_shared("f1", binary_type); - std::shared_ptr schema(new Schema({f0, f1})); - - std::shared_ptr a0, a1; - MemoryPool* pool = default_memory_pool(); - - { - auto status = - MakeRandomBinaryArray(string_type, length, pool, &a0); - RETURN_NOT_OK(status); - } - { - auto status = - MakeRandomBinaryArray(binary_type, length, pool, &a1); - RETURN_NOT_OK(status); - } - out->reset(new RowBatch(schema, length, {a0, a1})); - return Status::OK(); -} - -Status MakeListRowBatch(std::shared_ptr* out) { - // Make the schema - auto f0 = std::make_shared("f0", LIST_INT32); - auto f1 = std::make_shared("f1", LIST_LIST_INT32); - auto f2 = std::make_shared("f2", INT32); - std::shared_ptr schema(new Schema({f0, f1, f2})); - - // Example data - - MemoryPool* pool = default_memory_pool(); - const int length = 200; - std::shared_ptr leaf_values, list_array, list_list_array, flat_array; - const bool include_nulls = true; - RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values)); - RETURN_NOT_OK( - MakeRandomListArray(leaf_values, length, include_nulls, pool, &list_array)); - RETURN_NOT_OK( - MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array)); - RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); - out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array})); - return Status::OK(); -} - -Status MakeZeroLengthRowBatch(std::shared_ptr* out) { - // Make the schema - auto f0 = std::make_shared("f0", LIST_INT32); - auto f1 = std::make_shared("f1", LIST_LIST_INT32); - auto f2 = std::make_shared("f2", INT32); - std::shared_ptr schema(new Schema({f0, f1, f2})); - - // Example data - MemoryPool* pool = default_memory_pool(); - const int length = 200; - const bool include_nulls = true; - std::shared_ptr leaf_values, list_array, list_list_array, flat_array; - RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values)); - RETURN_NOT_OK(MakeRandomListArray(leaf_values, 0, include_nulls, pool, &list_array)); - RETURN_NOT_OK( - MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array)); - RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array)); - out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array})); - return Status::OK(); -} - -Status MakeNonNullRowBatch(std::shared_ptr* out) { - // Make the schema - auto f0 = std::make_shared("f0", LIST_INT32); - auto f1 = std::make_shared("f1", LIST_LIST_INT32); - auto f2 = std::make_shared("f2", INT32); - std::shared_ptr schema(new Schema({f0, f1, f2})); - - // Example data - MemoryPool* pool = default_memory_pool(); - const int length = 50; - std::shared_ptr leaf_values, list_array, list_list_array, flat_array; - - RETURN_NOT_OK(MakeRandomInt32Array(1000, true, pool, &leaf_values)); - bool include_nulls = false; - RETURN_NOT_OK( - MakeRandomListArray(leaf_values, length, include_nulls, pool, &list_array)); - RETURN_NOT_OK( - MakeRandomListArray(list_array, length, 
include_nulls, pool, &list_list_array)); - RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); - out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array})); - return Status::OK(); -} - -Status MakeDeeplyNestedList(std::shared_ptr* out) { - const int batch_length = 5; - TypePtr type = INT32; - - MemoryPool* pool = default_memory_pool(); - ArrayPtr array; - const bool include_nulls = true; - RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array)); - for (int i = 0; i < 63; ++i) { - type = std::static_pointer_cast(std::make_shared(type)); - RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array)); - } - - auto f0 = std::make_shared("f0", type); - std::shared_ptr schema(new Schema({f0})); - std::vector arrays = {array}; - out->reset(new RowBatch(schema, batch_length, arrays)); - return Status::OK(); -} - -Status MakeStruct(std::shared_ptr* out) { - // reuse constructed list columns - std::shared_ptr list_batch; - RETURN_NOT_OK(MakeListRowBatch(&list_batch)); - std::vector columns = { - list_batch->column(0), list_batch->column(1), list_batch->column(2)}; - auto list_schema = list_batch->schema(); - - // Define schema - std::shared_ptr type(new StructType( - {list_schema->field(0), list_schema->field(1), list_schema->field(2)})); - auto f0 = std::make_shared("non_null_struct", type); - auto f1 = std::make_shared("null_struct", type); - std::shared_ptr schema(new Schema({f0, f1})); - - // construct individual nullable/non-nullable struct arrays - ArrayPtr no_nulls(new StructArray(type, list_batch->num_rows(), columns)); - std::vector null_bytes(list_batch->num_rows(), 1); - null_bytes[0] = 0; - std::shared_ptr null_bitmask; - RETURN_NOT_OK(util::bytes_to_bits(null_bytes, &null_bitmask)); - ArrayPtr with_nulls( - new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask)); - - // construct batch - std::vector arrays = {no_nulls, with_nulls}; - out->reset(new RowBatch(schema, list_batch->num_rows(), arrays)); - return Status::OK(); -} - -INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch, - ::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, &MakeNonNullRowBatch, - &MakeZeroLengthRowBatch, &MakeDeeplyNestedList, - &MakeStringTypesRowBatch, &MakeStruct)); - -void TestGetRowBatchSize(std::shared_ptr batch) { +void TestGetRecordBatchSize(std::shared_ptr batch) { ipc::MockOutputStream mock; - int64_t mock_header_location = -1; + int64_t mock_header_offset = -1; + int64_t mock_body_offset = -1; int64_t size = -1; - ASSERT_OK(WriteRowBatch(&mock, batch.get(), &mock_header_location)); - ASSERT_OK(GetRowBatchSize(batch.get(), &size)); + ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), &mock, + &mock_body_offset, &mock_header_offset)); + ASSERT_OK(GetRecordBatchSize(batch.get(), &size)); ASSERT_EQ(mock.GetExtentBytesWritten(), size); } -TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) { - std::shared_ptr batch; +TEST_F(TestWriteRecordBatch, IntegerGetRecordBatchSize) { + std::shared_ptr batch; - ASSERT_OK(MakeIntRowBatch(&batch)); - TestGetRowBatchSize(batch); + ASSERT_OK(MakeIntRecordBatch(&batch)); + TestGetRecordBatchSize(batch); - ASSERT_OK(MakeListRowBatch(&batch)); - TestGetRowBatchSize(batch); + ASSERT_OK(MakeListRecordBatch(&batch)); + TestGetRecordBatchSize(batch); - ASSERT_OK(MakeZeroLengthRowBatch(&batch)); - TestGetRowBatchSize(batch); + ASSERT_OK(MakeZeroLengthRecordBatch(&batch)); + TestGetRecordBatchSize(batch); - ASSERT_OK(MakeNonNullRowBatch(&batch)); - 
TestGetRowBatchSize(batch); + ASSERT_OK(MakeNonNullRecordBatch(&batch)); + TestGetRecordBatchSize(batch); ASSERT_OK(MakeDeeplyNestedList(&batch)); - TestGetRowBatchSize(batch); + TestGetRecordBatchSize(batch); } class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { @@ -314,7 +132,7 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { Status WriteToMmap(int recursion_level, bool override_level, int64_t* header_out = nullptr, std::shared_ptr* schema_out = nullptr) { const int batch_length = 5; - TypePtr type = INT32; + TypePtr type = kInt32; ArrayPtr array; const bool include_nulls = true; RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array)); @@ -328,18 +146,22 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { std::shared_ptr schema(new Schema({f0})); if (schema_out != nullptr) { *schema_out = schema; } std::vector arrays = {array}; - auto batch = std::make_shared(schema, batch_length, arrays); + auto batch = std::make_shared(schema, batch_length, arrays); std::string path = "test-write-past-max-recursion"; const int memory_map_size = 1 << 16; io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); - int64_t header_location; - int64_t* header_out_param = header_out == nullptr ? &header_location : header_out; + + int64_t body_offset; + int64_t header_offset; + + int64_t* header_out_param = header_out == nullptr ? &header_offset : header_out; if (override_level) { - return WriteRowBatch( - mmap_.get(), batch.get(), header_out_param, recursion_level + 1); + return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(), + &body_offset, header_out_param, recursion_level + 1); } else { - return WriteRowBatch(mmap_.get(), batch.get(), header_out_param); + return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(), + &body_offset, header_out_param); } } @@ -353,14 +175,14 @@ TEST_F(RecursionLimits, WriteLimit) { } TEST_F(RecursionLimits, ReadLimit) { - int64_t header_location = -1; + int64_t header_offset = -1; std::shared_ptr schema; - ASSERT_OK(WriteToMmap(64, true, &header_location, &schema)); + ASSERT_OK(WriteToMmap(64, true, &header_offset, &schema)); - std::shared_ptr reader; - ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader)); - std::shared_ptr batch_result; - ASSERT_RAISES(Invalid, reader->GetRowBatch(schema, &batch_result)); + std::shared_ptr reader; + ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader)); + std::shared_ptr batch_result; + ASSERT_RAISES(Invalid, reader->GetRecordBatch(schema, &batch_result)); } } // namespace ipc diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc new file mode 100644 index 00000000000..cd424bf385c --- /dev/null +++ b/cpp/src/arrow/ipc/ipc-file-test.cc @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include "arrow/io/memory.h" +#include "arrow/io/test-common.h" +#include "arrow/ipc/adapter.h" +#include "arrow/ipc/file.h" +#include "arrow/ipc/test-common.h" +#include "arrow/ipc/util.h" + +#include "arrow/test-util.h" +#include "arrow/types/list.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/types/struct.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/buffer.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace ipc { + +class TestFileFormat : public ::testing::TestWithParam { + public: + void SetUp() { + pool_ = default_memory_pool(); + buffer_ = std::make_shared(pool_); + sink_.reset(new io::BufferOutputStream(buffer_)); + } + void TearDown() {} + + Status RoundTripHelper( + const RecordBatch& batch, std::vector>* out_batches) { + // Write the file + RETURN_NOT_OK(FileWriter::Open(sink_.get(), batch.schema(), &file_writer_)); + int num_batches = 3; + for (int i = 0; i < num_batches; ++i) { + RETURN_NOT_OK(file_writer_->WriteRecordBatch(batch.columns(), batch.num_rows())); + } + RETURN_NOT_OK(file_writer_->Close()); + + // Current offset into stream is the end of the file + int64_t footer_offset; + RETURN_NOT_OK(sink_->Tell(&footer_offset)); + + // Open the file + auto reader = std::make_shared(buffer_->data(), buffer_->size()); + RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_)); + + EXPECT_EQ(num_batches, file_reader_->num_record_batches()); + + out_batches->resize(num_batches); + for (int i = 0; i < num_batches; ++i) { + RETURN_NOT_OK(file_reader_->GetRecordBatch(i, &(*out_batches)[i])); + } + + return Status::OK(); + } + + void CompareBatch(const RecordBatch* left, const RecordBatch* right) { + ASSERT_TRUE(left->schema()->Equals(right->schema())); + ASSERT_EQ(left->num_columns(), right->num_columns()) + << left->schema()->ToString() << " result: " << right->schema()->ToString(); + EXPECT_EQ(left->num_rows(), right->num_rows()); + for (int i = 0; i < left->num_columns(); ++i) { + EXPECT_TRUE(left->column(i)->Equals(right->column(i))) + << "Idx: " << i << " Name: " << left->column_name(i); + } + } + + protected: + MemoryPool* pool_; + + std::unique_ptr sink_; + std::shared_ptr buffer_; + + std::shared_ptr file_writer_; + std::shared_ptr file_reader_; +}; + +TEST_P(TestFileFormat, RoundTrip) { + std::shared_ptr batch; + ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue + + std::vector> out_batches; + + ASSERT_OK(RoundTripHelper(*batch, &out_batches)); + + // Compare batches. 
Same + for (size_t i = 0; i < out_batches.size(); ++i) { + CompareBatch(batch.get(), out_batches[i].get()); + } +} + +INSTANTIATE_TEST_CASE_P(RoundTripTests, TestFileFormat, + ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, + &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, + &MakeStringTypesRecordBatch, &MakeStruct)); + +} // namespace ipc +} // namespace arrow diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index 51d79cfb4c4..1dc39692332 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -21,6 +21,7 @@ #include "gtest/gtest.h" +#include "arrow/io/memory.h" #include "arrow/ipc/metadata.h" #include "arrow/schema.h" #include "arrow/test-util.h" @@ -31,6 +32,8 @@ namespace arrow { class Buffer; +namespace ipc { + static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) { if (!lhs->Equals(*rhs)) { std::stringstream ss; @@ -46,14 +49,14 @@ class TestSchemaMessage : public ::testing::Test { void CheckRoundtrip(const Schema* schema) { std::shared_ptr buffer; - ASSERT_OK(ipc::WriteSchema(schema, &buffer)); + ASSERT_OK(WriteSchema(schema, &buffer)); - std::shared_ptr message; - ASSERT_OK(ipc::Message::Open(buffer, &message)); + std::shared_ptr message; + ASSERT_OK(Message::Open(buffer, &message)); - ASSERT_EQ(ipc::Message::SCHEMA, message->type()); + ASSERT_EQ(Message::SCHEMA, message->type()); - std::shared_ptr schema_msg = message->GetSchema(); + std::shared_ptr schema_msg = message->GetSchema(); ASSERT_EQ(schema->num_fields(), schema_msg->num_fields()); std::shared_ptr schema2; @@ -94,4 +97,68 @@ TEST_F(TestSchemaMessage, NestedFields) { CheckRoundtrip(&schema); } +class TestFileFooter : public ::testing::Test { + public: + void SetUp() {} + + void CheckRoundtrip(const Schema* schema, const std::vector& dictionaries, + const std::vector& record_batches) { + auto buffer = std::make_shared(); + io::BufferOutputStream stream(buffer); + + ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream)); + + std::unique_ptr footer; + ASSERT_OK(FileFooter::Open(buffer, &footer)); + + ASSERT_EQ(MetadataVersion::V1_SNAPSHOT, footer->version()); + + // Check schema + std::shared_ptr schema2; + ASSERT_OK(footer->GetSchema(&schema2)); + assert_schema_equal(schema, schema2.get()); + + // Check blocks + ASSERT_EQ(dictionaries.size(), footer->num_dictionaries()); + ASSERT_EQ(record_batches.size(), footer->num_record_batches()); + + for (int i = 0; i < footer->num_dictionaries(); ++i) { + CheckBlocks(dictionaries[i], footer->dictionary(i)); + } + + for (int i = 0; i < footer->num_record_batches(); ++i) { + CheckBlocks(record_batches[i], footer->record_batch(i)); + } + } + + void CheckBlocks(const FileBlock& left, const FileBlock& right) { + ASSERT_EQ(left.offset, right.offset); + ASSERT_EQ(left.metadata_length, right.metadata_length); + ASSERT_EQ(left.body_length, right.body_length); + } + + private: + std::shared_ptr example_schema_; +}; + +TEST_F(TestFileFooter, Basics) { + auto f0 = std::make_shared("f0", std::make_shared()); + auto f1 = std::make_shared("f1", std::make_shared()); + Schema schema({f0, f1}); + + std::vector dictionaries; + dictionaries.emplace_back(8, 92, 900); + dictionaries.emplace_back(1000, 100, 1900); + dictionaries.emplace_back(3000, 100, 2900); + + std::vector record_batches; + record_batches.emplace_back(6000, 100, 900); + record_batches.emplace_back(7000, 100, 1900); + record_batches.emplace_back(9000, 100, 2900); + 
record_batches.emplace_back(12000, 100, 3900); + + CheckRoundtrip(&schema, dictionaries, record_batches); +} + +} // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 05e9c7ad4d3..7102012c29a 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -31,10 +31,6 @@ #include "arrow/util/buffer.h" #include "arrow/util/status.h" -typedef flatbuffers::FlatBufferBuilder FBB; -typedef flatbuffers::Offset FieldOffset; -typedef flatbuffers::Offset Offset; - namespace arrow { namespace flatbuf = org::apache::arrow::flatbuf; @@ -52,6 +48,8 @@ const std::shared_ptr UINT32 = std::make_shared(); const std::shared_ptr UINT64 = std::make_shared(); const std::shared_ptr FLOAT = std::make_shared(); const std::shared_ptr DOUBLE = std::make_shared(); +const std::shared_ptr STRING = std::make_shared(); +const std::shared_ptr BINARY = std::make_shared(); static Status IntFromFlatbuffer( const flatbuf::Int* int_data, std::shared_ptr* out) { @@ -102,8 +100,11 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, return FloatFromFlatuffer( static_cast(type_data), out); case flatbuf::Type_Binary: + *out = BINARY; + return Status::OK(); case flatbuf::Type_Utf8: - return Status::NotImplemented("Type is not implemented"); + *out = STRING; + return Status::OK(); case flatbuf::Type_Bool: *out = BOOL; return Status::OK(); @@ -193,6 +194,14 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr& type, *out_type = flatbuf::Type_FloatingPoint; *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE); break; + case Type::BINARY: + *out_type = flatbuf::Type_Binary; + *offset = flatbuf::CreateBinary(fbb).Union(); + break; + case Type::STRING: + *out_type = flatbuf::Type_Utf8; + *offset = flatbuf::CreateUtf8(fbb).Union(); + break; case Type::LIST: *out_type = flatbuf::Type_List; return ListToFlatbuffer(fbb, type, children, offset); @@ -255,19 +264,26 @@ flatbuf::Endianness endianness() { return bint.c[0] == 1 ? 
flatbuf::Endianness_Big : flatbuf::Endianness_Little; } -Status MessageBuilder::SetSchema(const Schema* schema) { - header_type_ = flatbuf::MessageHeader_Schema; - +Status SchemaToFlatbuffer( + FBB& fbb, const Schema* schema, flatbuffers::Offset* out) { std::vector field_offsets; for (int i = 0; i < schema->num_fields(); ++i) { const std::shared_ptr& field = schema->field(i); FieldOffset offset; - RETURN_NOT_OK(FieldToFlatbuffer(fbb_, field, &offset)); + RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset)); field_offsets.push_back(offset); } - header_ = - flatbuf::CreateSchema(fbb_, endianness(), fbb_.CreateVector(field_offsets)).Union(); + *out = flatbuf::CreateSchema(fbb, endianness(), fbb.CreateVector(field_offsets)); + return Status::OK(); +} + +Status MessageBuilder::SetSchema(const Schema* schema) { + flatbuffers::Offset fb_schema; + RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema)); + + header_type_ = flatbuf::MessageHeader_Schema; + header_ = fb_schema.Union(); body_length_ = 0; return Status::OK(); } @@ -301,17 +317,17 @@ Status MessageBuilder::Finish() { } Status MessageBuilder::GetBuffer(std::shared_ptr* out) { - // The message buffer is prefixed by the size of the complete flatbuffer as + // The message buffer is suffixed by the size of the complete flatbuffer as // int32_t - // + // int32_t size = fbb_.GetSize(); auto result = std::make_shared(); RETURN_NOT_OK(result->Resize(size + sizeof(int32_t))); uint8_t* dst = result->mutable_data(); - memcpy(dst, reinterpret_cast(&size), sizeof(int32_t)); - memcpy(dst + sizeof(int32_t), fbb_.GetBufferPointer(), size); + memcpy(dst, fbb_.GetBufferPointer(), size); + memcpy(dst + size, reinterpret_cast(&size), sizeof(int32_t)); *out = result; return Status::OK(); diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index d38df840ba0..c404cfde22c 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -24,7 +24,9 @@ #include "flatbuffers/flatbuffers.h" +#include "arrow/ipc/File_generated.h" #include "arrow/ipc/Message_generated.h" +#include "arrow/ipc/metadata.h" namespace arrow { @@ -37,11 +39,18 @@ class Status; namespace ipc { +using FBB = flatbuffers::FlatBufferBuilder; +using FieldOffset = flatbuffers::Offset; +using Offset = flatbuffers::Offset; + static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V1_SNAPSHOT; Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr* out); +Status SchemaToFlatbuffer( + FBB& fbb, const Schema* schema, flatbuffers::Offset* out); + class MessageBuilder { public: Status SetSchema(const Schema* schema); diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index e510755110e..66df8a6711f 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -23,7 +23,8 @@ #include "flatbuffers/flatbuffers.h" -// Generated C++ flatbuffer IDL +#include "arrow/io/interfaces.h" +#include "arrow/ipc/File_generated.h" #include "arrow/ipc/Message_generated.h" #include "arrow/ipc/metadata-internal.h" @@ -47,9 +48,10 @@ Status WriteSchema(const Schema* schema, std::shared_ptr* out) { //---------------------------------------------------------------------- // Message reader -class Message::Impl { +class Message::MessageImpl { public: - explicit Impl(const std::shared_ptr& buffer, const flatbuf::Message* message) + explicit MessageImpl( + const std::shared_ptr& buffer, const flatbuf::Message* message) : buffer_(buffer), message_(message) {} 
Message::Type type() const { @@ -76,31 +78,16 @@ class Message::Impl { const flatbuf::Message* message_; }; -class SchemaMessage::Impl { - public: - explicit Impl(const void* schema) - : schema_(static_cast(schema)) {} - - const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); } - - int num_fields() const { return schema_->fields()->size(); } - - private: - const flatbuf::Schema* schema_; -}; - Message::Message() {} Status Message::Open( const std::shared_ptr& buffer, std::shared_ptr* out) { std::shared_ptr result(new Message()); - // The buffer is prefixed by its size as int32_t - const uint8_t* fb_head = buffer->data() + sizeof(int32_t); - const flatbuf::Message* message = flatbuf::GetMessage(fb_head); + const flatbuf::Message* message = flatbuf::GetMessage(buffer->data()); // TODO(wesm): verify message - result->impl_.reset(new Impl(buffer, message)); + result->impl_.reset(new MessageImpl(buffer, message)); *out = result; return Status::OK(); @@ -122,10 +109,26 @@ std::shared_ptr Message::GetSchema() { return std::make_shared(this->shared_from_this(), impl_->header()); } +// ---------------------------------------------------------------------- +// SchemaMessage + +class SchemaMessage::SchemaMessageImpl { + public: + explicit SchemaMessageImpl(const void* schema) + : schema_(static_cast(schema)) {} + + const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); } + + int num_fields() const { return schema_->fields()->size(); } + + private: + const flatbuf::Schema* schema_; +}; + SchemaMessage::SchemaMessage( const std::shared_ptr& message, const void* schema) { message_ = message; - impl_.reset(new Impl(schema)); + impl_.reset(new SchemaMessageImpl(schema)); } int SchemaMessage::num_fields() const { @@ -146,9 +149,12 @@ Status SchemaMessage::GetSchema(std::shared_ptr* out) const { return Status::OK(); } -class RecordBatchMessage::Impl { +// ---------------------------------------------------------------------- +// RecordBatchMessage + +class RecordBatchMessage::RecordBatchMessageImpl { public: - explicit Impl(const void* batch) + explicit RecordBatchMessageImpl(const void* batch) : batch_(static_cast(batch)) { nodes_ = batch_->nodes(); buffers_ = batch_->buffers(); @@ -177,7 +183,7 @@ std::shared_ptr Message::GetRecordBatch() { RecordBatchMessage::RecordBatchMessage( const std::shared_ptr& message, const void* batch) { message_ = message; - impl_.reset(new Impl(batch)); + impl_.reset(new RecordBatchMessageImpl(batch)); } // TODO(wesm): Copying the flatbuffer data isn't great, but this will do for @@ -213,5 +219,122 @@ int RecordBatchMessage::num_fields() const { return impl_->num_fields(); } +// ---------------------------------------------------------------------- +// File footer + +static flatbuffers::Offset> +FileBlocksToFlatbuffer(FBB& fbb, const std::vector& blocks) { + std::vector fb_blocks; + + for (const FileBlock& block : blocks) { + fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length); + } + + return fbb.CreateVectorOfStructs(fb_blocks); +} + +Status WriteFileFooter(const Schema* schema, const std::vector& dictionaries, + const std::vector& record_batches, io::OutputStream* out) { + FBB fbb; + + flatbuffers::Offset fb_schema; + RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema)); + + auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); + auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); + + auto footer = flatbuf::CreateFooter( + fbb, kMetadataVersion, fb_schema, 
fb_dictionaries, fb_record_batches); + + fbb.Finish(footer); + + int32_t size = fbb.GetSize(); + + return out->Write(fbb.GetBufferPointer(), size); +} + +static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { + return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); +} + +class FileFooter::FileFooterImpl { + public: + FileFooterImpl(const std::shared_ptr& buffer, const flatbuf::Footer* footer) + : buffer_(buffer), footer_(footer) {} + + int num_dictionaries() const { return footer_->dictionaries()->size(); } + + int num_record_batches() const { return footer_->recordBatches()->size(); } + + MetadataVersion::type version() const { + switch (footer_->version()) { + case flatbuf::MetadataVersion_V1_SNAPSHOT: + return MetadataVersion::V1_SNAPSHOT; + // Add cases as other versions become available + default: + return MetadataVersion::V1_SNAPSHOT; + } + } + + FileBlock record_batch(int i) const { + return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); + } + + FileBlock dictionary(int i) const { + return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); + } + + Status GetSchema(std::shared_ptr* out) const { + auto schema_msg = std::make_shared(nullptr, footer_->schema()); + return schema_msg->GetSchema(out); + } + + private: + // Retain reference to memory + std::shared_ptr buffer_; + + const flatbuf::Footer* footer_; +}; + +FileFooter::FileFooter() {} + +FileFooter::~FileFooter() {} + +Status FileFooter::Open( + const std::shared_ptr& buffer, std::unique_ptr* out) { + const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data()); + + *out = std::unique_ptr(new FileFooter()); + + // TODO(wesm): Verify the footer + (*out)->impl_.reset(new FileFooterImpl(buffer, footer)); + + return Status::OK(); +} + +int FileFooter::num_dictionaries() const { + return impl_->num_dictionaries(); +} + +int FileFooter::num_record_batches() const { + return impl_->num_record_batches(); +} + +MetadataVersion::type FileFooter::version() const { + return impl_->version(); +} + +FileBlock FileFooter::record_batch(int i) const { + return impl_->record_batch(i); +} + +FileBlock FileFooter::dictionary(int i) const { + return impl_->dictionary(i); +} + +Status FileFooter::GetSchema(std::shared_ptr* out) const { + return impl_->GetSchema(out); +} + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index d5ec53317e6..2f0e853bf97 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -22,6 +22,7 @@ #include #include +#include #include "arrow/util/visibility.h" @@ -32,17 +33,24 @@ struct Field; class Schema; class Status; +namespace io { + +class OutputStream; + +} // namespace io + namespace ipc { +struct MetadataVersion { + enum type { V1_SNAPSHOT }; +}; + //---------------------------------------------------------------------- -// Message read/write APIs // Serialize arrow::Schema as a Flatbuffer ARROW_EXPORT Status WriteSchema(const Schema* schema, std::shared_ptr* out); -//---------------------------------------------------------------------- - // Read interface classes. 
We do not fully deserialize the flatbuffers so that // individual fields metadata can be retrieved from very large schema without // @@ -68,8 +76,8 @@ class ARROW_EXPORT SchemaMessage { // Parent, owns the flatbuffer data std::shared_ptr message_; - class Impl; - std::unique_ptr impl_; + class SchemaMessageImpl; + std::unique_ptr impl_; }; // Field metadata @@ -101,8 +109,8 @@ class ARROW_EXPORT RecordBatchMessage { // Parent, owns the flatbuffer data std::shared_ptr message_; - class Impl; - std::unique_ptr impl_; + class RecordBatchMessageImpl; + std::unique_ptr impl_; }; class ARROW_EXPORT DictionaryBatchMessage { @@ -133,8 +141,46 @@ class ARROW_EXPORT Message : public std::enable_shared_from_this { Message(); // Hide serialization details from user API - class Impl; - std::unique_ptr impl_; + class MessageImpl; + std::unique_ptr impl_; +}; + +// ---------------------------------------------------------------------- +// File footer for file-like representation + +struct FileBlock { + FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length) + : offset(offset), metadata_length(metadata_length), body_length(body_length) {} + + int64_t offset; + int32_t metadata_length; + int64_t body_length; +}; + +ARROW_EXPORT +Status WriteFileFooter(const Schema* schema, const std::vector& dictionaries, + const std::vector& record_batches, io::OutputStream* out); + +class ARROW_EXPORT FileFooter { + public: + ~FileFooter(); + + static Status Open( + const std::shared_ptr& buffer, std::unique_ptr* out); + + int num_dictionaries() const; + int num_record_batches() const; + MetadataVersion::type version() const; + + FileBlock record_batch(int i) const; + FileBlock dictionary(int i) const; + + Status GetSchema(std::shared_ptr* out) const; + + private: + FileFooter(); + class FileFooterImpl; + std::unique_ptr impl_; }; } // namespace ipc diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index f6582fc883b..7d02bc302f4 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -25,21 +25,28 @@ #include #include "arrow/array.h" +#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/types/list.h" #include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/types/struct.h" #include "arrow/util/buffer.h" #include "arrow/util/memory-pool.h" namespace arrow { namespace ipc { +const auto kInt32 = std::make_shared(); +const auto kListInt32 = std::make_shared(kInt32); +const auto kListListInt32 = std::make_shared(kListInt32); + Status MakeRandomInt32Array( int32_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr* array) { std::shared_ptr data; test::MakeRandomInt32PoolBuffer(length, pool, &data); - const auto INT32 = std::make_shared(); - Int32Builder builder(pool, INT32); + const auto kInt32 = std::make_shared(); + Int32Builder builder(pool, kInt32); if (include_nulls) { std::shared_ptr valid_bytes; test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes); @@ -87,6 +94,188 @@ Status MakeRandomListArray(const std::shared_ptr& child_array, int num_li return (*array)->Validate(); } +typedef Status MakeRecordBatch(std::shared_ptr* out); + +Status MakeIntRecordBatch(std::shared_ptr* out) { + const int length = 1000; + + // Make the schema + auto f0 = std::make_shared("f0", kInt32); + auto f1 = std::make_shared("f1", kInt32); + std::shared_ptr schema(new Schema({f0, f1})); + + // Example data + std::shared_ptr a0, a1; + MemoryPool* pool = default_memory_pool(); + 
RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0)); + RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1)); + out->reset(new RecordBatch(schema, length, {a0, a1})); + return Status::OK(); +} + +template +Status MakeRandomBinaryArray( + const TypePtr& type, int32_t length, MemoryPool* pool, ArrayPtr* array) { + const std::vector values = { + "", "", "abc", "123", "efg", "456!@#!@#", "12312"}; + Builder builder(pool, type); + const auto values_len = values.size(); + for (int32_t i = 0; i < length; ++i) { + int values_index = i % values_len; + if (values_index == 0) { + RETURN_NOT_OK(builder.AppendNull()); + } else { + const std::string& value = values[values_index]; + RETURN_NOT_OK( + builder.Append(reinterpret_cast(value.data()), value.size())); + } + } + *array = builder.Finish(); + return Status::OK(); +} + +Status MakeStringTypesRecordBatch(std::shared_ptr* out) { + const int32_t length = 500; + auto string_type = std::make_shared(); + auto binary_type = std::make_shared(); + auto f0 = std::make_shared("f0", string_type); + auto f1 = std::make_shared("f1", binary_type); + std::shared_ptr schema(new Schema({f0, f1})); + + std::shared_ptr a0, a1; + MemoryPool* pool = default_memory_pool(); + + { + auto status = + MakeRandomBinaryArray(string_type, length, pool, &a0); + RETURN_NOT_OK(status); + } + { + auto status = + MakeRandomBinaryArray(binary_type, length, pool, &a1); + RETURN_NOT_OK(status); + } + out->reset(new RecordBatch(schema, length, {a0, a1})); + return Status::OK(); +} + +Status MakeListRecordBatch(std::shared_ptr* out) { + // Make the schema + auto f0 = std::make_shared("f0", kListInt32); + auto f1 = std::make_shared("f1", kListListInt32); + auto f2 = std::make_shared("f2", kInt32); + std::shared_ptr schema(new Schema({f0, f1, f2})); + + // Example data + + MemoryPool* pool = default_memory_pool(); + const int length = 200; + std::shared_ptr leaf_values, list_array, list_list_array, flat_array; + const bool include_nulls = true; + RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values)); + RETURN_NOT_OK( + MakeRandomListArray(leaf_values, length, include_nulls, pool, &list_array)); + RETURN_NOT_OK( + MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array)); + RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); + out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array})); + return Status::OK(); +} + +Status MakeZeroLengthRecordBatch(std::shared_ptr* out) { + // Make the schema + auto f0 = std::make_shared("f0", kListInt32); + auto f1 = std::make_shared("f1", kListListInt32); + auto f2 = std::make_shared("f2", kInt32); + std::shared_ptr schema(new Schema({f0, f1, f2})); + + // Example data + MemoryPool* pool = default_memory_pool(); + const int length = 200; + const bool include_nulls = true; + std::shared_ptr leaf_values, list_array, list_list_array, flat_array; + RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values)); + RETURN_NOT_OK(MakeRandomListArray(leaf_values, 0, include_nulls, pool, &list_array)); + RETURN_NOT_OK( + MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array)); + RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array)); + out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array})); + return Status::OK(); +} + +Status MakeNonNullRecordBatch(std::shared_ptr* out) { + // Make the schema + auto f0 = std::make_shared("f0", kListInt32); + auto f1 = std::make_shared("f1", 
kListListInt32); + auto f2 = std::make_shared("f2", kInt32); + std::shared_ptr schema(new Schema({f0, f1, f2})); + + // Example data + MemoryPool* pool = default_memory_pool(); + const int length = 50; + std::shared_ptr leaf_values, list_array, list_list_array, flat_array; + + RETURN_NOT_OK(MakeRandomInt32Array(1000, true, pool, &leaf_values)); + bool include_nulls = false; + RETURN_NOT_OK( + MakeRandomListArray(leaf_values, length, include_nulls, pool, &list_array)); + RETURN_NOT_OK( + MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array)); + RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); + out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array})); + return Status::OK(); +} + +Status MakeDeeplyNestedList(std::shared_ptr* out) { + const int batch_length = 5; + TypePtr type = kInt32; + + MemoryPool* pool = default_memory_pool(); + ArrayPtr array; + const bool include_nulls = true; + RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array)); + for (int i = 0; i < 63; ++i) { + type = std::static_pointer_cast(std::make_shared(type)); + RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array)); + } + + auto f0 = std::make_shared("f0", type); + std::shared_ptr schema(new Schema({f0})); + std::vector arrays = {array}; + out->reset(new RecordBatch(schema, batch_length, arrays)); + return Status::OK(); +} + +Status MakeStruct(std::shared_ptr* out) { + // reuse constructed list columns + std::shared_ptr list_batch; + RETURN_NOT_OK(MakeListRecordBatch(&list_batch)); + std::vector columns = { + list_batch->column(0), list_batch->column(1), list_batch->column(2)}; + auto list_schema = list_batch->schema(); + + // Define schema + std::shared_ptr type(new StructType( + {list_schema->field(0), list_schema->field(1), list_schema->field(2)})); + auto f0 = std::make_shared("non_null_struct", type); + auto f1 = std::make_shared("null_struct", type); + std::shared_ptr schema(new Schema({f0, f1})); + + // construct individual nullable/non-nullable struct arrays + ArrayPtr no_nulls(new StructArray(type, list_batch->num_rows(), columns)); + std::vector null_bytes(list_batch->num_rows(), 1); + null_bytes[0] = 0; + std::shared_ptr null_bitmask; + RETURN_NOT_OK(util::bytes_to_bits(null_bytes, &null_bitmask)); + ArrayPtr with_nulls( + new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask)); + + // construct batch + std::vector arrays = {no_nulls, with_nulls}; + out->reset(new RecordBatch(schema, list_batch->num_rows(), arrays)); + return Status::OK(); +} + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h index 3f4001b21a9..94079a38277 100644 --- a/cpp/src/arrow/ipc/util.h +++ b/cpp/src/arrow/ipc/util.h @@ -27,6 +27,14 @@ namespace arrow { namespace ipc { +// Align on 8-byte boundaries +static constexpr int kArrowAlignment = 8; +static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0}; + +static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) { + return ((nbytes + alignment - 1) / alignment) * alignment; +} + // A helper class to tracks the size of allocations class MockOutputStream : public io::OutputStream { public: diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index a9c64eca997..2689bebea30 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -31,7 +31,7 @@ namespace arrow { class Array; class MemoryPool; -class RowBatch; 
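// A minimal usage sketch for the alignment helpers added in arrow/ipc/util.h
// above (assumptions: `stream` is an io::OutputStream* and `data`/`nbytes`
// describe a buffer about to be written). Each buffer is padded out to the
// next kArrowAlignment (8-byte) boundary using the zero-filled kPaddingBytes
// block, so PaddedLength gives the on-stream size of the write:
//
//   int64_t padded_length = ipc::PaddedLength(nbytes);
//   RETURN_NOT_OK(stream->Write(data, nbytes));
//   if (padded_length > nbytes) {
//     RETURN_NOT_OK(stream->Write(ipc::kPaddingBytes, padded_length - nbytes));
//   }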
+class RecordBatch; class Status; class Table; diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h index 5aa1ba58717..ecc6a9f8be3 100644 --- a/cpp/src/arrow/parquet/writer.h +++ b/cpp/src/arrow/parquet/writer.h @@ -30,7 +30,7 @@ namespace arrow { class Array; class MemoryPool; class PrimitiveArray; -class RowBatch; +class RecordBatch; class Status; class StringArray; class Table; diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index d9573eae74d..3a250df81d0 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -27,11 +27,11 @@ namespace arrow { -RowBatch::RowBatch(const std::shared_ptr& schema, int num_rows, +RecordBatch::RecordBatch(const std::shared_ptr& schema, int num_rows, const std::vector>& columns) : schema_(schema), num_rows_(num_rows), columns_(columns) {} -const std::string& RowBatch::column_name(int i) const { +const std::string& RecordBatch::column_name(int i) const { return schema_->field(i)->name; } diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 2088fdf0b64..36b3c8ecaf4 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -32,15 +32,15 @@ class Column; class Schema; class Status; -// A row batch is a simpler and more rigid table data structure intended for +// A record batch is a simpler and more rigid table data structure intended for // use primarily in shared memory IPC. It contains a schema (metadata) and a -// corresponding vector of equal-length Arrow arrays -class ARROW_EXPORT RowBatch { +// corresponding sequence of equal-length Arrow arrays +class ARROW_EXPORT RecordBatch { public: - // num_rows is a parameter to allow for row batches of a particular size not + // num_rows is a parameter to allow for record batches of a particular size not // having any materialized columns. Each array should have the same length as // num_rows - RowBatch(const std::shared_ptr& schema, int num_rows, + RecordBatch(const std::shared_ptr& schema, int32_t num_rows, const std::vector>& columns); // @returns: the table's schema @@ -50,17 +50,19 @@ class ARROW_EXPORT RowBatch { // Note: Does not boundscheck const std::shared_ptr& column(int i) const { return columns_[i]; } + const std::vector>& columns() const { return columns_; } + const std::string& column_name(int i) const; // @returns: the number of columns in the table int num_columns() const { return columns_.size(); } // @returns: the number of rows (the corresponding length of each column) - int64_t num_rows() const { return num_rows_; } + int32_t num_rows() const { return num_rows_; } private: std::shared_ptr schema_; - int num_rows_; + int32_t num_rows_; std::vector> columns_; }; diff --git a/format/IPC.md b/format/IPC.md new file mode 100644 index 00000000000..1f39e762ab7 --- /dev/null +++ b/format/IPC.md @@ -0,0 +1,3 @@ +# Interprocess messaging / communication (IPC) + +## File format diff --git a/format/README.md b/format/README.md index 3b0e50364d8..78e15207ee9 100644 --- a/format/README.md +++ b/format/README.md @@ -9,6 +9,7 @@ Currently, the Arrow specification consists of these pieces: - Metadata specification (see Metadata.md) - Physical memory layout specification (see Layout.md) - Metadata serialized representation (see Message.fbs) +- Mechanics of messaging between Arrow systems (IPC, RPC, etc.) (see IPC.md) The metadata currently uses Google's [flatbuffers library][1] for serializing a couple related pieces of information:
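The new format/IPC.md file-format section is still a stub; the footer APIs introduced above fit together roughly as in the sketch below. This is a minimal illustrative sketch, not the patch's own code: the helper name RoundtripFooter, the arrow/schema.h and arrow/util/status.h include paths, and the assumption that `footer_buffer` holds exactly the bytes emitted by WriteFileFooter are assumptions made for the example; File.fbs details such as any magic number or trailing footer length are not addressed here.

    // Illustrative only; names and include paths outside this patch are assumed.
    #include <memory>
    #include <vector>

    #include "arrow/ipc/metadata.h"
    #include "arrow/schema.h"        // assumed location of arrow::Schema
    #include "arrow/util/buffer.h"
    #include "arrow/util/status.h"   // assumed location of Status / RETURN_NOT_OK

    namespace arrow {
    namespace ipc {

    // Writes a footer describing already-written record batches, then re-opens
    // it from an in-memory copy of those footer bytes.
    Status RoundtripFooter(const Schema* schema,
        const std::vector<FileBlock>& record_batches, io::OutputStream* sink,
        const std::shared_ptr<Buffer>& footer_buffer) {
      std::vector<FileBlock> dictionaries;  // no dictionary batches in this sketch
      RETURN_NOT_OK(WriteFileFooter(schema, dictionaries, record_batches, sink));

      // footer_buffer is assumed to contain exactly the bytes written above
      std::unique_ptr<FileFooter> footer;
      RETURN_NOT_OK(FileFooter::Open(footer_buffer, &footer));

      for (int i = 0; i < footer->num_record_batches(); ++i) {
        // offset, metadata_length, and body_length locate one record batch
        // within the file
        FileBlock block = footer->record_batch(i);
        (void)block;
      }

      std::shared_ptr<Schema> roundtripped;
      return footer->GetSchema(&roundtripped);
    }

    }  // namespace ipc
    }  // namespace arrow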