Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-185: Make padding and alignment for all buffers be 64 bytes #74

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions cpp/src/arrow/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ Status ArrayBuilder::AppendToBitmap(const uint8_t* valid_bytes, int32_t length)
}

Status ArrayBuilder::Init(int32_t capacity) {
capacity_ = capacity;
int32_t to_alloc = util::ceil_byte(capacity) / 8;
null_bitmap_ = std::make_shared<PoolBuffer>(pool_);
RETURN_NOT_OK(null_bitmap_->Resize(to_alloc));
// Buffers might allocate more then necessary to satisfy padding requirements
const int byte_capacity = null_bitmap_->capacity();
capacity_ = capacity;
null_bitmap_data_ = null_bitmap_->mutable_data();
memset(null_bitmap_data_, 0, to_alloc);
memset(null_bitmap_data_, 0, byte_capacity);
return Status::OK();
}

Expand All @@ -60,8 +62,11 @@ Status ArrayBuilder::Resize(int32_t new_bits) {
int32_t old_bytes = null_bitmap_->size();
RETURN_NOT_OK(null_bitmap_->Resize(new_bytes));
null_bitmap_data_ = null_bitmap_->mutable_data();
// The buffer might be overpadded to deal with padding according to the spec
const int32_t byte_capacity = null_bitmap_->capacity();
capacity_ = new_bits;
if (old_bytes < new_bytes) {
memset(null_bitmap_data_ + old_bytes, 0, new_bytes - old_bytes);
memset(null_bitmap_data_ + old_bytes, 0, byte_capacity - old_bytes);
}
return Status::OK();
}
Expand Down
20 changes: 19 additions & 1 deletion cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ namespace flatbuf = apache::arrow::flatbuf;

namespace ipc {

namespace {
Status CheckMultipleOf64(int64_t size) {
if (util::is_multiple_of_64(size)) { return Status::OK(); }
return Status::Invalid(
"Attempted to write a buffer that "
"wasn't a multiple of 64 bytes");
}
}

static bool IsPrimitive(const DataType* type) {
DCHECK(type != nullptr);
switch (type->type) {
Expand Down Expand Up @@ -115,6 +124,8 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
} else if (arr->type_enum() == Type::STRUCT) {
// TODO(wesm)
return Status::NotImplemented("Struct type");
} else {
return Status::NotImplemented("Unrecognized type");
}
return Status::OK();
}
Expand Down Expand Up @@ -142,7 +153,13 @@ class RowBatchWriter {
int64_t size = 0;

// The buffer might be null if we are handling zero row lengths.
if (buffer) { size = buffer->size(); }
if (buffer) {
// We use capacity here, because size might not reflect the padding
// requirements of buffers but capacity always should.
size = buffer->capacity();
// check that padding is appropriate
RETURN_NOT_OK(CheckMultipleOf64(size));
}
// TODO(wesm): We currently have no notion of shared memory page id's,
// but we've included it in the metadata IDL for when we have it in the
// future. Use page=0 for now
Expand Down Expand Up @@ -305,6 +322,7 @@ class RowBatchReader::Impl {

Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
BufferMetadata metadata = metadata_->buffer(buffer_index);
RETURN_NOT_OK(CheckMultipleOf64(metadata.length));
return source_->ReadAt(metadata.offset, metadata.length, out);
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/ipc/ipc-adapter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,

void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
MockMemorySource mock_source(1 << 16);
int64_t mock_header_location;
int64_t size;
int64_t mock_header_location = -1;
int64_t size = -1;
ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location));
ASSERT_OK(GetRowBatchSize(batch.get(), &size));
ASSERT_EQ(mock_source.GetExtentBytesWritten(), size);
Expand Down Expand Up @@ -270,7 +270,7 @@ TEST_F(RecursionLimits, WriteLimit) {
}

TEST_F(RecursionLimits, ReadLimit) {
int64_t header_location;
int64_t header_location = -1;
std::shared_ptr<Schema> schema;
ASSERT_OK(WriteToMmap(64, true, &header_location, &schema));

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/types/list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ bool ListArray::Equals(const std::shared_ptr<Array>& arr) const {
Status ListArray::Validate() const {
if (length_ < 0) { return Status::Invalid("Length was negative"); }
if (!offset_buf_) { return Status::Invalid("offset_buf_ was null"); }
if (offset_buf_->size() / sizeof(int32_t) < length_) {
if (offset_buf_->size() / static_cast<int>(sizeof(int32_t)) < length_) {
std::stringstream ss;
ss << "offset buffer size (bytes): " << offset_buf_->size()
<< " isn't large enough for length: " << length_;
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/types/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>

#include "arrow/array.h"
Expand Down Expand Up @@ -113,12 +114,14 @@ class ListBuilder : public ArrayBuilder {
values_(values) {}

Status Init(int32_t elements) override {
DCHECK_LT(elements, std::numeric_limits<int32_t>::max());
RETURN_NOT_OK(ArrayBuilder::Init(elements));
// one more then requested for offsets
return offset_builder_.Resize((elements + 1) * sizeof(int32_t));
}

Status Resize(int32_t capacity) override {
DCHECK_LT(capacity, std::numeric_limits<int32_t>::max());
// one more then requested for offsets
RETURN_NOT_OK(offset_builder_.Resize((capacity + 1) * sizeof(int32_t)));
return ArrayBuilder::Resize(capacity);
Expand Down
7 changes: 3 additions & 4 deletions cpp/src/arrow/types/primitive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Status PrimitiveBuilder<T>::Init(int32_t capacity) {

int64_t nbytes = type_traits<T>::bytes_required(capacity);
RETURN_NOT_OK(data_->Resize(nbytes));
// TODO(emkornfield) valgrind complains without this
memset(data_->mutable_data(), 0, nbytes);

raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());
Expand All @@ -91,15 +92,13 @@ Status PrimitiveBuilder<T>::Resize(int32_t capacity) {
RETURN_NOT_OK(Init(capacity));
} else {
RETURN_NOT_OK(ArrayBuilder::Resize(capacity));

int64_t old_bytes = data_->size();
int64_t new_bytes = type_traits<T>::bytes_required(capacity);
const int64_t old_bytes = data_->size();
const int64_t new_bytes = type_traits<T>::bytes_required(capacity);
RETURN_NOT_OK(data_->Resize(new_bytes));
raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());

memset(data_->mutable_data() + old_bytes, 0, new_bytes - old_bytes);
}
capacity_ = capacity;
return Status::OK();
}

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/util/bit-util-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@

namespace arrow {

TEST(UtilTests, TestIsMultipleOf64) {
using util::is_multiple_of_64;
EXPECT_TRUE(is_multiple_of_64(64));
EXPECT_TRUE(is_multiple_of_64(0));
EXPECT_TRUE(is_multiple_of_64(128));
EXPECT_TRUE(is_multiple_of_64(192));
EXPECT_FALSE(is_multiple_of_64(23));
EXPECT_FALSE(is_multiple_of_64(32));
}

TEST(UtilTests, TestNextPower2) {
using util::next_power2;

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/util/bit-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ static inline int64_t next_power2(int64_t n) {
return n;
}

static inline bool is_multiple_of_64(int64_t n) {
return (n & 63) == 0;
}

void bytes_to_bits(const std::vector<uint8_t>& bytes, uint8_t* bits);
Status bytes_to_bits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*);

Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/util/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,32 @@
#include "arrow/util/buffer.h"

#include <cstdint>
#include <limits>

#include "arrow/util/logging.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

namespace arrow {

namespace {
int64_t RoundUpToMultipleOf64(int64_t num) {
DCHECK_GE(num, 0);
constexpr int64_t round_to = 64;
constexpr int64_t force_carry_addend = round_to - 1;
constexpr int64_t truncate_bitmask = ~(round_to - 1);
constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to;
if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; }
// handle overflow case. This should result in a malloc error upstream
return num;
}
} // namespace

Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) {
data_ = parent->data() + offset;
size_ = size;
parent_ = parent;
capacity_ = size;
}

Buffer::~Buffer() {}
Expand All @@ -48,6 +64,7 @@ PoolBuffer::~PoolBuffer() {
Status PoolBuffer::Reserve(int64_t new_capacity) {
if (!mutable_data_ || new_capacity > capacity_) {
uint8_t* new_data;
new_capacity = RoundUpToMultipleOf64(new_capacity);
if (mutable_data_) {
RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
memcpy(new_data, mutable_data_, size_);
Expand Down
34 changes: 22 additions & 12 deletions cpp/src/arrow/util/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,23 @@ class Status;
// Buffer classes

// Immutable API for a chunk of bytes which may or may not be owned by the
// class instance
// class instance. Buffers have two related notions of length: size and
// capacity. Size is the number of bytes that might have valid data.
// Capacity is the number of bytes that where allocated for the buffer in
// total.
// The following invariant is always true: Size < Capacity
class Buffer : public std::enable_shared_from_this<Buffer> {
public:
Buffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
Buffer(const uint8_t* data, int64_t size) : data_(data), size_(size), capacity_(size) {}
virtual ~Buffer();

// An offset into data that is owned by another buffer, but we want to be
// able to retain a valid pointer to it even after other shared_ptr's to the
// parent buffer have been destroyed
//
// This method makes no assertions about alignment or padding of the buffer but
// in general we expected buffers to be aligned and padded to 64 bytes. In the future
// we might add utility methods to help determine if a buffer satisfies this contract.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably what we can do is add a method to produce a buffer that is guaranteed to be aligned and padded (allocating as necessary). For example: if there is incoming data from another library to libarrow that is not aligned or padded, some algorithms may work without alignment or padding, while others (e.g. requiring SIMD) would require the buffer to be "fixed". This could get pretty hairy, though...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking about the case where an Arrow array is constructed from memory allocated elsewhere with zero copy

Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size);

std::shared_ptr<Buffer> get_shared_ptr() { return shared_from_this(); }
Expand All @@ -63,6 +71,7 @@ class Buffer : public std::enable_shared_from_this<Buffer> {
(data_ == other.data_ || !memcmp(data_, other.data_, size_)));
}

int64_t capacity() const { return capacity_; }
const uint8_t* data() const { return data_; }

int64_t size() const { return size_; }
Expand All @@ -76,6 +85,7 @@ class Buffer : public std::enable_shared_from_this<Buffer> {
protected:
const uint8_t* data_;
int64_t size_;
int64_t capacity_;

// nullptr by default, but may be set
std::shared_ptr<Buffer> parent_;
Expand Down Expand Up @@ -105,18 +115,17 @@ class MutableBuffer : public Buffer {
class ResizableBuffer : public MutableBuffer {
public:
// Change buffer reported size to indicated size, allocating memory if
// necessary
// necessary. This will ensure that the capacity of the buffer is a multiple
// of 64 bytes as defined in Layout.md.
virtual Status Resize(int64_t new_size) = 0;

// Ensure that buffer has enough memory allocated to fit the indicated
// capacity. Does not change buffer's reported size
// capacity (and meets the 64 byte padding requirement in Layout.md).
// It does not change buffer's reported size.
virtual Status Reserve(int64_t new_capacity) = 0;

protected:
ResizableBuffer(uint8_t* data, int64_t size)
: MutableBuffer(data, size), capacity_(size) {}

int64_t capacity_;
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {}
};

// A Buffer whose lifetime is tied to a particular MemoryPool
Expand All @@ -125,8 +134,8 @@ class PoolBuffer : public ResizableBuffer {
explicit PoolBuffer(MemoryPool* pool = nullptr);
virtual ~PoolBuffer();

virtual Status Resize(int64_t new_size);
virtual Status Reserve(int64_t new_capacity);
Status Resize(int64_t new_size) override;
Status Reserve(int64_t new_capacity) override;

private:
MemoryPool* pool_;
Expand All @@ -138,10 +147,11 @@ class BufferBuilder {
public:
explicit BufferBuilder(MemoryPool* pool) : pool_(pool), capacity_(0), size_(0) {}

// Resizes the buffer to the nearest multiple of 64 bytes per Layout.md
Status Resize(int32_t elements) {
if (capacity_ == 0) { buffer_ = std::make_shared<PoolBuffer>(pool_); }
capacity_ = elements;
RETURN_NOT_OK(buffer_->Resize(capacity_));
RETURN_NOT_OK(buffer_->Resize(elements));
capacity_ = buffer_->capacity();
data_ = buffer_->mutable_data();
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/memory-pool-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ TEST(DefaultMemoryPool, MemoryTracking) {

uint8_t* data;
ASSERT_OK(pool->Allocate(100, &data));
EXPECT_EQ(0, reinterpret_cast<uint64_t>(data) % 64);
ASSERT_EQ(100, pool->bytes_allocated());

pool->Free(data, 100);
Expand Down
31 changes: 24 additions & 7 deletions cpp/src/arrow/util/memory-pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "arrow/util/memory-pool.h"

#include <stdlib.h>
#include <cstdlib>
#include <mutex>
#include <sstream>
Expand All @@ -25,6 +26,28 @@

namespace arrow {

namespace {
// Allocate memory according to the alignment requirements for Arrow
// (as of May 2016 64 bytes)
Status AllocateAligned(int64_t size, uint8_t** out) {
// TODO(emkornfield) find something compatible with windows
constexpr size_t kAlignment = 64;
const int result = posix_memalign(reinterpret_cast<void**>(out), kAlignment, size);
if (result == ENOMEM) {
std::stringstream ss;
ss << "malloc of size " << size << " failed";
return Status::OutOfMemory(ss.str());
}

if (result == EINVAL) {
std::stringstream ss;
ss << "invalid alignment parameter: " << kAlignment;
return Status::Invalid(ss.str());
}
return Status::OK();
}
} // namespace

MemoryPool::~MemoryPool() {}

class InternalMemoryPool : public MemoryPool {
Expand All @@ -45,13 +68,7 @@ class InternalMemoryPool : public MemoryPool {

Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) {
std::lock_guard<std::mutex> guard(pool_lock_);
*out = static_cast<uint8_t*>(std::malloc(size));
if (*out == nullptr) {
std::stringstream ss;
ss << "malloc of size " << size << " failed";
return Status::OutOfMemory(ss.str());
}

RETURN_NOT_OK(AllocateAligned(size, out));
bytes_allocated_ += size;

return Status::OK();
Expand Down
Loading