diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000000..a00cbba065a --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Compiled source +*.a +*.dll +*.o +*.py[ocd] +*.so +*.dylib +.build_cache_dir +MANIFEST diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 0edb8ce410b..1a970081234 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -528,7 +528,6 @@ if(ARROW_BUILD_TESTS) ExternalProject_Add(gflags_ep GIT_REPOSITORY https://github.com/gflags/gflags.git GIT_TAG cce68f0c9c5d054017425e6e6fd54f696d36e8ee - # URL "https://github.com/gflags/gflags/archive/v${GFLAGS_VERSION}.tar.gz" BUILD_IN_SOURCE 1 CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DCMAKE_INSTALL_PREFIX=${GFLAGS_PREFIX} diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc index 54c21d2e62c..fad49cef899 100644 --- a/cpp/src/arrow/io/io-file-test.cc +++ b/cpp/src/arrow/io/io-file-test.cc @@ -19,7 +19,7 @@ #include #include #ifndef _MSC_VER -# include +#include #endif #include #include diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 71b0f1e29b2..af495e27e56 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ 
-258,8 +258,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) { // ---------------------------------------------------------------------- // In-memory buffer reader -BufferReader::BufferReader(const uint8_t* buffer, int buffer_size) - : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} +BufferReader::BufferReader(const std::shared_ptr& buffer) + : buffer_(buffer), data_(buffer->data()), size_(buffer->size()), position_(0) {} + +BufferReader::BufferReader(const uint8_t* data, int64_t size) + : buffer_(nullptr), data_(data), size_(size), position_(0) {} BufferReader::~BufferReader() {} @@ -278,26 +281,32 @@ bool BufferReader::supports_zero_copy() const { } Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { - memcpy(buffer, buffer_ + position_, nbytes); - *bytes_read = std::min(nbytes, buffer_size_ - position_); + memcpy(buffer, data_ + position_, nbytes); + *bytes_read = std::min(nbytes, size_ - position_); position_ += *bytes_read; return Status::OK(); } Status BufferReader::Read(int64_t nbytes, std::shared_ptr* out) { - int64_t size = std::min(nbytes, buffer_size_ - position_); - *out = std::make_shared(buffer_ + position_, size); + int64_t size = std::min(nbytes, size_ - position_); + + if (buffer_ != nullptr) { + *out = SliceBuffer(buffer_, position_, size); + } else { + *out = std::make_shared(data_ + position_, size); + } + position_ += nbytes; return Status::OK(); } Status BufferReader::GetSize(int64_t* size) { - *size = buffer_size_; + *size = size_; return Status::OK(); } Status BufferReader::Seek(int64_t position) { - if (position < 0 || position >= buffer_size_) { + if (position < 0 || position >= size_) { return Status::IOError("position out of bounds"); } diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index df2fe8d6efb..b72f93b9391 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -99,7 +99,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { 
class ARROW_EXPORT BufferReader : public ReadableFileInterface { public: - BufferReader(const uint8_t* buffer, int buffer_size); + explicit BufferReader(const std::shared_ptr& buffer); + BufferReader(const uint8_t* data, int64_t size); ~BufferReader(); Status Close() override; @@ -116,8 +117,9 @@ class ARROW_EXPORT BufferReader : public ReadableFileInterface { bool supports_zero_copy() const override; private: - const uint8_t* buffer_; - int buffer_size_; + std::shared_ptr buffer_; + const uint8_t* data_; + int64_t size_; int64_t position_; }; diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index da718c08d54..edf716f6627 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -48,15 +48,6 @@ namespace flatbuf = org::apache::arrow::flatbuf; namespace ipc { -namespace { -Status CheckMultipleOf64(int64_t size) { - if (BitUtil::IsMultipleOf64(size)) { return Status::OK(); } - return Status::Invalid( - "Attempted to write a buffer that " - "wasn't a multiple of 64 bytes"); -} -} - static bool IsPrimitive(const DataType* type) { DCHECK(type != nullptr); switch (type->type) { @@ -124,30 +115,30 @@ Status VisitArray(const Array* arr, std::vector* field_nodes class RecordBatchWriter { public: RecordBatchWriter(const std::vector>& columns, int32_t num_rows, - int max_recursion_depth) + int64_t buffer_start_offset, int max_recursion_depth) : columns_(&columns), num_rows_(num_rows), + buffer_start_offset_(buffer_start_offset), max_recursion_depth_(max_recursion_depth) {} - Status AssemblePayload() { + Status AssemblePayload(int64_t* body_length) { + if (field_nodes_.size() > 0) { + field_nodes_.clear(); + buffer_meta_.clear(); + buffers_.clear(); + } + // Perform depth-first traversal of the row-batch for (size_t i = 0; i < columns_->size(); ++i) { const Array* arr = (*columns_)[i].get(); RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_)); } - return Status::OK(); - } - Status Write( - 
io::OutputStream* dst, int64_t* body_end_offset, int64_t* header_end_offset) { - // Get the starting position - int64_t start_position; - RETURN_NOT_OK(dst->Tell(&start_position)); - - // Keep track of the current position so we can determine the size of the - // message body - int64_t position = start_position; + // The position for the start of a buffer relative to the passed frame of + // reference. May be 0 or some other position in an address space + int64_t offset = buffer_start_offset_; + // Construct the buffer metadata for the record batch header for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); int64_t size = 0; @@ -161,65 +152,103 @@ class RecordBatchWriter { // TODO(wesm): We currently have no notion of shared memory page id's, // but we've included it in the metadata IDL for when we have it in the - // future. Use page=0 for now + // future. Use page = -1 for now // // Note that page ids are a bespoke notion for Arrow and not a feature we // are using from any OS-level shared memory. 
The thought is that systems // may (in the future) associate integer page id's with physical memory // pages (according to whatever is the desired shared memory mechanism) - buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding)); - - if (size > 0) { - RETURN_NOT_OK(dst->Write(buffer->data(), size)); - position += size; - } - - if (padding > 0) { - RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); - position += padding; - } + buffer_meta_.push_back(flatbuf::Buffer(-1, offset, size + padding)); + offset += size + padding; } - *body_end_offset = position; + *body_length = offset - buffer_start_offset_; + DCHECK(BitUtil::IsMultipleOf64(*body_length)); + + return Status::OK(); + } + Status WriteMetadata( + int64_t body_length, io::OutputStream* dst, int32_t* metadata_length) { // Now that we have computed the locations of all of the buffers in shared // memory, the data header can be converted to a flatbuffer and written out // // Note: The memory written here is prefixed by the size of the flatbuffer - // itself as an int32_t. On reading from a input, you will have to - // determine the data header size then request a buffer such that you can - // construct the flatbuffer data accessor object (see arrow::ipc::Message) - std::shared_ptr data_header; - RETURN_NOT_OK(WriteDataHeader( - num_rows_, position - start_position, field_nodes_, buffer_meta_, &data_header)); + // itself as an int32_t. 
+ std::shared_ptr metadata_fb; + RETURN_NOT_OK(WriteRecordBatchMetadata( + num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb)); + + // Need to write 4 bytes (metadata size), the metadata, plus padding to + // fall on a 64-byte offset + int64_t padded_metadata_length = + BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4); + + // The returned metadata size includes the length prefix, the flatbuffer, + // plus padding + *metadata_length = padded_metadata_length; - // Write the data header at the end - RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size())); + // Write the flatbuffer size prefix + int32_t flatbuffer_size = metadata_fb->size(); + RETURN_NOT_OK( + dst->Write(reinterpret_cast(&flatbuffer_size), sizeof(int32_t))); - position += data_header->size(); - *header_end_offset = position; + // Write the flatbuffer + RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size())); - return Align(dst, &position); + // Write any padding + int64_t padding = padded_metadata_length - metadata_fb->size() - 4; + if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); } + + return Status::OK(); } - Status Align(io::OutputStream* dst, int64_t* position) { - // Write all buffers here on word boundaries - // TODO(wesm): Is there benefit to 64-byte padding in IPC? 
- int64_t remainder = PaddedLength(*position) - *position; - if (remainder > 0) { - RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder)); - *position += remainder; + Status Write(io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { + RETURN_NOT_OK(AssemblePayload(body_length)); + +#ifndef NDEBUG + int64_t start_position, current_position; + RETURN_NOT_OK(dst->Tell(&start_position)); +#endif + + RETURN_NOT_OK(WriteMetadata(*body_length, dst, metadata_length)); + +#ifndef NDEBUG + RETURN_NOT_OK(dst->Tell(&current_position)); + DCHECK(BitUtil::IsMultipleOf8(current_position)); +#endif + + // Now write the buffers + for (size_t i = 0; i < buffers_.size(); ++i) { + const Buffer* buffer = buffers_[i].get(); + int64_t size = 0; + int64_t padding = 0; + + // The buffer might be null if we are handling zero row lengths. + if (buffer) { + size = buffer->size(); + padding = BitUtil::RoundUpToMultipleOf64(size) - size; + } + + if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); } + + if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); } } + +#ifndef NDEBUG + RETURN_NOT_OK(dst->Tell(&current_position)); + DCHECK(BitUtil::IsMultipleOf8(current_position)); +#endif + return Status::OK(); } - // This must be called after invoking AssemblePayload Status GetTotalSize(int64_t* size) { // emulates the behavior of Write without actually writing - int64_t body_offset; - int64_t data_header_offset; + int32_t metadata_length; + int64_t body_length; MockOutputStream dst; - RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset)); + RETURN_NOT_OK(Write(&dst, &metadata_length, &body_length)); *size = dst.GetExtentBytesWritten(); return Status::OK(); } @@ -228,6 +257,7 @@ class RecordBatchWriter { // Do not copy this vector. 
Ownership must be retained elsewhere const std::vector>* columns_; int32_t num_rows_; + int64_t buffer_start_offset_; std::vector field_nodes_; std::vector buffer_meta_; @@ -236,18 +266,17 @@ class RecordBatchWriter { }; Status WriteRecordBatch(const std::vector>& columns, - int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset, - int64_t* header_end_offset, int max_recursion_depth) { + int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) { DCHECK_GT(max_recursion_depth, 0); - RecordBatchWriter serializer(columns, num_rows, max_recursion_depth); - RETURN_NOT_OK(serializer.AssemblePayload()); - return serializer.Write(dst, body_end_offset, header_end_offset); + RecordBatchWriter serializer( + columns, num_rows, buffer_start_offset, max_recursion_depth); + return serializer.Write(dst, metadata_length, body_length); } Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) { RecordBatchWriter serializer( - batch->columns(), batch->num_rows(), kMaxIpcRecursionDepth); - RETURN_NOT_OK(serializer.AssemblePayload()); + batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth); RETURN_NOT_OK(serializer.GetTotalSize(size)); return Status::OK(); } @@ -255,30 +284,33 @@ Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) { // ---------------------------------------------------------------------- // Record batch read path -class RecordBatchReader::RecordBatchReaderImpl { +class RecordBatchReader { public: - RecordBatchReaderImpl(io::ReadableFileInterface* file, - const std::shared_ptr& metadata, int max_recursion_depth) - : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) { + RecordBatchReader(const std::shared_ptr& metadata, + const std::shared_ptr& schema, int max_recursion_depth, + io::ReadableFileInterface* file) + : metadata_(metadata), + schema_(schema), + max_recursion_depth_(max_recursion_depth), + file_(file) 
{ num_buffers_ = metadata->num_buffers(); num_flattened_fields_ = metadata->num_fields(); } - Status AssembleBatch( - const std::shared_ptr& schema, std::shared_ptr* out) { - std::vector> arrays(schema->num_fields()); + Status Read(std::shared_ptr* out) { + std::vector> arrays(schema_->num_fields()); // The field_index and buffer_index are incremented in NextArray based on // how much of the batch is "consumed" (through nested data reconstruction, // for example) field_index_ = 0; buffer_index_ = 0; - for (int i = 0; i < schema->num_fields(); ++i) { - const Field* field = schema->field(i).get(); + for (int i = 0; i < schema_->num_fields(); ++i) { + const Field* field = schema_->field(i).get(); RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i])); } - *out = std::make_shared(schema, metadata_->length(), arrays); + *out = std::make_shared(schema_, metadata_->length(), arrays); return Status::OK(); } @@ -370,67 +402,56 @@ class RecordBatchReader::RecordBatchReaderImpl { Status GetBuffer(int buffer_index, std::shared_ptr* out) { BufferMetadata metadata = metadata_->buffer(buffer_index); - RETURN_NOT_OK(CheckMultipleOf64(metadata.length)); - return file_->ReadAt(metadata.offset, metadata.length, out); + + if (metadata.length == 0) { + *out = std::make_shared(nullptr, 0); + return Status::OK(); + } else { + return file_->ReadAt(metadata.offset, metadata.length, out); + } } private: + std::shared_ptr metadata_; + std::shared_ptr schema_; + int max_recursion_depth_; io::ReadableFileInterface* file_; - std::shared_ptr metadata_; int field_index_; int buffer_index_; - int max_recursion_depth_; int num_buffers_; int num_flattened_fields_; }; -Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset, - std::shared_ptr* out) { - return Open(file, offset, kMaxIpcRecursionDepth, out); -} - -Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset, - int max_recursion_depth, std::shared_ptr* out) { +Status 
ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length, + io::ReadableFileInterface* file, std::shared_ptr* metadata) { std::shared_ptr buffer; - RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), &buffer)); - - int32_t metadata_size = *reinterpret_cast(buffer->data()); + RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); - if (metadata_size + static_cast(sizeof(int32_t)) > offset) { - return Status::Invalid("metadata size invalid"); - } - - // Read the metadata - RETURN_NOT_OK( - file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, &buffer)); - - // TODO(wesm): buffer slicing here would be better in case ReadAt returns - // allocated memory - - std::shared_ptr message; - RETURN_NOT_OK(Message::Open(buffer, &message)); + int32_t flatbuffer_size = *reinterpret_cast(buffer->data()); - if (message->type() != Message::RECORD_BATCH) { - return Status::Invalid("Metadata message is not a record batch"); + if (flatbuffer_size + static_cast(sizeof(int32_t)) > metadata_length) { + std::stringstream ss; + ss << "flatbuffer size " << metadata_length << " invalid. 
File offset: " << offset + << ", metadata length: " << metadata_length; + return Status::Invalid(ss.str()); } - std::shared_ptr batch_meta = message->GetRecordBatch(); - - std::shared_ptr result(new RecordBatchReader()); - result->impl_.reset(new RecordBatchReaderImpl(file, batch_meta, max_recursion_depth)); - *out = result; - + *metadata = std::make_shared(buffer, sizeof(int32_t)); return Status::OK(); } -// Here the explicit destructor is required for compilers to be aware of -// the complete information of RecordBatchReader::RecordBatchReaderImpl class -RecordBatchReader::~RecordBatchReader() {} +Status ReadRecordBatch(const std::shared_ptr& metadata, + const std::shared_ptr& schema, io::ReadableFileInterface* file, + std::shared_ptr* out) { + return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out); +} -Status RecordBatchReader::GetRecordBatch( - const std::shared_ptr& schema, std::shared_ptr* out) { - return impl_->AssembleBatch(schema, out); +Status ReadRecordBatch(const std::shared_ptr& metadata, + const std::shared_ptr& schema, int max_recursion_depth, + io::ReadableFileInterface* file, std::shared_ptr* out) { + RecordBatchReader reader(metadata, schema, max_recursion_depth, file); + return reader.Read(out); } } // namespace ipc diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index b02de284dfc..963b9ee3685 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -43,7 +43,7 @@ class OutputStream; namespace ipc { -class RecordBatchMessage; +class RecordBatchMetadata; // ---------------------------------------------------------------------- // Write path @@ -51,22 +51,30 @@ class RecordBatchMessage; // TODO(emkornfield) investigate this more constexpr int kMaxIpcRecursionDepth = 64; -// Write the RecordBatch (collection of equal-length Arrow arrays) to the output -// stream +// Write the RecordBatch (collection of equal-length Arrow arrays) to the +// output stream in a contiguous block. 
The record batch metadata is written as +// a flatbuffer (see format/Message.fbs -- the RecordBatch message type) +// prefixed by its size, followed by each of the memory buffers in the batch +// written end to end (with appropriate alignment and padding): // -// First, each of the memory buffers are written out end-to-end -// -// Then, this function writes the batch metadata as a flatbuffer (see -// format/Message.fbs -- the RecordBatch message type) like so: -// -// +// // // Finally, the absolute offsets (relative to the start of the output stream) // to the end of the body and end of the metadata / data header (suffixed by // the header size) is returned in out-variables +// +// @param(in) buffer_start_offset: the start offset to use in the buffer metadata, +// default should be 0 +// +// @param(out) metadata_length: the size of the length-prefixed flatbuffer +// including padding to a 64-byte boundary +// +// @param(out) body_length: the size of the contiguous buffer block plus +// padding bytes ARROW_EXPORT Status WriteRecordBatch(const std::vector>& columns, - int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset, - int64_t* header_end_offset, int max_recursion_depth = kMaxIpcRecursionDepth); + int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, + int max_recursion_depth = kMaxIpcRecursionDepth); // int64_t GetRecordBatchMetadata(const RecordBatch* batch); @@ -78,27 +86,20 @@ ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size); // ---------------------------------------------------------------------- // "Read" path; does not copy data if the input supports zero copy reads -class ARROW_EXPORT RecordBatchReader { - public: - // The offset is the absolute position to the *end* of the record batch data - // header - static Status Open(io::ReadableFileInterface* file, int64_t offset, - std::shared_ptr* out); - - static Status Open(io::ReadableFileInterface* 
file, int64_t offset, - int max_recursion_depth, std::shared_ptr* out); - - virtual ~RecordBatchReader(); - - // Reassemble the record batch. A Schema is required to be able to construct - // the right array containers - Status GetRecordBatch( - const std::shared_ptr& schema, std::shared_ptr* out); - - private: - class RecordBatchReaderImpl; - std::unique_ptr impl_; -}; +// Read the record batch flatbuffer metadata starting at the indicated file offset +// +// The flatbuffer is expected to be length-prefixed, so the metadata_length +// includes at least the length prefix and the flatbuffer +Status ARROW_EXPORT ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length, + io::ReadableFileInterface* file, std::shared_ptr* metadata); + +Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr& metadata, + const std::shared_ptr& schema, io::ReadableFileInterface* file, + std::shared_ptr* out); + +Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr& metadata, + const std::shared_ptr& schema, int max_recursion_depth, + io::ReadableFileInterface* file, std::shared_ptr* out); } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc index c68244d5025..06001cc1c77 100644 --- a/cpp/src/arrow/ipc/file.cc +++ b/cpp/src/arrow/ipc/file.cc @@ -23,6 +23,7 @@ #include #include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" #include "arrow/ipc/adapter.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" @@ -87,19 +88,19 @@ Status FileWriter::WriteRecordBatch( int64_t offset = position_; - int64_t body_end_offset; - int64_t header_end_offset; + // There may be padding after the end of the metadata, so we cannot rely on + // position_ + int32_t metadata_length; + int64_t body_length; + + // Frame of reference in file format is 0, see ARROW-384 + const int64_t buffer_start_offset = 0; RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( - columns, 
num_rows, buffer_start_offset, sink_, &metadata_length, &body_length)); RETURN_NOT_OK(UpdatePosition()); DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes"; - // There may be padding ever the end of the metadata, so we cannot rely on - // position_ - int32_t metadata_length = header_end_offset - body_end_offset; - int32_t body_length = body_end_offset - offset; - // Append metadata, to be written in the footer later record_batches_.emplace_back(offset, metadata_length, body_length); @@ -198,12 +199,18 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr* batch) { DCHECK_GE(i, 0); DCHECK_LT(i, num_record_batches()); FileBlock block = footer_->record_batch(i); - int64_t metadata_end_offset = block.offset + block.body_length + block.metadata_length; - std::shared_ptr reader; - RETURN_NOT_OK(RecordBatchReader::Open(file_.get(), metadata_end_offset, &reader)); + std::shared_ptr metadata; + RETURN_NOT_OK(ReadRecordBatchMetadata( + block.offset, block.metadata_length, file_.get(), &metadata)); + + // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see + // ARROW-384). 
+ std::shared_ptr buffer_block; + RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); + io::BufferReader reader(buffer_block); - return reader->GetRecordBatch(schema_, batch); + return ReadRecordBatch(metadata, schema_, &reader, batch); } } // namespace ipc diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index f5611d4840c..1accfde7c48 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -54,17 +54,24 @@ class TestWriteRecordBatch : public ::testing::TestWithParam, std::string path = "test-write-row-batch"; io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); - int64_t body_end_offset; - int64_t header_end_offset; + int32_t metadata_length; + int64_t body_length; - RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(), - &body_end_offset, &header_end_offset)); + const int64_t buffer_offset = 0; - std::shared_ptr reader; - RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_end_offset, &reader)); + RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset, + mmap_.get(), &metadata_length, &body_length)); - RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result)); - return Status::OK(); + std::shared_ptr metadata; + RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); + + // The buffer offsets start at 0, so we must construct a + // ReadableFileInterface according to that frame of reference + std::shared_ptr buffer_payload; + RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload)); + io::BufferReader buffer_reader(buffer_payload); + + return ReadRecordBatch(metadata, batch.schema(), &buffer_reader, batch_result); } protected: @@ -96,11 +103,11 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch, void TestGetRecordBatchSize(std::shared_ptr batch) { ipc::MockOutputStream mock; - int64_t mock_header_offset = -1; - int64_t 
mock_body_offset = -1; + int32_t mock_metadata_length = -1; + int64_t mock_body_length = -1; int64_t size = -1; - ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), &mock, - &mock_body_offset, &mock_header_offset)); + ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock, + &mock_metadata_length, &mock_body_length)); ASSERT_OK(GetRecordBatchSize(batch.get(), &size)); ASSERT_EQ(mock.GetExtentBytesWritten(), size); } @@ -129,39 +136,36 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { void SetUp() { pool_ = default_memory_pool(); } void TearDown() { io::MemoryMapFixture::TearDown(); } - Status WriteToMmap(int recursion_level, bool override_level, - int64_t* header_out = nullptr, std::shared_ptr* schema_out = nullptr) { + Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length, + int64_t* body_length, std::shared_ptr* schema) { const int batch_length = 5; - TypePtr type = kInt32; + TypePtr type = int32(); ArrayPtr array; const bool include_nulls = true; RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array)); for (int i = 0; i < recursion_level; ++i) { - type = std::static_pointer_cast(std::make_shared(type)); + type = list(type); RETURN_NOT_OK( MakeRandomListArray(array, batch_length, include_nulls, pool_, &array)); } - auto f0 = std::make_shared("f0", type); - std::shared_ptr schema(new Schema({f0})); - if (schema_out != nullptr) { *schema_out = schema; } + auto f0 = field("f0", type); + + *schema = std::shared_ptr(new Schema({f0})); + std::vector arrays = {array}; - auto batch = std::make_shared(schema, batch_length, arrays); + auto batch = std::make_shared(*schema, batch_length, arrays); std::string path = "test-write-past-max-recursion"; const int memory_map_size = 1 << 16; io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); - int64_t body_offset; - int64_t header_offset; - - int64_t* header_out_param = header_out == nullptr ? 
&header_offset : header_out; if (override_level) { - return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(), - &body_offset, header_out_param, recursion_level + 1); + return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(), + metadata_length, body_length, recursion_level + 1); } else { - return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(), - &body_offset, header_out_param); + return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(), + metadata_length, body_length); } } @@ -171,18 +175,29 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { }; TEST_F(RecursionLimits, WriteLimit) { - ASSERT_RAISES(Invalid, WriteToMmap((1 << 8) + 1, false)); + int32_t metadata_length = -1; + int64_t body_length = -1; + std::shared_ptr schema; + ASSERT_RAISES( + Invalid, WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &schema)); } TEST_F(RecursionLimits, ReadLimit) { - int64_t header_offset = -1; + int32_t metadata_length = -1; + int64_t body_length = -1; std::shared_ptr schema; - ASSERT_OK(WriteToMmap(64, true, &header_offset, &schema)); + ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema)); - std::shared_ptr reader; - ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader)); - std::shared_ptr batch_result; - ASSERT_RAISES(Invalid, reader->GetRecordBatch(schema, &batch_result)); + std::shared_ptr metadata; + ASSERT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); + + std::shared_ptr payload; + ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload)); + + io::BufferReader reader(payload); + + std::shared_ptr batch; + ASSERT_RAISES(Invalid, ReadRecordBatch(metadata, schema, &reader, &batch)); } } // namespace ipc diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc index cd424bf385c..a1feac401f2 100644 --- a/cpp/src/arrow/ipc/ipc-file-test.cc +++ 
b/cpp/src/arrow/ipc/ipc-file-test.cc @@ -68,7 +68,7 @@ class TestFileFormat : public ::testing::TestWithParam { RETURN_NOT_OK(sink_->Tell(&footer_offset)); // Open the file - auto reader = std::make_shared(buffer_->data(), buffer_->size()); + auto reader = std::make_shared(buffer_); RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_)); EXPECT_EQ(num_batches, file_reader_->num_record_batches()); diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index a51371c6200..e5c3a081fca 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -284,19 +284,23 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) { "name": "foo", "type": {"name": "int", "isSigned": true, "bitWidth": 32}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 32} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + } }, { "name": "bar", "type": {"name": "floatingpoint", "precision": "DOUBLE"}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 64} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 64} + ] + } } ] }, diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index 1dc39692332..d29583f8488 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -43,7 +43,7 @@ static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) { } } -class TestSchemaMessage : public ::testing::Test { +class TestSchemaMetadata : public ::testing::Test { public: void SetUp() {} @@ -52,11 +52,11 @@ class TestSchemaMessage : public ::testing::Test { ASSERT_OK(WriteSchema(schema, &buffer)); std::shared_ptr message; - ASSERT_OK(Message::Open(buffer, 
&message)); + ASSERT_OK(Message::Open(buffer, 0, &message)); ASSERT_EQ(Message::SCHEMA, message->type()); - std::shared_ptr schema_msg = message->GetSchema(); + auto schema_msg = std::make_shared(message); ASSERT_EQ(schema->num_fields(), schema_msg->num_fields()); std::shared_ptr schema2; @@ -68,7 +68,7 @@ class TestSchemaMessage : public ::testing::Test { const std::shared_ptr INT32 = std::make_shared(); -TEST_F(TestSchemaMessage, PrimitiveFields) { +TEST_F(TestSchemaMetadata, PrimitiveFields) { auto f0 = std::make_shared("f0", std::make_shared()); auto f1 = std::make_shared("f1", std::make_shared()); auto f2 = std::make_shared("f2", std::make_shared()); @@ -85,7 +85,7 @@ TEST_F(TestSchemaMessage, PrimitiveFields) { CheckRoundtrip(&schema); } -TEST_F(TestSchemaMessage, NestedFields) { +TEST_F(TestSchemaMetadata, NestedFields) { auto type = std::make_shared(std::make_shared()); auto f0 = std::make_shared("f0", type); @@ -111,7 +111,7 @@ class TestFileFooter : public ::testing::Test { std::unique_ptr footer; ASSERT_OK(FileFooter::Open(buffer, &footer)); - ASSERT_EQ(MetadataVersion::V1_SNAPSHOT, footer->version()); + ASSERT_EQ(MetadataVersion::V2, footer->version()); // Check schema std::shared_ptr schema2; diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 5eff8998afb..7a313f791e6 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -255,19 +255,23 @@ static const char* JSON_EXAMPLE = R"example( "name": "foo", "type": {"name": "int", "isSigned": true, "bitWidth": 32}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 32} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + } }, { "name": "bar", "type": {"name": "floatingpoint", "precision": "DOUBLE"}, "nullable": true, "children": [], - "typeLayout": [ - 
{"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 64} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 64} + ] + } } ] }, @@ -301,10 +305,12 @@ static const char* JSON_EXAMPLE2 = R"example( "name": "foo", "type": {"name": "int", "isSigned": true, "bitWidth": 32}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 32} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + } } ] }, diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index 31fe35b44ce..e56bcb32b94 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -45,8 +45,6 @@ namespace ipc { using RjArray = rj::Value::ConstArray; using RjObject = rj::Value::ConstObject; -enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY }; - static std::string GetBufferTypeName(BufferType type) { switch (type) { case BufferType::DATA: @@ -93,27 +91,6 @@ static std::string GetTimeUnitName(TimeUnit unit) { return "UNKNOWN"; } -class BufferLayout { - public: - BufferLayout(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {} - - BufferType type() const { return type_; } - int bit_width() const { return bit_width_; } - - private: - BufferType type_; - int bit_width_; -}; - -static const BufferLayout kValidityBuffer(BufferType::VALIDITY, 1); -static const BufferLayout kOffsetBuffer(BufferType::OFFSET, 32); -static const BufferLayout kTypeBuffer(BufferType::TYPE, 32); -static const BufferLayout kBooleanBuffer(BufferType::DATA, 1); -static const BufferLayout kValues64(BufferType::DATA, 64); -static const BufferLayout kValues32(BufferType::DATA, 32); -static const BufferLayout kValues16(BufferType::DATA, 16); -static const BufferLayout kValues8(BufferType::DATA, 8); - class JsonSchemaWriter : public 
TypeVisitor { public: explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer) @@ -154,9 +131,9 @@ class JsonSchemaWriter : public TypeVisitor { } template - typename std::enable_if::value || - std::is_base_of::value || - std::is_base_of::value, + typename std::enable_if< + std::is_base_of::value || std::is_base_of::value || + std::is_base_of::value || std::is_base_of::value, void>::type WriteTypeMetadata(const T& type) {} @@ -243,11 +220,10 @@ class JsonSchemaWriter : public TypeVisitor { } template - Status WritePrimitive(const std::string& typeclass, const T& type, - const std::vector& buffer_layout) { + Status WritePrimitive(const std::string& typeclass, const T& type) { WriteName(typeclass, type); SetNoChildren(); - WriteBufferLayout(buffer_layout); + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } @@ -255,15 +231,17 @@ class JsonSchemaWriter : public TypeVisitor { Status WriteVarBytes(const std::string& typeclass, const T& type) { WriteName(typeclass, type); SetNoChildren(); - WriteBufferLayout({kValidityBuffer, kOffsetBuffer, kValues8}); + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } - void WriteBufferLayout(const std::vector& buffer_layout) { + void WriteBufferLayout(const std::vector& buffer_layout) { writer_->Key("typeLayout"); + writer_->StartObject(); + writer_->Key("vectors"); writer_->StartArray(); - for (const BufferLayout& buffer : buffer_layout) { + for (const BufferDescr& buffer : buffer_layout) { writer_->StartObject(); writer_->Key("type"); writer_->String(GetBufferTypeName(buffer.type())); @@ -274,6 +252,7 @@ class JsonSchemaWriter : public TypeVisitor { writer_->EndObject(); } writer_->EndArray(); + writer_->EndObject(); } Status WriteChildren(const std::vector>& children) { @@ -286,74 +265,52 @@ class JsonSchemaWriter : public TypeVisitor { return Status::OK(); } - Status Visit(const NullType& type) override { return WritePrimitive("null", type, {}); } + Status Visit(const NullType& type) 
override { return WritePrimitive("null", type); } - Status Visit(const BooleanType& type) override { - return WritePrimitive("bool", type, {kValidityBuffer, kBooleanBuffer}); - } + Status Visit(const BooleanType& type) override { return WritePrimitive("bool", type); } - Status Visit(const Int8Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues8}); - } + Status Visit(const Int8Type& type) override { return WritePrimitive("int", type); } - Status Visit(const Int16Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues16}); - } + Status Visit(const Int16Type& type) override { return WritePrimitive("int", type); } - Status Visit(const Int32Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues32}); - } + Status Visit(const Int32Type& type) override { return WritePrimitive("int", type); } - Status Visit(const Int64Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues64}); - } + Status Visit(const Int64Type& type) override { return WritePrimitive("int", type); } - Status Visit(const UInt8Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues8}); - } + Status Visit(const UInt8Type& type) override { return WritePrimitive("int", type); } - Status Visit(const UInt16Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues16}); - } + Status Visit(const UInt16Type& type) override { return WritePrimitive("int", type); } - Status Visit(const UInt32Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues32}); - } + Status Visit(const UInt32Type& type) override { return WritePrimitive("int", type); } - Status Visit(const UInt64Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues64}); - } + Status Visit(const UInt64Type& type) override { return WritePrimitive("int", type); } Status Visit(const HalfFloatType& type) override { - return 
WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues16}); + return WritePrimitive("floatingpoint", type); } Status Visit(const FloatType& type) override { - return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues32}); + return WritePrimitive("floatingpoint", type); } Status Visit(const DoubleType& type) override { - return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues64}); + return WritePrimitive("floatingpoint", type); } Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); } Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); } - Status Visit(const DateType& type) override { - return WritePrimitive("date", type, {kValidityBuffer, kValues64}); - } + Status Visit(const DateType& type) override { return WritePrimitive("date", type); } - Status Visit(const TimeType& type) override { - return WritePrimitive("time", type, {kValidityBuffer, kValues64}); - } + Status Visit(const TimeType& type) override { return WritePrimitive("time", type); } Status Visit(const TimestampType& type) override { - return WritePrimitive("timestamp", type, {kValidityBuffer, kValues64}); + return WritePrimitive("timestamp", type); } Status Visit(const IntervalType& type) override { - return WritePrimitive("interval", type, {kValidityBuffer, kValues64}); + return WritePrimitive("interval", type); } Status Visit(const DecimalType& type) override { return Status::NotImplemented("NYI"); } @@ -361,26 +318,21 @@ class JsonSchemaWriter : public TypeVisitor { Status Visit(const ListType& type) override { WriteName("list", type); RETURN_NOT_OK(WriteChildren(type.children())); - WriteBufferLayout({kValidityBuffer, kOffsetBuffer}); + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } Status Visit(const StructType& type) override { WriteName("struct", type); WriteChildren(type.children()); - WriteBufferLayout({kValidityBuffer, kTypeBuffer}); + 
WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } Status Visit(const UnionType& type) override { WriteName("union", type); WriteChildren(type.children()); - - if (type.mode == UnionMode::SPARSE) { - WriteBufferLayout({kValidityBuffer, kTypeBuffer}); - } else { - WriteBufferLayout({kValidityBuffer, kTypeBuffer, kOffsetBuffer}); - } + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 7102012c29a..b99522825d9 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -37,20 +37,6 @@ namespace flatbuf = org::apache::arrow::flatbuf; namespace ipc { -const std::shared_ptr BOOL = std::make_shared(); -const std::shared_ptr INT8 = std::make_shared(); -const std::shared_ptr INT16 = std::make_shared(); -const std::shared_ptr INT32 = std::make_shared(); -const std::shared_ptr INT64 = std::make_shared(); -const std::shared_ptr UINT8 = std::make_shared(); -const std::shared_ptr UINT16 = std::make_shared(); -const std::shared_ptr UINT32 = std::make_shared(); -const std::shared_ptr UINT64 = std::make_shared(); -const std::shared_ptr FLOAT = std::make_shared(); -const std::shared_ptr DOUBLE = std::make_shared(); -const std::shared_ptr STRING = std::make_shared(); -const std::shared_ptr BINARY = std::make_shared(); - static Status IntFromFlatbuffer( const flatbuf::Int* int_data, std::shared_ptr* out) { if (int_data->bitWidth() > 64) { @@ -62,16 +48,16 @@ static Status IntFromFlatbuffer( switch (int_data->bitWidth()) { case 8: - *out = int_data->is_signed() ? INT8 : UINT8; + *out = int_data->is_signed() ? int8() : uint8(); break; case 16: - *out = int_data->is_signed() ? INT16 : UINT16; + *out = int_data->is_signed() ? int16() : uint16(); break; case 32: - *out = int_data->is_signed() ? INT32 : UINT32; + *out = int_data->is_signed() ? int32() : uint32(); break; case 64: - *out = int_data->is_signed() ? 
INT64 : UINT64; + *out = int_data->is_signed() ? int64() : uint64(); break; default: return Status::NotImplemented("Integers not in cstdint are not implemented"); @@ -81,10 +67,12 @@ static Status IntFromFlatbuffer( static Status FloatFromFlatuffer( const flatbuf::FloatingPoint* float_data, std::shared_ptr* out) { - if (float_data->precision() == flatbuf::Precision_SINGLE) { - *out = FLOAT; + if (float_data->precision() == flatbuf::Precision_HALF) { + *out = float16(); + } else if (float_data->precision() == flatbuf::Precision_SINGLE) { + *out = float32(); } else { - *out = DOUBLE; + *out = float64(); } return Status::OK(); } @@ -100,13 +88,13 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, return FloatFromFlatuffer( static_cast(type_data), out); case flatbuf::Type_Binary: - *out = BINARY; + *out = binary(); return Status::OK(); case flatbuf::Type_Utf8: - *out = STRING; + *out = utf8(); return Status::OK(); case flatbuf::Type_Bool: - *out = BOOL; + *out = boolean(); return Status::OK(); case flatbuf::Type_Decimal: case flatbuf::Type_Timestamp: @@ -164,7 +152,32 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr& type break; static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr& type, - std::vector* children, flatbuf::Type* out_type, Offset* offset) { + std::vector* children, std::vector* layout, + flatbuf::Type* out_type, Offset* offset) { + std::vector buffer_layout = type->GetBufferLayout(); + for (const BufferDescr& descr : buffer_layout) { + flatbuf::VectorType vector_type; + switch (descr.type()) { + case BufferType::OFFSET: + vector_type = flatbuf::VectorType_OFFSET; + break; + case BufferType::DATA: + vector_type = flatbuf::VectorType_DATA; + break; + case BufferType::VALIDITY: + vector_type = flatbuf::VectorType_VALIDITY; + break; + case BufferType::TYPE: + vector_type = flatbuf::VectorType_TYPE; + break; + default: + vector_type = flatbuf::VectorType_DATA; + break; + } + auto offset = 
flatbuf::CreateVectorLayout(fbb, descr.bit_width(), vector_type); + layout->push_back(offset); + } + switch (type->type) { case Type::BOOL: *out_type = flatbuf::Type_Bool; @@ -223,14 +236,18 @@ static Status FieldToFlatbuffer( flatbuf::Type type_enum; Offset type_data; + Offset type_layout; std::vector children; + std::vector layout; - RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data)); + RETURN_NOT_OK( + TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data)); auto fb_children = fbb.CreateVector(children); + auto fb_layout = fbb.CreateVector(layout); // TODO: produce the list of VectorTypes *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data, - field->dictionary, fb_children); + field->dictionary, fb_children, fb_layout); return Status::OK(); } @@ -300,13 +317,26 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length, return Status::OK(); } -Status WriteDataHeader(int32_t length, int64_t body_length, +Status WriteRecordBatchMetadata(int32_t length, int64_t body_length, const std::vector& nodes, const std::vector& buffers, std::shared_ptr* out) { - MessageBuilder message; - RETURN_NOT_OK(message.SetRecordBatch(length, body_length, nodes, buffers)); - RETURN_NOT_OK(message.Finish()); - return message.GetBuffer(out); + flatbuffers::FlatBufferBuilder fbb; + + auto batch = flatbuf::CreateRecordBatch( + fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers)); + + fbb.Finish(batch); + + int32_t size = fbb.GetSize(); + + auto result = std::make_shared(); + RETURN_NOT_OK(result->Resize(size)); + + uint8_t* dst = result->mutable_data(); + memcpy(dst, fbb.GetBufferPointer(), size); + + *out = result; + return Status::OK(); } Status MessageBuilder::Finish() { @@ -317,17 +347,13 @@ Status MessageBuilder::Finish() { } Status MessageBuilder::GetBuffer(std::shared_ptr* out) { - // The message buffer is suffixed by the size of the complete 
flatbuffer as - // int32_t - // int32_t size = fbb_.GetSize(); auto result = std::make_shared(); - RETURN_NOT_OK(result->Resize(size + sizeof(int32_t))); + RETURN_NOT_OK(result->Resize(size)); uint8_t* dst = result->mutable_data(); memcpy(dst, fbb_.GetBufferPointer(), size); - memcpy(dst + size, reinterpret_cast(&size), sizeof(int32_t)); *out = result; return Status::OK(); diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index c404cfde22c..4826ebe2289 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -41,10 +41,10 @@ namespace ipc { using FBB = flatbuffers::FlatBufferBuilder; using FieldOffset = flatbuffers::Offset; +using VectorLayoutOffset = flatbuffers::Offset; using Offset = flatbuffers::Offset; -static constexpr flatbuf::MetadataVersion kMetadataVersion = - flatbuf::MetadataVersion_V1_SNAPSHOT; +static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2; Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr* out); @@ -70,7 +70,7 @@ class MessageBuilder { flatbuffers::FlatBufferBuilder fbb_; }; -Status WriteDataHeader(int32_t length, int64_t body_length, +Status WriteRecordBatchMetadata(int32_t length, int64_t body_length, const std::vector& nodes, const std::vector& buffers, std::shared_ptr* out); diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index 66df8a6711f..44d3939c04f 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -50,9 +50,15 @@ Status WriteSchema(const Schema* schema, std::shared_ptr* out) { class Message::MessageImpl { public: - explicit MessageImpl( - const std::shared_ptr& buffer, const flatbuf::Message* message) - : buffer_(buffer), message_(message) {} + explicit MessageImpl(const std::shared_ptr& buffer, int64_t offset) + : buffer_(buffer), offset_(offset), message_(nullptr) {} + + Status Open() { + message_ = flatbuf::GetMessage(buffer_->data() + 
offset_); + + // TODO(wesm): verify the message + return Status::OK(); + } Message::Type type() const { switch (message_->header_type()) { @@ -72,25 +78,23 @@ class Message::MessageImpl { int64_t body_length() const { return message_->bodyLength(); } private: - // Owns the memory this message accesses + // Retain reference to memory std::shared_ptr buffer_; + int64_t offset_; const flatbuf::Message* message_; }; -Message::Message() {} - -Status Message::Open( - const std::shared_ptr& buffer, std::shared_ptr* out) { - std::shared_ptr result(new Message()); - - const flatbuf::Message* message = flatbuf::GetMessage(buffer->data()); +Message::Message(const std::shared_ptr& buffer, int64_t offset) { + impl_.reset(new MessageImpl(buffer, offset)); +} - // TODO(wesm): verify message - result->impl_.reset(new MessageImpl(buffer, message)); - *out = result; +Status Message::Open(const std::shared_ptr& buffer, int64_t offset, + std::shared_ptr* out) { + // ctor is private - return Status::OK(); + *out = std::shared_ptr(new Message(buffer, offset)); + return (*out)->impl_->Open(); } Message::Type Message::type() const { @@ -101,20 +105,12 @@ int64_t Message::body_length() const { return impl_->body_length(); } -std::shared_ptr Message::get_shared_ptr() { - return this->shared_from_this(); -} - -std::shared_ptr Message::GetSchema() { - return std::make_shared(this->shared_from_this(), impl_->header()); -} - // ---------------------------------------------------------------------- -// SchemaMessage +// SchemaMetadata -class SchemaMessage::SchemaMessageImpl { +class SchemaMetadata::SchemaMetadataImpl { public: - explicit SchemaMessageImpl(const void* schema) + explicit SchemaMetadataImpl(const void* schema) : schema_(static_cast(schema)) {} const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); } @@ -125,22 +121,29 @@ class SchemaMessage::SchemaMessageImpl { const flatbuf::Schema* schema_; }; -SchemaMessage::SchemaMessage( - const std::shared_ptr& message, 
const void* schema) { +SchemaMetadata::SchemaMetadata( + const std::shared_ptr& message, const void* flatbuf) { + message_ = message; + impl_.reset(new SchemaMetadataImpl(flatbuf)); +} + +SchemaMetadata::SchemaMetadata(const std::shared_ptr& message) { message_ = message; - impl_.reset(new SchemaMessageImpl(schema)); + impl_.reset(new SchemaMetadataImpl(message->impl_->header())); } -int SchemaMessage::num_fields() const { +SchemaMetadata::~SchemaMetadata() {} + +int SchemaMetadata::num_fields() const { return impl_->num_fields(); } -Status SchemaMessage::GetField(int i, std::shared_ptr* out) const { +Status SchemaMetadata::GetField(int i, std::shared_ptr* out) const { const flatbuf::Field* field = impl_->field(i); return FieldFromFlatbuffer(field, out); } -Status SchemaMessage::GetSchema(std::shared_ptr* out) const { +Status SchemaMetadata::GetSchema(std::shared_ptr* out) const { std::vector> fields(num_fields()); for (int i = 0; i < this->num_fields(); ++i) { RETURN_NOT_OK(GetField(i, &fields[i])); @@ -150,11 +153,11 @@ Status SchemaMessage::GetSchema(std::shared_ptr* out) const { } // ---------------------------------------------------------------------- -// RecordBatchMessage +// RecordBatchMetadata -class RecordBatchMessage::RecordBatchMessageImpl { +class RecordBatchMetadata::RecordBatchMetadataImpl { public: - explicit RecordBatchMessageImpl(const void* batch) + explicit RecordBatchMetadataImpl(const void* batch) : batch_(static_cast(batch)) { nodes_ = batch_->nodes(); buffers_ = batch_->buffers(); @@ -176,19 +179,29 @@ class RecordBatchMessage::RecordBatchMessageImpl { const flatbuffers::Vector* buffers_; }; -std::shared_ptr Message::GetRecordBatch() { - return std::make_shared(this->shared_from_this(), impl_->header()); +RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr& message) { + message_ = message; + impl_.reset(new RecordBatchMetadataImpl(message->impl_->header())); } -RecordBatchMessage::RecordBatchMessage( - const std::shared_ptr& 
message, const void* batch) { - message_ = message; - impl_.reset(new RecordBatchMessageImpl(batch)); +RecordBatchMetadata::RecordBatchMetadata( + const std::shared_ptr& buffer, int64_t offset) { + message_ = nullptr; + buffer_ = buffer; + + const flatbuf::RecordBatch* metadata = + flatbuffers::GetRoot(buffer->data() + offset); + + // TODO(wesm): validate table + + impl_.reset(new RecordBatchMetadataImpl(metadata)); } +RecordBatchMetadata::~RecordBatchMetadata() {} + // TODO(wesm): Copying the flatbuffer data isn't great, but this will do for // now -FieldMetadata RecordBatchMessage::field(int i) const { +FieldMetadata RecordBatchMetadata::field(int i) const { const flatbuf::FieldNode* node = impl_->field(i); FieldMetadata result; @@ -197,7 +210,7 @@ FieldMetadata RecordBatchMessage::field(int i) const { return result; } -BufferMetadata RecordBatchMessage::buffer(int i) const { +BufferMetadata RecordBatchMetadata::buffer(int i) const { const flatbuf::Buffer* buffer = impl_->buffer(i); BufferMetadata result; @@ -207,15 +220,15 @@ BufferMetadata RecordBatchMessage::buffer(int i) const { return result; } -int32_t RecordBatchMessage::length() const { +int32_t RecordBatchMetadata::length() const { return impl_->length(); } -int RecordBatchMessage::num_buffers() const { +int RecordBatchMetadata::num_buffers() const { return impl_->num_buffers(); } -int RecordBatchMessage::num_fields() const { +int RecordBatchMetadata::num_fields() const { return impl_->num_fields(); } @@ -268,11 +281,13 @@ class FileFooter::FileFooterImpl { MetadataVersion::type version() const { switch (footer_->version()) { - case flatbuf::MetadataVersion_V1_SNAPSHOT: - return MetadataVersion::V1_SNAPSHOT; + case flatbuf::MetadataVersion_V1: + return MetadataVersion::V1; + case flatbuf::MetadataVersion_V2: + return MetadataVersion::V2; // Add cases as other versions become available default: - return MetadataVersion::V1_SNAPSHOT; + return MetadataVersion::V2; } } @@ -285,7 +300,7 @@ class 
FileFooter::FileFooterImpl { } Status GetSchema(std::shared_ptr* out) const { - auto schema_msg = std::make_shared(nullptr, footer_->schema()); + auto schema_msg = std::make_shared(nullptr, footer_->schema()); return schema_msg->GetSchema(out); } diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 2f0e853bf97..1c4ef64d62f 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -42,7 +42,7 @@ class OutputStream; namespace ipc { struct MetadataVersion { - enum type { V1_SNAPSHOT }; + enum type { V1, V2 }; }; //---------------------------------------------------------------------- @@ -58,10 +58,14 @@ Status WriteSchema(const Schema* schema, std::shared_ptr* out); class Message; // Container for serialized Schema metadata contained in an IPC message -class ARROW_EXPORT SchemaMessage { +class ARROW_EXPORT SchemaMetadata { public: + explicit SchemaMetadata(const std::shared_ptr& message); + // Accepts an opaque flatbuffer pointer - SchemaMessage(const std::shared_ptr& message, const void* schema); + SchemaMetadata(const std::shared_ptr& message, const void* schema); + + ~SchemaMetadata(); int num_fields() const; @@ -76,8 +80,8 @@ class ARROW_EXPORT SchemaMessage { // Parent, owns the flatbuffer data std::shared_ptr message_; - class SchemaMessageImpl; - std::unique_ptr impl_; + class SchemaMetadataImpl; + std::unique_ptr impl_; }; // Field metadata @@ -93,10 +97,13 @@ struct BufferMetadata { }; // Container for serialized record batch metadata contained in an IPC message -class ARROW_EXPORT RecordBatchMessage { +class ARROW_EXPORT RecordBatchMetadata { public: - // Accepts an opaque flatbuffer pointer - RecordBatchMessage(const std::shared_ptr& message, const void* batch_meta); + explicit RecordBatchMetadata(const std::shared_ptr& message); + + RecordBatchMetadata(const std::shared_ptr& message, int64_t offset); + + ~RecordBatchMetadata(); FieldMetadata field(int i) const; BufferMetadata buffer(int i) const; @@ -108,37 
+115,34 @@ class ARROW_EXPORT RecordBatchMessage { private: // Parent, owns the flatbuffer data std::shared_ptr message_; + std::shared_ptr buffer_; - class RecordBatchMessageImpl; - std::unique_ptr impl_; + class RecordBatchMetadataImpl; + std::unique_ptr impl_; }; -class ARROW_EXPORT DictionaryBatchMessage { +class ARROW_EXPORT DictionaryBatchMetadata { public: int64_t id() const; - std::unique_ptr data() const; + std::unique_ptr data() const; }; -class ARROW_EXPORT Message : public std::enable_shared_from_this { +class ARROW_EXPORT Message { public: enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH }; - static Status Open( - const std::shared_ptr& buffer, std::shared_ptr* out); - - std::shared_ptr get_shared_ptr(); + static Status Open(const std::shared_ptr& buffer, int64_t offset, + std::shared_ptr* out); int64_t body_length() const; Type type() const; - // These methods only to be invoked if you have checked the message type - std::shared_ptr GetSchema(); - std::shared_ptr GetRecordBatch(); - std::shared_ptr GetDictionaryBatch(); - private: - Message(); + Message(const std::shared_ptr& buffer, int64_t offset); + + friend class RecordBatchMetadata; + friend class SchemaMetadata; // Hide serialization details from user API class MessageImpl; diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 9abc20d876d..65b37821522 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -39,8 +39,7 @@ namespace arrow { namespace ipc { -const auto kInt32 = std::make_shared(); -const auto kListInt32 = list(kInt32); +const auto kListInt32 = list(int32()); const auto kListListInt32 = list(kListInt32); Status MakeRandomInt32Array( @@ -99,8 +98,8 @@ Status MakeIntRecordBatch(std::shared_ptr* out) { const int length = 1000; // Make the schema - auto f0 = std::make_shared("f0", kInt32); - auto f1 = std::make_shared("f1", kInt32); + auto f0 = std::make_shared("f0", int32()); + auto f1 = std::make_shared("f1", 
int32()); std::shared_ptr schema(new Schema({f0, f1})); // Example data @@ -161,7 +160,7 @@ Status MakeListRecordBatch(std::shared_ptr* out) { // Make the schema auto f0 = std::make_shared("f0", kListInt32); auto f1 = std::make_shared("f1", kListListInt32); - auto f2 = std::make_shared("f2", kInt32); + auto f2 = std::make_shared("f2", int32()); std::shared_ptr schema(new Schema({f0, f1, f2})); // Example data @@ -184,7 +183,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr* out) { // Make the schema auto f0 = std::make_shared("f0", kListInt32); auto f1 = std::make_shared("f1", kListListInt32); - auto f2 = std::make_shared("f2", kInt32); + auto f2 = std::make_shared("f2", int32()); std::shared_ptr schema(new Schema({f0, f1, f2})); // Example data @@ -205,7 +204,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr* out) { // Make the schema auto f0 = std::make_shared("f0", kListInt32); auto f1 = std::make_shared("f1", kListListInt32); - auto f2 = std::make_shared("f2", kInt32); + auto f2 = std::make_shared("f2", int32()); std::shared_ptr schema(new Schema({f0, f1, f2})); // Example data @@ -226,7 +225,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr* out) { Status MakeDeeplyNestedList(std::shared_ptr* out) { const int batch_length = 5; - TypePtr type = kInt32; + TypePtr type = int32(); MemoryPool* pool = default_memory_pool(); ArrayPtr array; diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h index 9000d1bb0c6..242d6624f1e 100644 --- a/cpp/src/arrow/ipc/util.h +++ b/cpp/src/arrow/ipc/util.h @@ -28,12 +28,10 @@ namespace arrow { namespace ipc { // Align on 8-byte boundaries -static constexpr int kArrowAlignment = 8; - // Buffers are padded to 64-byte boundaries (for SIMD) -static constexpr int kArrowBufferAlignment = 64; +static constexpr int kArrowAlignment = 64; -static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0}; +static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0}; static inline int64_t PaddedLength(int64_t nbytes, 
int64_t alignment = kArrowAlignment) { return ((nbytes + alignment - 1) / alignment) * alignment; diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 93dd5b69b1b..63c2166a573 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -61,10 +61,10 @@ // Alias MSVC popcount to GCC name #ifdef _MSC_VER -# include -# define __builtin_popcount __popcnt -# include -# define __builtin_popcountll _mm_popcnt_u64 +#include +#define __builtin_popcount __popcnt +#include +#define __builtin_popcountll _mm_popcnt_u64 #endif namespace arrow { diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 589bdadb77c..80f295c487f 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -105,10 +105,6 @@ std::string UnionType::ToString() const { return s.str(); } -int NullType::bit_width() const { - return 0; -} - std::string NullType::ToString() const { return name(); } @@ -187,4 +183,46 @@ std::shared_ptr field( return std::make_shared(name, type, nullable, dictionary); } +static const BufferDescr kValidityBuffer(BufferType::VALIDITY, 1); +static const BufferDescr kOffsetBuffer(BufferType::OFFSET, 32); +static const BufferDescr kTypeBuffer(BufferType::TYPE, 32); +static const BufferDescr kBooleanBuffer(BufferType::DATA, 1); +static const BufferDescr kValues64(BufferType::DATA, 64); +static const BufferDescr kValues32(BufferType::DATA, 32); +static const BufferDescr kValues16(BufferType::DATA, 16); +static const BufferDescr kValues8(BufferType::DATA, 8); + +std::vector FixedWidthType::GetBufferLayout() const { + return {kValidityBuffer, BufferDescr(BufferType::DATA, bit_width())}; +} + +std::vector NullType::GetBufferLayout() const { + return {}; +} + +std::vector BinaryType::GetBufferLayout() const { + return {kValidityBuffer, kOffsetBuffer, kValues8}; +} + +std::vector ListType::GetBufferLayout() const { + return {kValidityBuffer, kOffsetBuffer}; +} + +std::vector StructType::GetBufferLayout() const { + return {kValidityBuffer, 
kTypeBuffer}; +} + +std::vector UnionType::GetBufferLayout() const { + if (mode == UnionMode::SPARSE) { + return {kValidityBuffer, kTypeBuffer}; + } else { + return {kValidityBuffer, kTypeBuffer, kOffsetBuffer}; + } +} + +std::vector DecimalType::GetBufferLayout() const { + // TODO(wesm) + return {}; +} + } // namespace arrow diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 876d7ea464b..30777384dfb 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -101,6 +101,20 @@ struct Type { }; }; +enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY }; + +class BufferDescr { + public: + BufferDescr(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {} + + BufferType type() const { return type_; } + int bit_width() const { return bit_width_; } + + private: + BufferType type_; + int bit_width_; +}; + struct ARROW_EXPORT DataType { Type::type type; @@ -129,12 +143,18 @@ struct ARROW_EXPORT DataType { virtual Status Accept(TypeVisitor* visitor) const = 0; virtual std::string ToString() const = 0; + + virtual std::vector GetBufferLayout() const = 0; }; typedef std::shared_ptr TypePtr; -struct ARROW_EXPORT FixedWidthMeta { +struct ARROW_EXPORT FixedWidthType : public DataType { + using DataType::DataType; + virtual int bit_width() const = 0; + + std::vector GetBufferLayout() const override; }; struct ARROW_EXPORT IntegerMeta { @@ -184,12 +204,12 @@ struct ARROW_EXPORT Field { }; typedef std::shared_ptr FieldPtr; -struct ARROW_EXPORT PrimitiveCType : public DataType { - using DataType::DataType; +struct ARROW_EXPORT PrimitiveCType : public FixedWidthType { + using FixedWidthType::FixedWidthType; }; template -struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta { +struct ARROW_EXPORT CTypeImpl : public PrimitiveCType { using c_type = C_TYPE; static constexpr Type::type type_id = TYPE_ID; @@ -204,16 +224,17 @@ struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta { std::string ToString() 
const override { return std::string(DERIVED::name()); } }; -struct ARROW_EXPORT NullType : public DataType, public FixedWidthMeta { +struct ARROW_EXPORT NullType : public DataType { static constexpr Type::type type_id = Type::NA; NullType() : DataType(Type::NA) {} - int bit_width() const override; Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; static std::string name() { return "null"; } + + std::vector GetBufferLayout() const override; }; template @@ -221,10 +242,10 @@ struct IntegerTypeImpl : public CTypeImpl, public Inte bool is_signed() const override { return std::is_signed::value; } }; -struct ARROW_EXPORT BooleanType : public DataType, FixedWidthMeta { +struct ARROW_EXPORT BooleanType : public FixedWidthType { static constexpr Type::type type_id = Type::BOOL; - BooleanType() : DataType(Type::BOOL) {} + BooleanType() : FixedWidthType(Type::BOOL) {} Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; @@ -306,6 +327,8 @@ struct ARROW_EXPORT ListType : public DataType, public NoExtraMeta { std::string ToString() const override; static std::string name() { return "list"; } + + std::vector GetBufferLayout() const override; }; // BinaryType type is reprsents lists of 1-byte values. @@ -318,6 +341,8 @@ struct ARROW_EXPORT BinaryType : public DataType, public NoExtraMeta { std::string ToString() const override; static std::string name() { return "binary"; } + std::vector GetBufferLayout() const override; + protected: // Allow subclasses to change the logical type. 
explicit BinaryType(Type::type logical_type) : DataType(logical_type) {} @@ -345,6 +370,8 @@ struct ARROW_EXPORT StructType : public DataType, public NoExtraMeta { Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; static std::string name() { return "struct"; } + + std::vector GetBufferLayout() const override; }; struct ARROW_EXPORT DecimalType : public DataType { @@ -358,6 +385,8 @@ struct ARROW_EXPORT DecimalType : public DataType { Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; static std::string name() { return "decimal"; } + + std::vector GetBufferLayout() const override; }; enum class UnionMode : char { SPARSE, DENSE }; @@ -375,14 +404,20 @@ struct ARROW_EXPORT UnionType : public DataType { static std::string name() { return "union"; } Status Accept(TypeVisitor* visitor) const override; + std::vector GetBufferLayout() const override; + UnionMode mode; std::vector type_ids; }; -struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta { +struct ARROW_EXPORT DateType : public FixedWidthType { static constexpr Type::type type_id = Type::DATE; - DateType() : DataType(Type::DATE) {} + using c_type = int32_t; + + DateType() : FixedWidthType(Type::DATE) {} + + int bit_width() const override { return sizeof(c_type) * 8; } Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override { return name(); } @@ -391,13 +426,17 @@ struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta { enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 }; -struct ARROW_EXPORT TimeType : public DataType { +struct ARROW_EXPORT TimeType : public FixedWidthType { static constexpr Type::type type_id = Type::TIME; using Unit = TimeUnit; + using c_type = int64_t; TimeUnit unit; - explicit TimeType(TimeUnit unit = TimeUnit::MILLI) : DataType(Type::TIME), unit(unit) {} + int bit_width() const override { return sizeof(c_type) * 8; } + + explicit 
TimeType(TimeUnit unit = TimeUnit::MILLI) + : FixedWidthType(Type::TIME), unit(unit) {} TimeType(const TimeType& other) : TimeType(other.unit) {} Status Accept(TypeVisitor* visitor) const override; @@ -405,7 +444,7 @@ struct ARROW_EXPORT TimeType : public DataType { static std::string name() { return "time"; } }; -struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta { +struct ARROW_EXPORT TimestampType : public FixedWidthType { using Unit = TimeUnit; typedef int64_t c_type; @@ -416,7 +455,7 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta { TimeUnit unit; explicit TimestampType(TimeUnit unit = TimeUnit::MILLI) - : DataType(Type::TIMESTAMP), unit(unit) {} + : FixedWidthType(Type::TIMESTAMP), unit(unit) {} TimestampType(const TimestampType& other) : TimestampType(other.unit) {} @@ -425,10 +464,10 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta { static std::string name() { return "timestamp"; } }; -struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta { +struct ARROW_EXPORT IntervalType : public FixedWidthType { enum class Unit : char { YEAR_MONTH = 0, DAY_TIME = 1 }; - typedef int64_t c_type; + using c_type = int64_t; static constexpr Type::type type_id = Type::INTERVAL; int bit_width() const override { return sizeof(int64_t) * 8; } @@ -436,7 +475,7 @@ struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta { Unit unit; explicit IntervalType(Unit unit = Unit::YEAR_MONTH) - : DataType(Type::INTERVAL), unit(unit) {} + : FixedWidthType(Type::INTERVAL), unit(unit) {} IntervalType(const IntervalType& other) : IntervalType(other.unit) {} diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc index 14667ee5b6e..f42a3cac021 100644 --- a/cpp/src/arrow/types/primitive.cc +++ b/cpp/src/arrow/types/primitive.cc @@ -49,7 +49,7 @@ bool PrimitiveArray::EqualsExact(const PrimitiveArray& other) const { const uint8_t* this_data = raw_data_; 
const uint8_t* other_data = other.raw_data_; - auto size_meta = dynamic_cast(type_.get()); + auto size_meta = dynamic_cast(type_.get()); int value_byte_size = size_meta->bit_width() / 8; DCHECK_GT(value_byte_size, 0); diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h index 13b7e19593d..5c8055f9c61 100644 --- a/cpp/src/arrow/util/bit-util.h +++ b/cpp/src/arrow/util/bit-util.h @@ -78,6 +78,10 @@ static inline bool IsMultipleOf64(int64_t n) { return (n & 63) == 0; } +static inline bool IsMultipleOf8(int64_t n) { + return (n & 7) == 0; +} + inline int64_t RoundUpToMultipleOf64(int64_t num) { // TODO(wesm): is this definitely needed? // DCHECK_GE(num, 0); diff --git a/dev/release/run-rat.sh b/dev/release/run-rat.sh index d8ec6507fc4..e26dd589695 100755 --- a/dev/release/run-rat.sh +++ b/dev/release/run-rat.sh @@ -28,6 +28,7 @@ $RAT $1 \ -e ".*" \ -e mman.h \ -e "*_generated.h" \ + -e "*.json" \ -e random.h \ -e status.cc \ -e status.h \ @@ -49,5 +50,3 @@ else echo "${UNAPPROVED} unapproved licences. Check rat report: rat.txt" exit 1 fi - - diff --git a/format/IPC.md b/format/IPC.md index 3f78126ef55..a55dcdff481 100644 --- a/format/IPC.md +++ b/format/IPC.md @@ -15,3 +15,109 @@ # Interprocess messaging / communication (IPC) ## File format + +We define a self-contained "file format" containing an Arrow schema along with +one or more record batches defining a dataset. See [format/File.fbs][1] for the +precise details of the file metadata. + +In general, the file looks like: + +``` + + + +... + + +... + + + + +``` + +See the File.fbs document for details about the Flatbuffers metadata. The +record batches have a particular structure, defined next. 
+ +### Record batches + +The record batch metadata is written as a flatbuffer (see +[format/Message.fbs][2] -- the RecordBatch message type) prefixed by its size, +followed by each of the memory buffers in the batch written end to end (with +appropriate alignment and padding): + +``` + + + + +``` + +The `RecordBatch` metadata contains a depth-first (pre-order) flattened set of +field metadata and physical memory buffers (some comments from [Message.fbs][2] +have been shortened / removed): + +``` +table RecordBatch { + length: int; + nodes: [FieldNode]; + buffers: [Buffer]; +} + +struct FieldNode { + /// The number of value slots in the Arrow array at this level of a nested + /// tree + length: int; + + /// The number of observed nulls. Fields with null_count == 0 may choose not + /// to write their physical validity bitmap out as a materialized buffer, + /// instead setting the length of the bitmap buffer to 0. + null_count: int; +} + +struct Buffer { + /// The shared memory page id where this buffer is located. Currently this is + /// not used + page: int; + + /// The relative offset into the shared memory page where the bytes for this + /// buffer starts + offset: long; + + /// The absolute length (in bytes) of the memory buffer. The memory is found + /// from offset (inclusive) to offset + length (non-inclusive). + length: long; +} +``` + +In the context of a file, the `page` is not used, and the `Buffer` offsets use +as a frame of reference the start of the segment where they are written in the +file. So, while in a general IPC setting these offsets may be anyplace in one +or more shared memory regions, in the file format the offsets start from 0. + +The location of a record batch and the size of the metadata block as well as +the body of buffers is stored in the file footer: + +``` +struct Block { + offset: long; + metaDataLength: int; + bodyLength: long; +} +``` + +Some notes about this + +* The `Block` offset indicates the starting byte of the record batch. 
+* The metadata length includes the flatbuffer size, the record batch metadata + flatbuffer, and any padding bytes + + +### Dictionary batches + +Dictionary batches have not yet been implemented, while they are provided for +in the metadata. For the time being, the `DICTIONARY` segments shown above in +the file do not appear in any of the file implementations. + +[1]: https://github.com/apache/arrow/blob/master/format/File.fbs +[2]: https://github.com/apache/arrow/blob/master/format/Message.fbs \ No newline at end of file diff --git a/format/Message.fbs b/format/Message.fbs index 2ec9fd1817b..d07d0666dce 100644 --- a/format/Message.fbs +++ b/format/Message.fbs @@ -18,7 +18,8 @@ namespace org.apache.arrow.flatbuf; enum MetadataVersion:short { - V1_SNAPSHOT + V1, + V2 } /// ---------------------------------------------------------------------- diff --git a/integration/data/simple.json b/integration/data/simple.json new file mode 100644 index 00000000000..a91b405d4f0 --- /dev/null +++ b/integration/data/simple.json @@ -0,0 +1,66 @@ +{ + "schema": { + "fields": [ + { + "name": "foo", + "type": {"name": "int", "isSigned": true, "bitWidth": 32}, + "nullable": true, "children": [], + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + } + }, + { + "name": "bar", + "type": {"name": "floatingpoint", "precision": "DOUBLE"}, + "nullable": true, "children": [], + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 64} + ] + } + }, + { + "name": "baz", + "type": {"name": "utf8"}, + "nullable": true, "children": [], + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "OFFSET", "typeBitWidth": 32}, + {"type": "DATA", "typeBitWidth": 64} + ] + } + } + ] + }, + "batches": [ + { + "count": 5, + "columns": [ + { + "name": "foo", + "count": 5, + "VALIDITY": [1, 0, 1, 1, 1], + "DATA": [1, 2, 3, 4, 5] + }, + { + "name": "bar", +
"count": 5, + "VALIDITY": [1, 0, 0, 1, 1], + "DATA": [1.0, 2.0, 3.0, 4.0, 5.0] + }, + { + "name": "baz", + "count": 5, + "VALIDITY": [1, 0, 0, 1, 1], + "OFFSET": [0, 2, 2, 2, 5, 9], + "DATA": ["aa", "", "", "bbb", "cccc"] + } + ] + } + ] +} diff --git a/integration/integration_test.py b/integration/integration_test.py new file mode 100644 index 00000000000..6ea634d7795 --- /dev/null +++ b/integration/integration_test.py @@ -0,0 +1,177 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import argparse +import glob +import itertools +import os +import six +import subprocess +import tempfile +import uuid + + +ARROW_HOME = os.path.abspath(__file__).rsplit("/", 2)[0] + + +def guid(): + return uuid.uuid4().hex + + +def run_cmd(cmd): + if isinstance(cmd, six.string_types): + cmd = cmd.split(' ') + + try: + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + # this avoids hiding the stdout / stderr of failed processes + print('Command failed: %s' % ' '.join(cmd)) + print('With output:') + print('--------------') + print(e.output) + print('--------------') + raise e + + if isinstance(output, six.binary_type): + output = output.decode('utf-8') + return output + + +class IntegrationRunner(object): + + def __init__(self, json_files, testers, debug=False): + self.json_files = json_files + self.testers = testers + self.temp_dir = tempfile.mkdtemp() + self.debug = debug + + def run(self): + for producer, consumer in itertools.product(self.testers, + self.testers): + if producer is consumer: + continue + + print('-- {0} producing, {1} consuming'.format(producer.name, + consumer.name)) + + for json_path in self.json_files: + print('Testing with {0}'.format(json_path)) + + arrow_path = os.path.join(self.temp_dir, guid()) + + producer.json_to_arrow(json_path, arrow_path) + consumer.validate(json_path, arrow_path) + + +class Tester(object): + + def __init__(self, debug=False): + self.debug = debug + + def json_to_arrow(self, json_path, arrow_path): + raise NotImplementedError + + def validate(self, json_path, arrow_path): + raise NotImplementedError + + +class JavaTester(Tester): + + ARROW_TOOLS_JAR = os.path.join(ARROW_HOME, + 'java/tools/target/arrow-tools-0.1.1-' + 'SNAPSHOT-jar-with-dependencies.jar') + + name = 'Java' + + def _run(self, arrow_path=None, json_path=None, command='VALIDATE'): + cmd = ['java', '-cp', self.ARROW_TOOLS_JAR, + 'org.apache.arrow.tools.Integration'] + + if arrow_path is not 
None: + cmd.extend(['-a', arrow_path]) + + if json_path is not None: + cmd.extend(['-j', json_path]) + + cmd.extend(['-c', command]) + + if self.debug: + print(' '.join(cmd)) + + return run_cmd(cmd) + + def validate(self, json_path, arrow_path): + return self._run(arrow_path, json_path, 'VALIDATE') + + def json_to_arrow(self, json_path, arrow_path): + return self._run(arrow_path, json_path, 'JSON_TO_ARROW') + + +class CPPTester(Tester): + + CPP_INTEGRATION_EXE = os.environ.get( + 'ARROW_CPP_TESTER', + os.path.join(ARROW_HOME, + 'cpp/test-build/debug/json-integration-test')) + + name = 'C++' + + def _run(self, arrow_path=None, json_path=None, command='VALIDATE'): + cmd = [self.CPP_INTEGRATION_EXE, '--integration'] + + if arrow_path is not None: + cmd.append('--arrow=' + arrow_path) + + if json_path is not None: + cmd.append('--json=' + json_path) + + cmd.append('--mode=' + command) + + if self.debug: + print(' '.join(cmd)) + + return run_cmd(cmd) + + def validate(self, json_path, arrow_path): + return self._run(arrow_path, json_path, 'VALIDATE') + + def json_to_arrow(self, json_path, arrow_path): + return self._run(arrow_path, json_path, 'JSON_TO_ARROW') + + +def get_json_files(): + glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json') + return glob.glob(glob_pattern) + + +def run_all_tests(debug=False): + testers = [JavaTester(debug=debug), CPPTester(debug=debug)] + json_files = get_json_files() + + runner = IntegrationRunner(json_files, testers, debug=debug) + runner.run() + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Arrow integration test CLI') + parser.add_argument('--debug', dest='debug', action='store_true', + default=False, + help='Run executables in debug mode as relevant') + + args = parser.parse_args() + run_all_tests(debug=args.debug) diff --git a/java/pom.xml b/java/pom.xml index 7221a140d96..a147d66c983 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -24,7 +24,7 @@ pom Apache Arrow Java Root POM - 
Apache arrow is an open source, low latency SQL query engine for Hadoop and NoSQL. + Apache Arrow is open source, in-memory columnar data structures and low-overhead messaging http://arrow.apache.org/ @@ -442,8 +442,8 @@ test - + org.mockito mockito-core 1.9.5 diff --git a/java/tools/pom.xml b/java/tools/pom.xml index 84b0b5eb425..ef96328f766 100644 --- a/java/tools/pom.xml +++ b/java/tools/pom.xml @@ -45,6 +45,12 @@ commons-cli 1.2 + + ch.qos.logback + logback-classic + 1.0.13 + run + diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java index 29f0ee29e3c..fa4bedca7a9 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -220,6 +220,7 @@ private Command toCommand(String commandName) { private static void fatalError(String message, Throwable e) { System.err.println(message); + System.err.println(e.getMessage()); LOGGER.error(message, e); System.exit(1); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 4afd82315d9..c5d642ee0cc 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -81,7 +81,9 @@ private void loadBuffers(FieldVector vector, Field field, Iterator buf try { vector.loadFieldBuffers(fieldNode, ownBuffers); } catch (RuntimeException e) { - throw new IllegalArgumentException("Could not load buffers for field " + field, e); + e.printStackTrace(); + throw new IllegalArgumentException("Could not load buffers for field " + + field + " error message" + e.getMessage(), e); } List children = field.getChildren(); if (children.size() > 0) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java index bbcd3e9f470..cd520da54f2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java @@ -123,7 +123,11 @@ public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOEx if (n != l) { throw new IllegalStateException(n + " != " + l); } - RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer()); + + // Record batch flatbuffer is prefixed by its size as int32le + final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4); + RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer()); + int nodesLength = recordBatchFB.nodesLength(); final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength()); List nodes = new ArrayList<>(); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java index 9881a229c23..1cd87ebc335 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java @@ -99,9 +99,10 @@ private long writeIntLittleEndian(int v) throws IOException { public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException { checkStarted(); align(); - // write metadata header + + // write metadata header with int32 size prefix long offset = currentPosition; - write(recordBatch); + write(recordBatch, true); align(); // write body long bodyOffset = currentPosition; @@ -117,6 +118,7 @@ public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException { if (startPosition != currentPosition) { writeZeros((int)(startPosition - currentPosition)); } + write(buffer); if (currentPosition != startPosition + 
layout.getSize()) { throw new IllegalStateException("wrong buffer size: " + currentPosition + " != " + startPosition + layout.getSize()); @@ -133,7 +135,9 @@ public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException { } private void write(ArrowBuf buffer) throws IOException { - write(buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes())); + ByteBuffer nioBuffer = buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes()); + LOGGER.debug("Writing buffer with size: " + nioBuffer.remaining()); + write(nioBuffer); } private void checkStarted() throws IOException { @@ -166,14 +170,21 @@ private void writeMagic() throws IOException { private void writeFooter() throws IOException { // TODO: dictionaries - write(new ArrowFooter(schema, Collections.emptyList(), recordBatches)); + write(new ArrowFooter(schema, Collections.emptyList(), recordBatches), false); } - private long write(FBSerializable writer) throws IOException { + private long write(FBSerializable writer, boolean withSizePrefix) throws IOException { FlatBufferBuilder builder = new FlatBufferBuilder(); int root = writer.writeTo(builder); builder.finish(root); - return write(builder.dataBuffer()); + + ByteBuffer buffer = builder.dataBuffer(); + + if (withSizePrefix) { + writeIntLittleEndian(buffer.remaining()); + } + + return write(buffer); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java index f07b5172507..f2059820d23 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java @@ -127,8 +127,13 @@ private void readVector(Field field, FieldVector vector) throws JsonParseExcepti ValueVector valueVector = (ValueVector)innerVector; valueVector.allocateNew(); Mutator mutator = valueVector.getMutator(); - mutator.setValueCount(count); - for 
(int i = 0; i < count; i++) { + + int innerVectorCount = count; + if ("OFFSET".equals(vectorType.getName())) { + innerVectorCount++; // OFFSET arrays carry one more entry than the value count + } + mutator.setValueCount(innerVectorCount); + for (int i = 0; i < innerVectorCount; i++) { parser.nextToken(); setValueFromParser(valueVector, i); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java index 812b3da32f8..6ff35777448 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java @@ -114,7 +114,7 @@ private void writeVector(Field field, FieldVector vector) throws IOException { BufferBacked innerVector = fieldInnerVectors.get(v); generator.writeArrayFieldStart(vectorType.getName()); ValueVector valueVector = (ValueVector)innerVector; - for (int i = 0; i < valueCount; i++) { + for (int i = 0; i < valueVector.getAccessor().getValueCount(); i++) { writeValueToGenerator(valueVector, i); } generator.writeEndArray(); diff --git a/python/.gitignore b/python/.gitignore index 07f28355a25..c37efc4b566 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -12,16 +12,6 @@ Testing/ # Editor temporary/working/backup files *flymake* -# Compiled source -*.a -*.dll -*.o -*.py[ocd] -*.so -*.dylib -.build_cache_dir -MANIFEST - # Generated sources *.c *.cpp