Skip to content

Commit

Permalink
Widen the offset of row table to 64b
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed Jul 23, 2024
1 parent a88f0cd commit d18b7ea
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 113 deletions.
11 changes: 6 additions & 5 deletions cpp/src/arrow/acero/swiss_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
if (!is_fixed_length_column) {
int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id);
const uint8_t* row_ptr_base = rows.data(2);
const uint32_t* row_offsets = rows.offsets();
const RowTableImpl::offset_type* row_offsets = rows.offsets();
uint32_t field_offset_within_row, field_length;

if (varbinary_column_id == 0) {
Expand Down Expand Up @@ -173,7 +173,7 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
// Case 4: This is a fixed length column in a varying length row
//
const uint8_t* row_ptr_base = rows.data(2) + field_offset_within_row;
const uint32_t* row_offsets = rows.offsets();
const RowTableImpl::offset_type* row_offsets = rows.offsets();
for (int i = 0; i < num_rows; ++i) {
uint32_t row_id = row_ids[i];
const uint8_t* row_ptr = row_ptr_base + row_offsets[row_id];
Expand Down Expand Up @@ -565,8 +565,8 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
int64_t first_target_row_offset,
const int64_t* source_rows_permutation) {
int64_t num_source_rows = source.length();
uint32_t* target_offsets = target->mutable_offsets();
const uint32_t* source_offsets = source.offsets();
RowTableImpl::offset_type* target_offsets = target->mutable_offsets();
const RowTableImpl::offset_type* source_offsets = source.offsets();

// Permutation of source rows is optional.
//
Expand All @@ -593,7 +593,8 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
int64_t source_row_id = source_rows_permutation[i];
const uint64_t* source_row_ptr = reinterpret_cast<const uint64_t*>(
source.data(2) + source_offsets[source_row_id]);
uint32_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id];
int64_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id];
DCHECK_LE(length, std::numeric_limits<uint32_t>::max());

// Rows should be 64-bit aligned.
// In that case we can copy them using a sequence of 64-bit read/writes.
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/arrow/compute/row/compare_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ void KeyCompare::CompareBinaryColumnToRowHelper(
}
} else {
const uint8_t* rows_left = col.data(1);
const uint32_t* offsets_right = rows.offsets();
const RowTableImpl::offset_type* offsets_right = rows.offsets();
const uint8_t* rows_right = rows.data(2);
for (uint32_t i = first_row_to_compare; i < num_rows_to_compare; ++i) {
uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
uint32_t irow_right = left_to_right_map[irow_left];
uint32_t offset_right = offsets_right[irow_right] + offset_within_row;
RowTableImpl::offset_type offset_right =
offsets_right[irow_right] + offset_within_row;
match_bytevector[i] = compare_fn(rows_left, rows_right, irow_left, offset_right);
}
}
Expand Down Expand Up @@ -145,7 +146,7 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[bit_offset](const uint8_t* left_base, const uint8_t* right_base,
uint32_t irow_left, uint32_t offset_right) {
uint32_t irow_left, RowTableImpl::offset_type offset_right) {
uint8_t left =
bit_util::GetBit(left_base, irow_left + bit_offset) ? 0xff : 0x00;
uint8_t right = right_base[offset_right];
Expand All @@ -156,7 +157,7 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
RowTableImpl::offset_type offset_right) {
uint8_t left = left_base[irow_left];
uint8_t right = right_base[offset_right];
return left == right ? 0xff : 0;
Expand All @@ -166,7 +167,7 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
RowTableImpl::offset_type offset_right) {
util::CheckAlignment<uint16_t>(left_base);
util::CheckAlignment<uint16_t>(right_base + offset_right);
uint16_t left = reinterpret_cast<const uint16_t*>(left_base)[irow_left];
Expand All @@ -178,7 +179,7 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
RowTableImpl::offset_type offset_right) {
util::CheckAlignment<uint32_t>(left_base);
util::CheckAlignment<uint32_t>(right_base + offset_right);
uint32_t left = reinterpret_cast<const uint32_t*>(left_base)[irow_left];
Expand All @@ -190,7 +191,7 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
RowTableImpl::offset_type offset_right) {
util::CheckAlignment<uint64_t>(left_base);
util::CheckAlignment<uint64_t>(right_base + offset_right);
uint64_t left = reinterpret_cast<const uint64_t*>(left_base)[irow_left];
Expand All @@ -202,7 +203,7 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[&col](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
RowTableImpl::offset_type offset_right) {
uint32_t length = col.metadata().fixed_length;

// Non-zero length guarantees no underflow
Expand Down Expand Up @@ -241,15 +242,15 @@ void KeyCompare::CompareVarBinaryColumnToRowHelper(
const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
const RowTableImpl& rows, uint8_t* match_bytevector) {
const uint32_t* offsets_left = col.offsets();
const uint32_t* offsets_right = rows.offsets();
const RowTableImpl::offset_type* offsets_right = rows.offsets();
const uint8_t* rows_left = col.data(2);
const uint8_t* rows_right = rows.data(2);
for (uint32_t i = first_row_to_compare; i < num_rows_to_compare; ++i) {
uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
uint32_t irow_right = left_to_right_map[irow_left];
uint32_t begin_left = offsets_left[irow_left];
uint32_t length_left = offsets_left[irow_left + 1] - begin_left;
uint32_t begin_right = offsets_right[irow_right];
RowTableImpl::offset_type begin_right = offsets_right[irow_right];
uint32_t length_right;
uint32_t offset_within_row;
if (!is_first_varbinary_col) {
Expand Down
39 changes: 16 additions & 23 deletions cpp/src/arrow/compute/row/encode_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ void EncoderInteger::Decode(uint32_t start_row, uint32_t num_rows,
DCHECK(false);
}
} else {
const uint32_t* row_offsets = rows.offsets() + start_row;
const RowTableImpl::offset_type* row_offsets = rows.offsets() + start_row;
const uint8_t* row_base = rows.data(2);
row_base += offset_within_row;
uint8_t* col_base = col_prep.mutable_data(1);
Expand Down Expand Up @@ -362,14 +362,14 @@ void EncoderBinary::EncodeSelectedImp(uint32_t offset_within_row, RowTableImpl*
} else {
const uint8_t* src_base = col.data(1);
uint8_t* dst = rows->mutable_data(2) + offset_within_row;
const uint32_t* offsets = rows->offsets();
const RowTableImpl::offset_type* offsets = rows->offsets();
for (uint32_t i = 0; i < num_selected; ++i) {
copy_fn(dst + offsets[i], src_base, selection[i]);
}
if (col.data(0)) {
const uint8_t* non_null_bits = col.data(0);
uint8_t* dst = rows->mutable_data(2) + offset_within_row;
const uint32_t* offsets = rows->offsets();
const RowTableImpl::offset_type* offsets = rows->offsets();
for (uint32_t i = 0; i < num_selected; ++i) {
bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
if (is_null) {
Expand Down Expand Up @@ -585,7 +585,7 @@ void EncoderBinaryPair::DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row,
uint8_t* dst_B = col2->mutable_data(1);

uint32_t fixed_length = rows.metadata().fixed_length;
const uint32_t* offsets;
const RowTableImpl::offset_type* offsets;
const uint8_t* src_base;
if (is_row_fixed_length) {
src_base = rows.data(1) + fixed_length * start_row + offset_within_row;
Expand Down Expand Up @@ -640,7 +640,7 @@ void EncoderOffsets::Decode(uint32_t start_row, uint32_t num_rows,
// The Nth element is the sum of all the lengths of varbinary columns data in
// that row, up to and including Nth varbinary column.

const uint32_t* row_offsets = rows.offsets() + start_row;
const RowTableImpl::offset_type* row_offsets = rows.offsets() + start_row;

// Set the base offset for each column
for (size_t col = 0; col < varbinary_cols->size(); ++col) {
Expand All @@ -658,8 +658,8 @@ void EncoderOffsets::Decode(uint32_t start_row, uint32_t num_rows,
// Update the offset of each column
uint32_t offset_within_row = rows.metadata().fixed_length;
for (size_t col = 0; col < varbinary_cols->size(); ++col) {
offset_within_row +=
RowTableMetadata::padding_for_alignment(offset_within_row, string_alignment);
offset_within_row += RowTableMetadata::padding_for_alignment_within_row(
offset_within_row, string_alignment);
uint32_t length = varbinary_ends[col] - offset_within_row;
offset_within_row = varbinary_ends[col];
uint32_t* col_offsets = (*varbinary_cols)[col].mutable_offsets();
Expand All @@ -676,7 +676,7 @@ Status EncoderOffsets::GetRowOffsetsSelected(RowTableImpl* rows,
return Status::OK();
}

uint32_t* row_offsets = rows->mutable_offsets();
RowTableImpl::offset_type* row_offsets = rows->mutable_offsets();
for (uint32_t i = 0; i < num_selected; ++i) {
row_offsets[i] = rows->metadata().fixed_length;
}
Expand All @@ -688,7 +688,7 @@ Status EncoderOffsets::GetRowOffsetsSelected(RowTableImpl* rows,
for (uint32_t i = 0; i < num_selected; ++i) {
uint32_t irow = selection[i];
uint32_t length = col_offsets[irow + 1] - col_offsets[irow];
row_offsets[i] += RowTableMetadata::padding_for_alignment(
row_offsets[i] += RowTableMetadata::padding_for_alignment_row(
row_offsets[i], rows->metadata().string_alignment);
row_offsets[i] += length;
}
Expand All @@ -708,20 +708,13 @@ Status EncoderOffsets::GetRowOffsetsSelected(RowTableImpl* rows,
}
}

uint32_t sum = 0;
int64_t sum = 0;
int row_alignment = rows->metadata().row_alignment;
for (uint32_t i = 0; i < num_selected; ++i) {
uint32_t length = row_offsets[i];
length += RowTableMetadata::padding_for_alignment(length, row_alignment);
RowTableImpl::offset_type length = row_offsets[i];
length += RowTableMetadata::padding_for_alignment_row(length, row_alignment);
row_offsets[i] = sum;
uint32_t sum_maybe_overflow = 0;
if (ARROW_PREDICT_FALSE(
arrow::internal::AddWithOverflow(sum, length, &sum_maybe_overflow))) {
return Status::Invalid(
"Offset overflow detected in EncoderOffsets::GetRowOffsetsSelected for row ", i,
" of length ", length, " bytes, current length in total is ", sum, " bytes");
}
sum = sum_maybe_overflow;
sum += length;
}
row_offsets[num_selected] = sum;

Expand All @@ -732,7 +725,7 @@ template <bool has_nulls, bool is_first_varbinary>
void EncoderOffsets::EncodeSelectedImp(uint32_t ivarbinary, RowTableImpl* rows,
const std::vector<KeyColumnArray>& cols,
uint32_t num_selected, const uint16_t* selection) {
const uint32_t* row_offsets = rows->offsets();
const RowTableImpl::offset_type* row_offsets = rows->offsets();
uint8_t* row_base = rows->mutable_data(2) +
rows->metadata().varbinary_end_array_offset +
ivarbinary * sizeof(uint32_t);
Expand All @@ -753,7 +746,7 @@ void EncoderOffsets::EncodeSelectedImp(uint32_t ivarbinary, RowTableImpl* rows,
row[0] = rows->metadata().fixed_length + length;
} else {
row[0] = row[-1] +
RowTableMetadata::padding_for_alignment(
RowTableMetadata::padding_for_alignment_within_row(
row[-1], rows->metadata().string_alignment) +
length;
}
Expand Down Expand Up @@ -857,7 +850,7 @@ void EncoderNulls::Decode(uint32_t start_row, uint32_t num_rows, const RowTableI
void EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, RowTableImpl* rows,
const KeyColumnArray& cols, uint32_t num_selected,
const uint16_t* selection) {
const uint32_t* row_offsets = rows->offsets();
const RowTableImpl::offset_type* row_offsets = rows->offsets();
uint8_t* row_base = rows->mutable_data(2);
const uint32_t* col_offsets = cols.offsets();
const uint8_t* col_base = cols.data(2);
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/compute/row/encode_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class EncoderBinary {
copy_fn(dst, src, col_width);
}
} else {
const uint32_t* row_offsets = rows_const->offsets();
const RowTableImpl::offset_type* row_offsets = rows_const->offsets();
for (uint32_t i = 0; i < num_rows; ++i) {
const uint8_t* src;
uint8_t* dst;
Expand Down Expand Up @@ -267,15 +267,16 @@ class EncoderVarBinary {
ARROW_DCHECK(!rows_const->metadata().is_fixed_length &&
!col_const->metadata().is_fixed_length);

const uint32_t* row_offsets_for_batch = rows_const->offsets() + start_row;
const RowTableImpl::offset_type* row_offsets_for_batch =
rows_const->offsets() + start_row;
const uint32_t* col_offsets = col_const->offsets();

uint32_t col_offset_next = col_offsets[0];
for (uint32_t i = 0; i < num_rows; ++i) {
uint32_t col_offset = col_offset_next;
col_offset_next = col_offsets[i + 1];

uint32_t row_offset = row_offsets_for_batch[i];
RowTableImpl::offset_type row_offset = row_offsets_for_batch[i];
const uint8_t* row = rows_const->data(2) + row_offset;

uint32_t offset_within_row;
Expand Down
31 changes: 12 additions & 19 deletions cpp/src/arrow/compute/row/row_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ void RowTableMetadata::FromColumnMetadataVector(
const KeyColumnMetadata& col = cols[column_order[i]];
if (col.is_fixed_length && col.fixed_length != 0 &&
ARROW_POPCOUNT64(col.fixed_length) != 1) {
offset_within_row += RowTableMetadata::padding_for_alignment(offset_within_row,
string_alignment, col);
offset_within_row += RowTableMetadata::padding_for_alignment_within_row(
offset_within_row, string_alignment, col);
}
column_offsets[i] = offset_within_row;
if (!col.is_fixed_length) {
Expand All @@ -155,7 +155,7 @@ void RowTableMetadata::FromColumnMetadataVector(
is_fixed_length = (num_varbinary_cols == 0);
fixed_length =
offset_within_row +
RowTableMetadata::padding_for_alignment(
RowTableMetadata::padding_for_alignment_within_row(
offset_within_row, num_varbinary_cols == 0 ? row_alignment : string_alignment);

// We set the number of bytes per row storing null masks of individual key columns
Expand Down Expand Up @@ -235,7 +235,7 @@ int64_t RowTableImpl::size_null_masks(int64_t num_rows) const {
}

int64_t RowTableImpl::size_offsets(int64_t num_rows) const {
return (num_rows + 1) * sizeof(uint32_t) + kPaddingForVectors;
return (num_rows + 1) * sizeof(offset_type) + kPaddingForVectors;
}

int64_t RowTableImpl::size_rows_fixed_length(int64_t num_rows) const {
Expand Down Expand Up @@ -324,23 +324,15 @@ Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,

if (!metadata_.is_fixed_length) {
// Varying-length rows
auto from_offsets = reinterpret_cast<const uint32_t*>(from.offsets_->data());
auto to_offsets = reinterpret_cast<uint32_t*>(offsets_->mutable_data());
uint32_t total_length = to_offsets[num_rows_];
uint32_t total_length_to_append = 0;
auto from_offsets = reinterpret_cast<const offset_type*>(from.offsets_->data());
auto to_offsets = reinterpret_cast<offset_type*>(offsets_->mutable_data());
offset_type total_length = to_offsets[num_rows_];
int64_t total_length_to_append = 0;
for (uint32_t i = 0; i < num_rows_to_append; ++i) {
uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
int64_t length = from_offsets[row_id + 1] - from_offsets[row_id];
total_length_to_append += length;
uint32_t to_offset_maybe_overflow = 0;
if (ARROW_PREDICT_FALSE(arrow::internal::AddWithOverflow(
total_length, total_length_to_append, &to_offset_maybe_overflow))) {
return Status::Invalid(
"Offset overflow detected in RowTableImpl::AppendSelectionFrom for row ",
num_rows_ + i, " of length ", length, " bytes, current length in total is ",
to_offsets[num_rows_ + i], " bytes");
}
to_offsets[num_rows_ + i + 1] = to_offset_maybe_overflow;
to_offsets[num_rows_ + i + 1] = total_length + total_length_to_append;
}

RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(total_length_to_append));
Expand All @@ -349,7 +341,8 @@ Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,
uint8_t* dst = rows_->mutable_data() + total_length;
for (uint32_t i = 0; i < num_rows_to_append; ++i) {
uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
int64_t length = from_offsets[row_id + 1] - from_offsets[row_id];
DCHECK_LE(length, std::numeric_limits<uint32_t>::max());
auto src64 = reinterpret_cast<const uint64_t*>(src + from_offsets[row_id]);
auto dst64 = reinterpret_cast<uint64_t*>(dst);
for (uint32_t j = 0; j < bit_util::CeilDiv(length, 8); ++j) {
Expand Down
Loading

0 comments on commit d18b7ea

Please sign in to comment.