Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-495: [C++] Implement streaming binary format, refactoring #293

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(ARROW_ALTIVEC
"Build Arrow with Altivec"
ON)

endif()

if(NOT ARROW_BUILD_TESTS)
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer)
Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
int64_t size = std::min(nbytes, size_ - position_);

if (buffer_ != nullptr) {
if (size > 0 && buffer_ != nullptr) {
*out = SliceBuffer(buffer_, position_, size);
} else {
*out = std::make_shared<Buffer>(data_ + position_, size);
}

position_ += nbytes;
position_ += size;
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(ARROW_IPC_SRCS
json-internal.cc
metadata.cc
metadata-internal.cc
stream.cc
)

if(NOT APPLE)
Expand Down
44 changes: 22 additions & 22 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ namespace ipc {

class RecordBatchWriter : public ArrayVisitor {
public:
RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
int64_t buffer_start_offset, int max_recursion_depth)
: columns_(columns),
num_rows_(num_rows),
RecordBatchWriter(
const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth)
: batch_(batch),
max_recursion_depth_(max_recursion_depth),
buffer_start_offset_(buffer_start_offset) {}

Expand All @@ -79,8 +78,8 @@ class RecordBatchWriter : public ArrayVisitor {
}

// Perform depth-first traversal of the row-batch
for (size_t i = 0; i < columns_.size(); ++i) {
RETURN_NOT_OK(VisitArray(*columns_[i].get()));
for (int i = 0; i < batch_.num_columns(); ++i) {
RETURN_NOT_OK(VisitArray(*batch_.column(i)));
}

// The position for the start of a buffer relative to the passed frame of
Expand Down Expand Up @@ -126,18 +125,23 @@ class RecordBatchWriter : public ArrayVisitor {
// itself as an int32_t.
std::shared_ptr<Buffer> metadata_fb;
RETURN_NOT_OK(WriteRecordBatchMetadata(
num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb));

// Need to write 4 bytes (metadata size), the metadata, plus padding to
// fall on an 8-byte offset
int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4);
// end on an 8-byte offset
int64_t start_offset;
RETURN_NOT_OK(dst->Tell(&start_offset));

int64_t padded_metadata_length = metadata_fb->size() + 4;
const int remainder = (padded_metadata_length + start_offset) % 8;
if (remainder != 0) { padded_metadata_length += 8 - remainder; }

// The returned metadata size includes the length prefix, the flatbuffer,
// plus padding
*metadata_length = static_cast<int32_t>(padded_metadata_length);

// Write the flatbuffer size prefix
int32_t flatbuffer_size = metadata_fb->size();
// Write the flatbuffer size prefix including padding
int32_t flatbuffer_size = padded_metadata_length - 4;
RETURN_NOT_OK(
dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));

Expand Down Expand Up @@ -294,9 +298,7 @@ class RecordBatchWriter : public ArrayVisitor {
return Status::OK();
}

// Do not copy this vector. Ownership must be retained elsewhere
const std::vector<std::shared_ptr<Array>>& columns_;
int32_t num_rows_;
const RecordBatch& batch_;

std::vector<flatbuf::FieldNode> field_nodes_;
std::vector<flatbuf::Buffer> buffer_meta_;
Expand All @@ -306,18 +308,16 @@ class RecordBatchWriter : public ArrayVisitor {
int64_t buffer_start_offset_;
};

Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) {
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
int max_recursion_depth) {
DCHECK_GT(max_recursion_depth, 0);
RecordBatchWriter serializer(
columns, num_rows, buffer_start_offset, max_recursion_depth);
RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth);
return serializer.Write(dst, metadata_length, body_length);
}

Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
RecordBatchWriter serializer(
batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth);
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth);
RETURN_NOT_OK(serializer.GetTotalSize(size));
return Status::OK();
}
Expand Down
11 changes: 4 additions & 7 deletions cpp/src/arrow/ipc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,14 @@ constexpr int kMaxIpcRecursionDepth = 64;
//
// @param(out) body_length: the size of the contiguous buffer block plus
// padding bytes
ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length,
int max_recursion_depth = kMaxIpcRecursionDepth);

// int64_t GetRecordBatchMetadata(const RecordBatch* batch);
ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch,
int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth);

// Compute the precise number of bytes needed in a contiguous memory segment to
// write the record batch. This involves generating the complete serialized
// Flatbuffers metadata.
ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size);
ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);

// ----------------------------------------------------------------------
// "Read" path; does not copy data if the input supports zero copy reads
Expand Down
167 changes: 120 additions & 47 deletions cpp/src/arrow/ipc/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/adapter.h"
#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/util.h"
#include "arrow/status.h"
Expand All @@ -35,82 +36,154 @@ namespace arrow {
namespace ipc {

static constexpr const char* kArrowMagicBytes = "ARROW1";

// ----------------------------------------------------------------------
// Writer implementation
// File footer

static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
std::vector<flatbuf::Block> fb_blocks;

FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
: sink_(sink), schema_(schema), position_(-1), started_(false) {}
for (const FileBlock& block : blocks) {
fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
}

Status FileWriter::UpdatePosition() {
return sink_->Tell(&position_);
return fbb.CreateVectorOfStructs(fb_blocks);
}

Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
std::shared_ptr<FileWriter>* out) {
*out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema)); // ctor is private
RETURN_NOT_OK((*out)->UpdatePosition());
return Status::OK();
Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
FBB fbb;

flatbuffers::Offset<flatbuf::Schema> fb_schema;
RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));

auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);

auto footer = flatbuf::CreateFooter(
fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);

fbb.Finish(footer);

int32_t size = fbb.GetSize();

return out->Write(fbb.GetBufferPointer(), size);
}

Status FileWriter::Write(const uint8_t* data, int64_t nbytes) {
RETURN_NOT_OK(sink_->Write(data, nbytes));
position_ += nbytes;
return Status::OK();
static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
}

Status FileWriter::Align() {
int64_t remainder = PaddedLength(position_) - position_;
if (remainder > 0) { return Write(kPaddingBytes, remainder); }
class FileFooter::FileFooterImpl {
public:
FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer)
: buffer_(buffer), footer_(footer) {}

int num_dictionaries() const { return footer_->dictionaries()->size(); }

int num_record_batches() const { return footer_->recordBatches()->size(); }

MetadataVersion::type version() const {
switch (footer_->version()) {
case flatbuf::MetadataVersion_V1:
return MetadataVersion::V1;
case flatbuf::MetadataVersion_V2:
return MetadataVersion::V2;
// Add cases as other versions become available
default:
return MetadataVersion::V2;
}
}

FileBlock record_batch(int i) const {
return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
}

FileBlock dictionary(int i) const {
return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
}

Status GetSchema(std::shared_ptr<Schema>* out) const {
auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
return schema_msg->GetSchema(out);
}

private:
// Retain reference to memory
std::shared_ptr<Buffer> buffer_;

const flatbuf::Footer* footer_;
};

FileFooter::FileFooter() {}

FileFooter::~FileFooter() {}

Status FileFooter::Open(
const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());

*out = std::unique_ptr<FileFooter>(new FileFooter());

// TODO(wesm): Verify the footer
(*out)->impl_.reset(new FileFooterImpl(buffer, footer));

return Status::OK();
}

Status FileWriter::WriteAligned(const uint8_t* data, int64_t nbytes) {
RETURN_NOT_OK(Write(data, nbytes));
return Align();
int FileFooter::num_dictionaries() const {
return impl_->num_dictionaries();
}

Status FileWriter::Start() {
RETURN_NOT_OK(WriteAligned(
reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
started_ = true;
return Status::OK();
int FileFooter::num_record_batches() const {
return impl_->num_record_batches();
}

Status FileWriter::CheckStarted() {
if (!started_) { return Start(); }
return Status::OK();
MetadataVersion::type FileFooter::version() const {
return impl_->version();
}

Status FileWriter::WriteRecordBatch(
const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
RETURN_NOT_OK(CheckStarted());

int64_t offset = position_;
FileBlock FileFooter::record_batch(int i) const {
return impl_->record_batch(i);
}

// There may be padding ever the end of the metadata, so we cannot rely on
// position_
int32_t metadata_length;
int64_t body_length;
FileBlock FileFooter::dictionary(int i) const {
return impl_->dictionary(i);
}

// Frame of reference in file format is 0, see ARROW-384
const int64_t buffer_start_offset = 0;
RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length));
RETURN_NOT_OK(UpdatePosition());
Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
return impl_->GetSchema(out);
}

DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
// ----------------------------------------------------------------------
// File writer implementation

// Append metadata, to be written in the footer later
record_batches_.emplace_back(offset, metadata_length, body_length);
Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
std::shared_ptr<FileWriter>* out) {
*out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema)); // ctor is private
RETURN_NOT_OK((*out)->UpdatePosition());
return Status::OK();
}

Status FileWriter::Start() {
RETURN_NOT_OK(WriteAligned(
reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
started_ = true;
return Status::OK();
}

Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
// Push an empty FileBlock
// Append metadata, to be written in the footer later
record_batches_.emplace_back(0, 0, 0);
return BaseStreamWriter::WriteRecordBatch(
batch, &record_batches_[record_batches_.size() - 1]);
}

Status FileWriter::Close() {
// Write metadata
int64_t initial_position = position_;
RETURN_NOT_OK(WriteFileFooter(schema_.get(), dictionaries_, record_batches_, sink_));
RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_));
RETURN_NOT_OK(UpdatePosition());

// Write footer length
Expand Down
Loading