Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhangHuiGui committed Mar 13, 2024
1 parent d12066b commit 560de84
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 53 deletions.
68 changes: 20 additions & 48 deletions cpp/src/arrow/compute/key_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,15 @@ void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,

constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
const uint32_t alloc_batch_size = std::min(num_rows, max_batch_size);
const int64_t estimate_alloc_size = EstimateBatchStackSize<uint32_t>(alloc_batch_size);

util::TempVectorStack temp_stack;
if (!ctx->stack) {
ARROW_CHECK_OK(temp_stack.Init(default_memory_pool(), estimate_alloc_size));
ctx->stack = &temp_stack;
} else {
ctx->stack->CheckAllocSizeValid(estimate_alloc_size);
}

auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, alloc_batch_size);
uint32_t* hash_temp = hash_temp_buf.mutable_data();
Expand Down Expand Up @@ -473,30 +482,7 @@ Status Hashing32::HashBatch(const ExecBatch& key_batch, uint32_t* hashes,

LightContext ctx;
ctx.hardware_flags = hardware_flags;

const int64_t alloc_entry_length = column_arrays[0].length();
auto estimate_size = [&] {
// An estimate TempVectorStack usage size for Hashing32::HashMultiColumm.
const int64_t alloc_size1 =
2 * (alloc_entry_length * sizeof(uint32_t) + util::TempVectorStack::meta_size());
const int64_t alloc_size2 =
alloc_entry_length * sizeof(uint16_t) + util::TempVectorStack::meta_size();
return alloc_size1 + alloc_size2;
};

if (!temp_stack) {
util::TempVectorStack stack;
RETURN_NOT_OK(stack.Init(default_memory_pool(), estimate_size()));
ctx.stack = std::move(&stack);
} else {
auto estimate_alloc_size = estimate_size();
ARROW_CHECK_GE(temp_stack->buffer_size(), estimate_alloc_size)
<< "TempVectorStack's init"
" size is not enough. (actual "
<< temp_stack->buffer_size() << "Bytes, expect " << estimate_alloc_size
<< "Bytes)";
ctx.stack = temp_stack;
}
ctx.stack = temp_stack;

HashMultiColumn(column_arrays, &ctx, hashes);
return Status::OK();
Expand Down Expand Up @@ -853,6 +839,15 @@ void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,

constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
const uint32_t alloc_batch_size = std::min(num_rows, max_batch_size);
const uint64_t estimate_alloc_size = EstimateBatchStackSize<uint64_t>(alloc_batch_size);

util::TempVectorStack temp_stack;
if (!ctx->stack) {
ARROW_CHECK_OK(temp_stack.Init(default_memory_pool(), estimate_alloc_size));
ctx->stack = &temp_stack;
} else {
ctx->stack->CheckAllocSizeValid(estimate_alloc_size);
}

auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, alloc_batch_size);
uint16_t* null_indices = null_indices_buf.mutable_data();
Expand Down Expand Up @@ -936,30 +931,7 @@ Status Hashing64::HashBatch(const ExecBatch& key_batch, uint64_t* hashes,

LightContext ctx;
ctx.hardware_flags = hardware_flags;

const int64_t alloc_entry_length = column_arrays[0].length();
auto estimate_size = [&] {
// An estimate TempVectorStack usage size for Hashing64::HashMultiColumm.
const int64_t alloc_size1 =
alloc_entry_length * sizeof(uint64_t) + util::TempVectorStack::meta_size();
const int64_t alloc_size2 =
alloc_entry_length * sizeof(uint16_t) + util::TempVectorStack::meta_size();
return alloc_size1 + alloc_size2;
};

if (!temp_stack) {
util::TempVectorStack stack;
RETURN_NOT_OK(stack.Init(default_memory_pool(), estimate_size()));
ctx.stack = std::move(&stack);
} else {
auto estimate_alloc_size = estimate_size();
ARROW_CHECK_GE(temp_stack->buffer_size(), estimate_alloc_size)
<< "TempVectorStack's init"
" size is not enough. (actual "
<< temp_stack->buffer_size() << "Bytes, expect " << estimate_alloc_size
<< "Bytes)";
ctx.stack = temp_stack;
}
ctx.stack = temp_stack;

HashMultiColumn(column_arrays, &ctx, hashes);
return Status::OK();
Expand Down
19 changes: 19 additions & 0 deletions cpp/src/arrow/compute/key_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,5 +219,24 @@ class ARROW_EXPORT Hashing64 {
const uint8_t* keys, uint64_t* hashes);
};

template <typename T = uint32_t>
static int64_t EstimateBatchStackSize(int32_t batch_size) {
if (sizeof(T) == sizeof(uint32_t)) {
const int64_t alloc_for_hash_temp_buf =
util::TempVectorStack::EstimateAllocSize(batch_size * sizeof(uint32_t));
const int64_t alloc_for_null_hash_temp_buf = alloc_for_hash_temp_buf;
const int64_t alloc_for_null_indices_buf =
util::TempVectorStack::EstimateAllocSize(batch_size * sizeof(uint16_t));
return alloc_for_hash_temp_buf + alloc_for_null_hash_temp_buf +
alloc_for_null_indices_buf;
} else {
const int64_t alloc_for_null_hash_temp_buf =
util::TempVectorStack::EstimateAllocSize(batch_size * sizeof(uint64_t));
const int64_t alloc_for_null_indices_buf =
util::TempVectorStack::EstimateAllocSize(batch_size * sizeof(uint16_t));
return alloc_for_null_hash_temp_buf + alloc_for_null_indices_buf;
}
}

} // namespace compute
} // namespace arrow
27 changes: 27 additions & 0 deletions cpp/src/arrow/compute/key_hash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,5 +311,32 @@ TEST(VectorHash, FixedLengthTailByteSafety) {
HashFixedLengthFrom(/*key_length=*/19, /*num_rows=*/64, /*start_row=*/63);
}

TEST(HashBatch, AllocTempStackAsNeeded) {
auto arr = arrow::ArrayFromJSON(arrow::int32(), "[9,2,6]");
const int batch_len = arr->length();
arrow::compute::ExecBatch exec_batch({arr}, batch_len);
auto ctx = arrow::compute::default_exec_context();
std::vector<arrow::compute::KeyColumnArray> temp_column_arrays;

// alloc stack by HashBatch internal
std::vector<uint32_t> h1(batch_len);
ASSERT_OK(arrow::compute::Hashing32::HashBatch(
exec_batch, h1.data(), temp_column_arrays, ctx->cpu_info()->hardware_flags(),
nullptr, 0, batch_len));

// alloc stack as HashBatch needed.
util::TempVectorStack stack;
ASSERT_OK(
stack.Init(default_memory_pool(), EstimateBatchStackSize<int32_t>(batch_len)));
std::vector<uint32_t> h2(batch_len);
ASSERT_OK(arrow::compute::Hashing32::HashBatch(
exec_batch, h2.data(), temp_column_arrays, ctx->cpu_info()->hardware_flags(),
&stack, 0, batch_len));

for (int i = 0; i < batch_len; i++) {
EXPECT_EQ(h1[i], h2[i]);
}
}

} // namespace compute
} // namespace arrow
9 changes: 8 additions & 1 deletion cpp/src/arrow/compute/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) {
int64_t new_top = top_ + PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t);
// Stack overflow check (see GH-39582).
// XXX cannot return a regular Status because most consumers do not either.
ARROW_CHECK_LE(new_top, buffer_size_) << "TempVectorStack::alloc overflow";
CheckAllocSizeValid(new_top);
*data = buffer_->mutable_data() + top_ + sizeof(uint64_t);
// We set 8 bytes before the beginning of the allocated range and
// 8 bytes after the end to check for stack overflow (which would
Expand All @@ -58,6 +58,13 @@ void TempVectorStack::release(int id, uint32_t num_bytes) {
--num_vectors_;
}

void TempVectorStack::CheckAllocSizeValid(int64_t estimate_alloc_size) {
ARROW_DCHECK_LE(estimate_alloc_size, buffer_size_)
<< "TempVectorStack alloc overflow."
"(Actual "
<< buffer_size_ << "Bytes, expect " << estimate_alloc_size << "Bytes)";
}

namespace bit_util {

inline uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) {
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/compute/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,23 @@ class ARROW_EXPORT TempVectorStack {
Status Init(MemoryPool* pool, int64_t size) {
num_vectors_ = 0;
top_ = 0;
buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * sizeof(uint64_t);
buffer_size_ = PaddedAllocationSize(size) + 2 * sizeof(uint64_t);
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
// Ensure later operations don't accidentally read uninitialized memory.
std::memset(buffer->mutable_data(), 0xFF, size);
buffer_ = std::move(buffer);
return Status::OK();
}

const int64_t buffer_size() const { return buffer_size_; }
static int64_t meta_size() { return kPadding + 2 * sizeof(uint64_t); }
static int64_t EstimateAllocSize(int64_t size) {
return PaddedAllocationSize(size) + 2 * sizeof(uint64_t);
}

int64_t StackBufferSize() const { return buffer_size_; }
void CheckAllocSizeValid(int64_t estimate_alloc_size);

private:
int64_t PaddedAllocationSize(int64_t num_bytes) {
static int64_t PaddedAllocationSize(int64_t num_bytes) {
// Round up allocation size to multiple of 8 bytes
// to avoid returning temp vectors with unaligned address.
//
Expand Down

0 comments on commit 560de84

Please sign in to comment.