Skip to content

Commit

Permalink
Added an are_cols_sorted option to RowTableMetadatato control whether…
Browse files Browse the repository at this point in the history
… we enable column-sorted in RowTableEncoder
  • Loading branch information
ZhangHuiGui committed May 25, 2024
1 parent 7c8ce45 commit ff8fd7d
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 84 deletions.
50 changes: 26 additions & 24 deletions cpp/src/arrow/compute/row/compare_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,11 @@ void KeyCompare::CompareBinaryColumnToRowHelper(
}

template <bool use_selection>
void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map,
LightContext* ctx, const KeyColumnArray& col,
const RowTableImpl& rows,
uint8_t* match_bytevector) {
void KeyCompare::CompareBinaryColumnToRow(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
bool are_cols_in_encoding_order, uint8_t* match_bytevector) {
uint32_t num_processed = 0;
#if defined(ARROW_HAVE_RUNTIME_AVX2)
if (ctx->has_avx2()) {
Expand Down Expand Up @@ -165,10 +163,11 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
CompareBinaryColumnToRowHelper<use_selection>(
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
util::CheckAlignment<uint16_t>(left_base);
util::CheckAlignment<uint16_t>(right_base + offset_right);
[=](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
util::CheckAlignment<uint16_t>(left_base, are_cols_in_encoding_order);
util::CheckAlignment<uint16_t>(right_base + offset_right,
are_cols_in_encoding_order);
uint16_t left = reinterpret_cast<const uint16_t*>(left_base)[irow_left];
uint16_t right = *reinterpret_cast<const uint16_t*>(right_base + offset_right);
return left == right ? 0xff : 0;
Expand All @@ -177,10 +176,11 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
CompareBinaryColumnToRowHelper<use_selection>(
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
util::CheckAlignment<uint32_t>(left_base);
util::CheckAlignment<uint32_t>(right_base + offset_right);
[=](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
util::CheckAlignment<uint32_t>(left_base, are_cols_in_encoding_order);
util::CheckAlignment<uint32_t>(right_base + offset_right,
are_cols_in_encoding_order);
uint32_t left = reinterpret_cast<const uint32_t*>(left_base)[irow_left];
uint32_t right = *reinterpret_cast<const uint32_t*>(right_base + offset_right);
return left == right ? 0xff : 0;
Expand All @@ -189,10 +189,11 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
CompareBinaryColumnToRowHelper<use_selection>(
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
util::CheckAlignment<uint64_t>(left_base);
util::CheckAlignment<uint64_t>(right_base + offset_right);
[=](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
util::CheckAlignment<uint64_t>(left_base, are_cols_in_encoding_order);
util::CheckAlignment<uint64_t>(right_base + offset_right,
are_cols_in_encoding_order);
uint64_t left = reinterpret_cast<const uint64_t*>(left_base)[irow_left];
uint64_t right = *reinterpret_cast<const uint64_t*>(right_base + offset_right);
return left == right ? 0xff : 0;
Expand All @@ -201,8 +202,8 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
CompareBinaryColumnToRowHelper<use_selection>(
offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows, match_bytevector,
[&col](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
[=, &col](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
uint32_t offset_right) {
uint32_t length = col.metadata().fixed_length;

// Non-zero length guarantees no underflow
Expand All @@ -212,7 +213,8 @@ void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,

const uint64_t* key_left_ptr =
reinterpret_cast<const uint64_t*>(left_base + irow_left * length);
util::CheckAlignment<uint64_t>(right_base + offset_right);
util::CheckAlignment<uint64_t>(right_base + offset_right,
are_cols_in_encoding_order);
const uint64_t* key_right_ptr =
reinterpret_cast<const uint64_t*>(right_base + offset_right);
uint64_t result_or = 0;
Expand Down Expand Up @@ -370,7 +372,7 @@ void KeyCompare::CompareColumnsToRows(
if (sel_left_maybe_null) {
CompareBinaryColumnToRow<true>(
offset_within_row, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows,
left_to_right_map, ctx, col, rows, are_cols_in_encoding_order,
is_first_column ? match_bytevector_A : match_bytevector_B);
NullUpdateColumnToRow<true>(
static_cast<uint32_t>(icol), num_rows_to_compare, sel_left_maybe_null,
Expand All @@ -380,7 +382,7 @@ void KeyCompare::CompareColumnsToRows(
// Version without using selection vector
CompareBinaryColumnToRow<false>(
offset_within_row, num_rows_to_compare, sel_left_maybe_null,
left_to_right_map, ctx, col, rows,
left_to_right_map, ctx, col, rows, are_cols_in_encoding_order,
is_first_column ? match_bytevector_A : match_bytevector_B);
NullUpdateColumnToRow<false>(
static_cast<uint32_t>(icol), num_rows_to_compare, sel_left_maybe_null,
Expand Down
12 changes: 5 additions & 7 deletions cpp/src/arrow/compute/row/compare_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,11 @@ class ARROW_EXPORT KeyCompare {
const RowTableImpl& rows, uint8_t* match_bytevector, COMPARE_FN compare_fn);

template <bool use_selection>
static void CompareBinaryColumnToRow(uint32_t offset_within_row,
uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map,
LightContext* ctx, const KeyColumnArray& col,
const RowTableImpl& rows,
uint8_t* match_bytevector);
static void CompareBinaryColumnToRow(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
bool are_cols_in_encoding_order, uint8_t* match_bytevector);

template <bool use_selection, bool is_first_varbinary_col>
static void CompareVarBinaryColumnToRowHelper(
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/arrow/compute/row/compare_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,5 +164,69 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) {
}
}

TEST(KeyCompare, CompareColumnsWithEncodingOrder) {
const int num_rows = 5;

for (auto are_cols_sorted : {true, false}) {
SCOPED_TRACE("are_cols_sorted = " + std::to_string(are_cols_sorted));

MemoryPool* pool = default_memory_pool();
TempVectorStack stack;
ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows)));

auto i32_col = ArrayFromJSON(int32(), "[0, 1, 2, 3, 4]");
auto i64_col = ArrayFromJSON(int64(), "[7, 8, 9, 10, 11]");

// resorted order in RowTableMetadata will be : i64_col, i32_col
ExecBatch batch_right({i32_col, i64_col}, num_rows);

std::vector<KeyColumnMetadata> r_col_metas;
ASSERT_OK(ColumnMetadatasFromExecBatch(batch_right, &r_col_metas));

RowTableMetadata r_table_meta;
r_table_meta.FromColumnMetadataVector(r_col_metas, sizeof(uint64_t), sizeof(uint64_t),
are_cols_sorted);

std::vector<KeyColumnArray> r_column_arrays;
ASSERT_OK(ColumnArraysFromExecBatch(batch_right, &r_column_arrays));

RowTableImpl row_table;
ASSERT_OK(row_table.Init(pool, r_table_meta));

RowTableEncoder row_encoder;
row_encoder.Init(r_col_metas, sizeof(uint64_t), sizeof(uint64_t), are_cols_sorted);
row_encoder.PrepareEncodeSelected(0, num_rows, r_column_arrays);

std::vector<uint16_t> r_row_ids(num_rows);
std::iota(r_row_ids.begin(), r_row_ids.end(), 0);
ASSERT_OK(row_encoder.EncodeSelected(&row_table, num_rows, r_row_ids.data()));

ExecBatch batch_left;
if (are_cols_sorted) {
batch_left.values = {i64_col, i32_col};
} else {
batch_left.values = {i32_col, i64_col};
}
batch_left.length = num_rows;

std::vector<KeyColumnArray> l_column_arrays;
ASSERT_OK(ColumnArraysFromExecBatch(batch_left, &l_column_arrays));

std::vector<uint32_t> l_row_ids(num_rows);
std::iota(l_row_ids.begin(), l_row_ids.end(), 0);

LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack};

uint32_t num_rows_no_match;
std::vector<uint16_t> row_ids_out(num_rows);
KeyCompare::CompareColumnsToRows(
num_rows, NULLPTR, l_row_ids.data(), &ctx, &num_rows_no_match, row_ids_out.data(),
l_column_arrays, row_table, are_cols_sorted, NULLPTR);
// Because the data of batch_left and batch_right are the same, their comparison
// results should be the same regardless of whether are_cols_sorted is true or false.
ASSERT_EQ(num_rows_no_match, 0);
}
}

} // namespace compute
} // namespace arrow
5 changes: 3 additions & 2 deletions cpp/src/arrow/compute/row/encode_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ namespace arrow {
namespace compute {

void RowTableEncoder::Init(const std::vector<KeyColumnMetadata>& cols, int row_alignment,
int string_alignment) {
row_metadata_.FromColumnMetadataVector(cols, row_alignment, string_alignment);
int string_alignment, bool are_columns_sorted) {
row_metadata_.FromColumnMetadataVector(cols, row_alignment, string_alignment,
are_columns_sorted);
uint32_t num_cols = row_metadata_.num_cols();
uint32_t num_varbinary_cols = row_metadata_.num_varbinary_cols();
batch_all_cols_.resize(num_cols);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/row/encode_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace compute {
class ARROW_EXPORT RowTableEncoder {
public:
void Init(const std::vector<KeyColumnMetadata>& cols, int row_alignment,
int string_alignment);
int string_alignment, bool are_columns_sorted = true);

const RowTableMetadata& row_metadata() { return row_metadata_; }
// GrouperFastImpl right now needs somewhat intrusive visibility into RowTableEncoder
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/compute/row/grouper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ struct GrouperFastImpl : public Grouper {

impl->encoder_.Init(impl->col_metadata_,
/* row_alignment = */ sizeof(uint64_t),
/* string_alignment = */ sizeof(uint64_t));
/* string_alignment = */ sizeof(uint64_t),
/* are_columns_sorted = */ true);
RETURN_NOT_OK(impl->rows_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
RETURN_NOT_OK(
impl->rows_minibatch_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
Expand All @@ -583,7 +584,7 @@ struct GrouperFastImpl : public Grouper {
num_keys_to_compare, selection_may_be_null, group_ids,
&impl_ptr->encode_ctx_, out_num_keys_mismatch, out_selection_mismatch,
impl_ptr->encoder_.batch_all_cols(), impl_ptr->rows_,
/* are_cols_in_encoding_order=*/true);
impl_ptr->rows_.metadata().are_cols_sorted);
};
impl->map_append_impl_ = [impl_ptr](int num_keys, const uint16_t* selection, void*) {
RETURN_NOT_OK(impl_ptr->encoder_.EncodeSelected(&impl_ptr->rows_minibatch_,
Expand Down
94 changes: 49 additions & 45 deletions cpp/src/arrow/compute/row/row_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,68 +54,72 @@ bool RowTableMetadata::is_compatible(const RowTableMetadata& other) const {

void RowTableMetadata::FromColumnMetadataVector(
const std::vector<KeyColumnMetadata>& cols, int in_row_alignment,
int in_string_alignment) {
int in_string_alignment, bool in_are_cols_sorted) {
column_metadatas.resize(cols.size());
for (size_t i = 0; i < cols.size(); ++i) {
column_metadatas[i] = cols[i];
}

const auto num_cols = static_cast<uint32_t>(cols.size());

// Sort columns.
//
// Columns are sorted based on the size in bytes of their fixed-length part.
// For the varying-length column, the fixed-length part is the 32-bit field storing
// cumulative length of varying-length fields. This is to make the memory access of
// each individual column within the encoded row alignment-friendly.
//
// The rules are:
//
// a) Boolean column, marked with fixed-length 0, is considered to have fixed-length
// part of 1 byte.
//
// b) Columns with fixed-length part being power of 2 or multiple of row
// alignment precede other columns. They are sorted in decreasing order of the size of
// their fixed-length part.
//
// c) Fixed-length columns precede varying-length columns when
// both have the same size fixed-length part.
//
column_order.resize(num_cols);
for (uint32_t i = 0; i < num_cols; ++i) {
column_order[i] = i;
}
std::sort(
column_order.begin(), column_order.end(), [&cols](uint32_t left, uint32_t right) {
bool is_left_pow2 =
!cols[left].is_fixed_length || ARROW_POPCOUNT64(cols[left].fixed_length) <= 1;
bool is_right_pow2 = !cols[right].is_fixed_length ||
ARROW_POPCOUNT64(cols[right].fixed_length) <= 1;
bool is_left_fixedlen = cols[left].is_fixed_length;
bool is_right_fixedlen = cols[right].is_fixed_length;
uint32_t width_left =
cols[left].is_fixed_length ? cols[left].fixed_length : sizeof(uint32_t);
uint32_t width_right =
cols[right].is_fixed_length ? cols[right].fixed_length : sizeof(uint32_t);
if (is_left_pow2 != is_right_pow2) {
return is_left_pow2;
}
if (!is_left_pow2) {

if (in_are_cols_sorted) {
// Sort columns.
//
// Columns are sorted based on the size in bytes of their fixed-length part.
// For the varying-length column, the fixed-length part is the 32-bit field storing
// cumulative length of varying-length fields. This is to make the memory access of
// each individual column within the encoded row alignment-friendly.
//
// The rules are:
//
// a) Boolean column, marked with fixed-length 0, is considered to have fixed-length
// part of 1 byte.
//
// b) Columns with fixed-length part being power of 2 or multiple of row
// alignment precede other columns. They are sorted in decreasing order of the size of
// their fixed-length part.
//
// c) Fixed-length columns precede varying-length columns when
// both have the same size fixed-length part.
//
std::sort(
column_order.begin(), column_order.end(), [&cols](uint32_t left, uint32_t right) {
bool is_left_pow2 = !cols[left].is_fixed_length ||
ARROW_POPCOUNT64(cols[left].fixed_length) <= 1;
bool is_right_pow2 = !cols[right].is_fixed_length ||
ARROW_POPCOUNT64(cols[right].fixed_length) <= 1;
bool is_left_fixedlen = cols[left].is_fixed_length;
bool is_right_fixedlen = cols[right].is_fixed_length;
uint32_t width_left =
cols[left].is_fixed_length ? cols[left].fixed_length : sizeof(uint32_t);
uint32_t width_right =
cols[right].is_fixed_length ? cols[right].fixed_length : sizeof(uint32_t);
if (is_left_pow2 != is_right_pow2) {
return is_left_pow2;
}
if (!is_left_pow2) {
return left < right;
}
if (width_left != width_right) {
return width_left > width_right;
}
if (is_left_fixedlen != is_right_fixedlen) {
return is_left_fixedlen;
}
return left < right;
}
if (width_left != width_right) {
return width_left > width_right;
}
if (is_left_fixedlen != is_right_fixedlen) {
return is_left_fixedlen;
}
return left < right;
});
});
}
inverse_column_order.resize(num_cols);
for (uint32_t i = 0; i < num_cols; ++i) {
inverse_column_order[column_order[i]] = i;
}

are_cols_sorted = in_are_cols_sorted;
row_alignment = in_row_alignment;
string_alignment = in_string_alignment;
varbinary_end_array_offset = 0;
Expand Down
8 changes: 7 additions & 1 deletion cpp/src/arrow/compute/row/row_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ struct ARROW_EXPORT RowTableMetadata {
/// Offsets within a row to fields in their encoding order.
std::vector<uint32_t> column_offsets;

/// \brief True if columns are sorted based on the size in bytes of
/// their fixed-length part. This is to make the memory access of
/// each individual column within the encoded row alignment-friendly
bool are_cols_sorted;

/// Rounding up offset to the nearest multiple of alignment value.
/// Alignment must be a power of 2.
static inline uint32_t padding_for_alignment(uint32_t offset, int required_alignment) {
Expand Down Expand Up @@ -144,7 +149,8 @@ struct ARROW_EXPORT RowTableMetadata {

/// \brief Populate this instance to describe `cols` with the given alignment
void FromColumnMetadataVector(const std::vector<KeyColumnMetadata>& cols,
int in_row_alignment, int in_string_alignment);
int in_row_alignment, int in_string_alignment,
bool in_are_cols_sorted = true);

/// \brief True if `other` has the same number of columns
/// and each column has the same width (two variable length
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/compute/util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ namespace arrow {
namespace util {

template <typename T>
void CheckAlignment(const void* ptr) {
ARROW_DCHECK(reinterpret_cast<uint64_t>(ptr) % sizeof(T) == 0);
void CheckAlignment(const void* ptr, bool do_check = true) {
if (ARROW_PREDICT_TRUE(do_check)) {
ARROW_DCHECK(reinterpret_cast<uint64_t>(ptr) % sizeof(T) == 0);
}
}

/// Storage used to allocate temporary vectors of a batch size.
Expand Down

0 comments on commit ff8fd7d

Please sign in to comment.