Skip to content

Commit 95157f2

Browse files
committed
Make record batch writes aligned on word boundaries
Change-Id: Ia0e59d236054db576bcc458231c914a4a865b428
1 parent 7c50251 commit 95157f2

File tree

3 files changed

+57
-25
lines changed

3 files changed

+57
-25
lines changed

cpp/src/arrow/ipc/adapter.cc

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -142,21 +142,21 @@ class RecordBatchWriter {
142142

143143
Status AssemblePayload() {
144144
// Perform depth-first traversal of the row-batch
145-
for (int i = 0; i < columns_->size(); ++i) {
145+
for (size_t i = 0; i < columns_->size(); ++i) {
146146
const Array* arr = (*columns_)[i].get();
147147
RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_));
148148
}
149149
return Status::OK();
150150
}
151151

152152
Status Write(io::OutputStream* dst, int64_t* data_header_offset) {
153-
// Write out all the buffers contiguously and compute the total size of the
154-
// memory payload
155-
int64_t offset = 0;
156-
157153
// Get the starting position
158-
int64_t position;
159-
RETURN_NOT_OK(dst->Tell(&position));
154+
int64_t start_position;
155+
RETURN_NOT_OK(dst->Tell(&start_position));
156+
157+
// Keep track of the current position so we can determine the size of the
158+
// message body
159+
int64_t position = start_position;
160160

161161
for (size_t i = 0; i < buffers_.size(); ++i) {
162162
const Buffer* buffer = buffers_[i].get();
@@ -178,11 +178,11 @@ class RecordBatchWriter {
178178
// are using from any OS-level shared memory. The thought is that systems
179179
// may (in the future) associate integer page id's with physical memory
180180
// pages (according to whatever is the desired shared memory mechanism)
181-
buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size));
181+
buffer_meta_.push_back(flatbuf::Buffer(0, position, size));
182182

183183
if (size > 0) {
184184
RETURN_NOT_OK(dst->Write(buffer->data(), size));
185-
offset += size;
185+
position += size;
186186
}
187187
}
188188

@@ -194,13 +194,24 @@ class RecordBatchWriter {
194194
// determine the data header size then request a buffer such that you can
195195
// construct the flatbuffer data accessor object (see arrow::ipc::Message)
196196
std::shared_ptr<Buffer> data_header;
197-
RETURN_NOT_OK(
198-
WriteDataHeader(num_rows_, offset, field_nodes_, buffer_meta_, &data_header));
197+
RETURN_NOT_OK(WriteDataHeader(
198+
num_rows_, position - start_position, field_nodes_, buffer_meta_, &data_header));
199199

200200
// Write the data header at the end
201201
RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
202+
*data_header_offset = position;
202203

203-
*data_header_offset = position + offset;
204+
return Align(dst, &position);
205+
}
206+
207+
Status Align(io::OutputStream* dst, int64_t* position) {
208+
// Write all buffers here on word boundaries
209+
// TODO(wesm): Is there benefit to 64-byte padding in IPC?
210+
int64_t remainder = PaddedLength(*position) - *position;
211+
if (remainder > 0) {
212+
RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder));
213+
*position += remainder;
214+
}
204215
return Status::OK();
205216
}
206217

cpp/src/arrow/ipc/file.cc

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
#include <sstream>
2323
#include <vector>
2424

25+
#include "arrow/ipc/adapter.h"
2526
#include "arrow/ipc/metadata.h"
27+
#include "arrow/ipc/util.h"
2628
#include "arrow/io/interfaces.h"
2729
#include "arrow/util/buffer.h"
2830
#include "arrow/util/logging.h"
@@ -33,14 +35,6 @@ namespace ipc {
3335

3436
static constexpr const char* kArrowMagicBytes = "ARROW1";
3537

36-
// Align on 8-byte boundaries
37-
static constexpr int kArrowAlignment = 8;
38-
static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
39-
40-
static inline int64_t PaddedLength(int64_t nbytes) {
41-
return ((nbytes + kArrowAlignment - 1) / kArrowAlignment) * kArrowAlignment;
42-
}
43-
4438
// ----------------------------------------------------------------------
4539
// Writer implementation
4640

@@ -87,6 +81,30 @@ Status FileWriter::CheckStarted() {
8781
return Status::OK();
8882
}
8983

84+
Status FileWriter::WriteRecordBatch(
85+
const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
86+
RETURN_NOT_OK(CheckStarted());
87+
88+
int64_t offset = position_;
89+
90+
int64_t header_offset;
91+
RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(columns, num_rows, sink_, &header_offset));
92+
RETURN_NOT_OK(UpdatePosition());
93+
94+
DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
95+
96+
// We can infer the metadata length and length of the record batch body (the
97+
// concatenated buffers) from the heade offset and the new output stream
98+
// position
99+
int32_t metadata_length = position_ - header_offset;
100+
int32_t body_length = position_ - offset - metadata_length;
101+
102+
// Append metadata, to be written in the footer latera
103+
record_batches_.emplace_back(offset, metadata_length, body_length);
104+
105+
return Status::OK();
106+
}
107+
90108
Status FileWriter::Close() {
91109
// Write metadata
92110
int64_t initial_position = position_;
@@ -105,11 +123,6 @@ Status FileWriter::Close() {
105123
reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes));
106124
}
107125

108-
Status FileWriter::WriteRecordBatch(
109-
const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
110-
return Status::OK();
111-
}
112-
113126
// ----------------------------------------------------------------------
114127
// Reader implementation
115128

cpp/src/arrow/ipc/util.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@
2727
namespace arrow {
2828
namespace ipc {
2929

30+
// Align on 8-byte boundaries
31+
static constexpr int kArrowAlignment = 8;
32+
static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
33+
34+
static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
35+
return ((nbytes + alignment - 1) / alignment) * alignment;
36+
}
37+
3038
// A helper class to tracks the size of allocations
3139
class MockOutputStream : public io::OutputStream {
3240
public:

0 commit comments

Comments
 (0)