Skip to content

Commit

Permalink
Return processed rows in avx2 functions
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed Aug 22, 2024
1 parent 9b92cec commit 27bc9da
Showing 1 changed file with 39 additions and 36 deletions.
75 changes: 39 additions & 36 deletions cpp/src/arrow/acero/swiss_join_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,10 @@ int RowArray::DecodeFixedLength_avx2(ResizableArrayData* output, int output_star
const uint32_t* row_ids) const {
DCHECK_EQ(output_start_row % 8, 0);

int num_rows_processed = 0;
switch (fixed_length) {
case 0:
RowArrayAccessor::Visit_avx2(
num_rows_processed = RowArrayAccessor::Visit_avx2(
rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo, __m256i offset_hi,
__m256i num_bytes) {
Expand All @@ -337,7 +338,7 @@ int RowArray::DecodeFixedLength_avx2(ResizableArrayData* output, int output_star
});
break;
case 1:
RowArrayAccessor::Visit_avx2(
num_rows_processed = RowArrayAccessor::Visit_avx2(
rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo, __m256i offset_hi,
__m256i num_bytes) {
Expand All @@ -346,7 +347,7 @@ int RowArray::DecodeFixedLength_avx2(ResizableArrayData* output, int output_star
});
break;
case 2:
RowArrayAccessor::Visit_avx2(
num_rows_processed = RowArrayAccessor::Visit_avx2(
rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo, __m256i offset_hi,
__m256i num_bytes) {
Expand All @@ -357,7 +358,7 @@ int RowArray::DecodeFixedLength_avx2(ResizableArrayData* output, int output_star
});
break;
case 4:
RowArrayAccessor::Visit_avx2(
num_rows_processed = RowArrayAccessor::Visit_avx2(
rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo, __m256i offset_hi,
__m256i num_bytes) {
Expand All @@ -368,7 +369,7 @@ int RowArray::DecodeFixedLength_avx2(ResizableArrayData* output, int output_star
});
break;
case 8:
RowArrayAccessor::Visit_avx2(
num_rows_processed = RowArrayAccessor::Visit_avx2(
rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo, __m256i offset_hi,
__m256i num_bytes) {
Expand All @@ -379,7 +380,7 @@ int RowArray::DecodeFixedLength_avx2(ResizableArrayData* output, int output_star
});
break;
default:
RowArrayAccessor::Visit_avx2(
num_rows_processed = RowArrayAccessor::Visit_avx2(
rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo, __m256i offset_hi,
__m256i num_bytes) {
Expand All @@ -397,7 +398,7 @@ int RowArray::DecodeFixedLength_avx2(ResizableArrayData* output, int output_star
break;
}

return num_rows_to_append;
return num_rows_processed;
}

int RowArray::DecodeOffsets_avx2(ResizableArrayData* output, int output_start_row,
Expand All @@ -406,54 +407,56 @@ int RowArray::DecodeOffsets_avx2(ResizableArrayData* output, int output_start_ro
uint32_t* offsets =
reinterpret_cast<uint32_t*>(output->mutable_data(1)) + output_start_row;
uint32_t sum = (output_start_row == 0) ? 0 : offsets[0];
RowArrayAccessor::Visit_avx2(rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo,
__m256i offset_hi, __m256i num_bytes) {
// offsets[i] = num_bytes;
});
for (int i = 0; i < num_rows_to_append; ++i) {
int num_rows_processed = RowArrayAccessor::Visit_avx2(
rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo, __m256i offset_hi,
__m256i num_bytes) {
// offsets[i] = num_bytes;
});
for (int i = 0; i < num_rows_processed; ++i) {
uint32_t length = offsets[i];
offsets[i] = sum;
sum += length;
}
offsets[num_rows_to_append] = sum;
offsets[num_rows_processed] = sum;

return num_rows_to_append;
return num_rows_processed;
}

int RowArray::DecodeVarLength_avx2(ResizableArrayData* output, int output_start_row,
int column_id, int num_rows_to_append,
const uint32_t* row_ids) const {
RowArrayAccessor::Visit_avx2(rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo,
__m256i offset_hi, __m256i num_bytes) {
// uint64_t* dst = reinterpret_cast<uint64_t*>(
// output->mutable_data(2) + reinterpret_cast<const
// uint32_t*>(
// output->mutable_data(1))[output_start_row
// + i]);
// const uint64_t* src = reinterpret_cast<const
// uint64_t*>(ptr); for (uint32_t word_id = 0;
// word_id < bit_util::CeilDiv(num_bytes,
// sizeof(uint64_t)); ++word_id) {
// arrow::util::SafeStore<uint64_t>(dst + word_id,
// arrow::util::SafeLoad(src
// + word_id));
// }
});

return num_rows_to_append;
int num_rows_processed = RowArrayAccessor::Visit_avx2(
rows_, column_id, num_rows_to_append, row_ids,
[&](int i, const uint8_t* row_ptr_base, __m256i offset_lo, __m256i offset_hi,
__m256i num_bytes) {
// uint64_t* dst = reinterpret_cast<uint64_t*>(
// output->mutable_data(2) + reinterpret_cast<const
// uint32_t*>(
// output->mutable_data(1))[output_start_row
// + i]);
// const uint64_t* src = reinterpret_cast<const
// uint64_t*>(ptr); for (uint32_t word_id = 0;
// word_id < bit_util::CeilDiv(num_bytes,
// sizeof(uint64_t)); ++word_id) {
// arrow::util::SafeStore<uint64_t>(dst + word_id,
// arrow::util::SafeLoad(src
// + word_id));
// }
});

return num_rows_processed;
}

int RowArray::DecodeNulls_avx2(ResizableArrayData* output, int output_start_row,
int column_id, int num_rows_to_append,
const uint32_t* row_ids) const {
RowArrayAccessor::VisitNulls_avx2(
int num_rows_processed = RowArrayAccessor::VisitNulls_avx2(
rows_, column_id, num_rows_to_append, row_ids, [&](int i, uint8_t value) {
bit_util::SetBitTo(output->mutable_data(0), output_start_row + i, value == 0);
});

return num_rows_to_append;
return num_rows_processed;
}

} // namespace acero
Expand Down

0 comments on commit 27bc9da

Please sign in to comment.