From 560de84329a395789a650ca6ddb18286eabe48c2 Mon Sep 17 00:00:00 2001 From: ZhangHuiGui Date: Wed, 13 Mar 2024 17:13:07 +0800 Subject: [PATCH] fix --- cpp/src/arrow/compute/key_hash.cc | 68 ++++++++------------------ cpp/src/arrow/compute/key_hash.h | 19 +++++++ cpp/src/arrow/compute/key_hash_test.cc | 27 ++++++++++ cpp/src/arrow/compute/util.cc | 9 +++- cpp/src/arrow/compute/util.h | 12 +++-- 5 files changed, 82 insertions(+), 53 deletions(-) diff --git a/cpp/src/arrow/compute/key_hash.cc b/cpp/src/arrow/compute/key_hash.cc index cae8ac4a3d4c8..f8fcc1be82cad 100644 --- a/cpp/src/arrow/compute/key_hash.cc +++ b/cpp/src/arrow/compute/key_hash.cc @@ -384,6 +384,15 @@ void Hashing32::HashMultiColumn(const std::vector& cols, constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength; const uint32_t alloc_batch_size = std::min(num_rows, max_batch_size); + const int64_t estimate_alloc_size = EstimateBatchStackSize(alloc_batch_size); + + util::TempVectorStack temp_stack; + if (!ctx->stack) { + ARROW_CHECK_OK(temp_stack.Init(default_memory_pool(), estimate_alloc_size)); + ctx->stack = &temp_stack; + } else { + ctx->stack->CheckAllocSizeValid(estimate_alloc_size); + } auto hash_temp_buf = util::TempVectorHolder(ctx->stack, alloc_batch_size); uint32_t* hash_temp = hash_temp_buf.mutable_data(); @@ -473,30 +482,7 @@ Status Hashing32::HashBatch(const ExecBatch& key_batch, uint32_t* hashes, LightContext ctx; ctx.hardware_flags = hardware_flags; - - const int64_t alloc_entry_length = column_arrays[0].length(); - auto estimate_size = [&] { - // An estimate TempVectorStack usage size for Hashing32::HashMultiColumm. - const int64_t alloc_size1 = - 2 * (alloc_entry_length * sizeof(uint32_t) + util::TempVectorStack::meta_size()); - const int64_t alloc_size2 = - alloc_entry_length * sizeof(uint16_t) + util::TempVectorStack::meta_size(); - return alloc_size1 + alloc_size2; - }; - - if (!temp_stack) { - util::TempVectorStack stack; - RETURN_NOT_OK(stack.Init(default_memory_pool(), estimate_size())); - ctx.stack = std::move(&stack); - } else { - auto estimate_alloc_size = estimate_size(); - ARROW_CHECK_GE(temp_stack->buffer_size(), estimate_alloc_size) - << "TempVectorStack's init" - " size is not enough. (actual " - << temp_stack->buffer_size() << "Bytes, expect " << estimate_alloc_size - << "Bytes)"; - ctx.stack = temp_stack; - } + ctx.stack = temp_stack; HashMultiColumn(column_arrays, &ctx, hashes); return Status::OK(); @@ -853,6 +839,15 @@ void Hashing64::HashMultiColumn(const std::vector& cols, constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength; const uint32_t alloc_batch_size = std::min(num_rows, max_batch_size); + const uint64_t estimate_alloc_size = EstimateBatchStackSize(alloc_batch_size); + + util::TempVectorStack temp_stack; + if (!ctx->stack) { + ARROW_CHECK_OK(temp_stack.Init(default_memory_pool(), estimate_alloc_size)); + ctx->stack = &temp_stack; + } else { + ctx->stack->CheckAllocSizeValid(estimate_alloc_size); + } auto null_indices_buf = util::TempVectorHolder(ctx->stack, alloc_batch_size); uint16_t* null_indices = null_indices_buf.mutable_data(); @@ -936,30 +931,7 @@ Status Hashing64::HashBatch(const ExecBatch& key_batch, uint64_t* hashes, LightContext ctx; ctx.hardware_flags = hardware_flags; - - const int64_t alloc_entry_length = column_arrays[0].length(); - auto estimate_size = [&] { - // An estimate TempVectorStack usage size for Hashing64::HashMultiColumm. - const int64_t alloc_size1 = - alloc_entry_length * sizeof(uint64_t) + util::TempVectorStack::meta_size(); - const int64_t alloc_size2 = - alloc_entry_length * sizeof(uint16_t) + util::TempVectorStack::meta_size(); - return alloc_size1 + alloc_size2; - }; - - if (!temp_stack) { - util::TempVectorStack stack; - RETURN_NOT_OK(stack.Init(default_memory_pool(), estimate_size())); - ctx.stack = std::move(&stack); - } else { - auto estimate_alloc_size = estimate_size(); - ARROW_CHECK_GE(temp_stack->buffer_size(), estimate_alloc_size) - << "TempVectorStack's init" - " size is not enough. (actual " - << temp_stack->buffer_size() << "Bytes, expect " << estimate_alloc_size - << "Bytes)"; - ctx.stack = temp_stack; - } + ctx.stack = temp_stack; HashMultiColumn(column_arrays, &ctx, hashes); return Status::OK(); diff --git a/cpp/src/arrow/compute/key_hash.h b/cpp/src/arrow/compute/key_hash.h index 1173df5ed103e..dcb3f867980f9 100644 --- a/cpp/src/arrow/compute/key_hash.h +++ b/cpp/src/arrow/compute/key_hash.h @@ -219,5 +219,24 @@ class ARROW_EXPORT Hashing64 { const uint8_t* keys, uint64_t* hashes); }; +template +static int64_t EstimateBatchStackSize(int32_t batch_size) { + if (sizeof(T) == sizeof(uint32_t)) { + const int64_t alloc_for_hash_temp_buf = + util::TempVectorStack::EstimateAllocSize(batch_size * sizeof(uint32_t)); + const int64_t alloc_for_null_hash_temp_buf = alloc_for_hash_temp_buf; + const int64_t alloc_for_null_indices_buf = + util::TempVectorStack::EstimateAllocSize(batch_size * sizeof(uint16_t)); + return alloc_for_hash_temp_buf + alloc_for_null_hash_temp_buf + + alloc_for_null_indices_buf; + } else { + const int64_t alloc_for_null_hash_temp_buf = + util::TempVectorStack::EstimateAllocSize(batch_size * sizeof(uint64_t)); + const int64_t alloc_for_null_indices_buf = + util::TempVectorStack::EstimateAllocSize(batch_size * sizeof(uint16_t)); + return alloc_for_null_hash_temp_buf + alloc_for_null_indices_buf; + } +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/key_hash_test.cc b/cpp/src/arrow/compute/key_hash_test.cc index c998df7169c4a..1cc0c19876ff6 100644 --- a/cpp/src/arrow/compute/key_hash_test.cc +++ b/cpp/src/arrow/compute/key_hash_test.cc @@ -311,5 +311,32 @@ TEST(VectorHash, FixedLengthTailByteSafety) { HashFixedLengthFrom(/*key_length=*/19, /*num_rows=*/64, /*start_row=*/63); } +TEST(HashBatch, AllocTempStackAsNeeded) { + auto arr = arrow::ArrayFromJSON(arrow::int32(), "[9,2,6]"); + const int batch_len = arr->length(); + arrow::compute::ExecBatch exec_batch({arr}, batch_len); + auto ctx = arrow::compute::default_exec_context(); + std::vector temp_column_arrays; + + // alloc stack by HashBatch internal + std::vector h1(batch_len); + ASSERT_OK(arrow::compute::Hashing32::HashBatch( + exec_batch, h1.data(), temp_column_arrays, ctx->cpu_info()->hardware_flags(), + nullptr, 0, batch_len)); + + // alloc stack as HashBatch needed. + util::TempVectorStack stack; + ASSERT_OK( + stack.Init(default_memory_pool(), EstimateBatchStackSize(batch_len))); + std::vector h2(batch_len); + ASSERT_OK(arrow::compute::Hashing32::HashBatch( + exec_batch, h2.data(), temp_column_arrays, ctx->cpu_info()->hardware_flags(), + &stack, 0, batch_len)); + + for (int i = 0; i < batch_len; i++) { + EXPECT_EQ(h1[i], h2[i]); + } +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/util.cc b/cpp/src/arrow/compute/util.cc index 2058ba9f30757..078292e7827f5 100644 --- a/cpp/src/arrow/compute/util.cc +++ b/cpp/src/arrow/compute/util.cc @@ -35,7 +35,7 @@ void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) { int64_t new_top = top_ + PaddedAllocationSize(num_bytes) + 2 * sizeof(uint64_t); // Stack overflow check (see GH-39582). // XXX cannot return a regular Status because most consumers do not either. - ARROW_CHECK_LE(new_top, buffer_size_) << "TempVectorStack::alloc overflow"; + CheckAllocSizeValid(new_top); *data = buffer_->mutable_data() + top_ + sizeof(uint64_t); // We set 8 bytes before the beginning of the allocated range and // 8 bytes after the end to check for stack overflow (which would @@ -58,6 +58,13 @@ void TempVectorStack::release(int id, uint32_t num_bytes) { --num_vectors_; } +void TempVectorStack::CheckAllocSizeValid(int64_t estimate_alloc_size) { + ARROW_DCHECK_LE(estimate_alloc_size, buffer_size_) + << "TempVectorStack alloc overflow." + "(Actual " + << buffer_size_ << "Bytes, expect " << estimate_alloc_size << "Bytes)"; +} + namespace bit_util { inline uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) { diff --git a/cpp/src/arrow/compute/util.h b/cpp/src/arrow/compute/util.h index cccb1930a0fa1..c2334db032884 100644 --- a/cpp/src/arrow/compute/util.h +++ b/cpp/src/arrow/compute/util.h @@ -89,7 +89,7 @@ class ARROW_EXPORT TempVectorStack { Status Init(MemoryPool* pool, int64_t size) { num_vectors_ = 0; top_ = 0; - buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * sizeof(uint64_t); + buffer_size_ = PaddedAllocationSize(size) + 2 * sizeof(uint64_t); ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool)); // Ensure later operations don't accidentally read uninitialized memory. std::memset(buffer->mutable_data(), 0xFF, size); @@ -97,11 +97,15 @@ class ARROW_EXPORT TempVectorStack { return Status::OK(); } - const int64_t buffer_size() const { return buffer_size_; } - static int64_t meta_size() { return kPadding + 2 * sizeof(uint64_t); } + static int64_t EstimateAllocSize(int64_t size) { + return PaddedAllocationSize(size) + 2 * sizeof(uint64_t); + } + + int64_t StackBufferSize() const { return buffer_size_; } + void CheckAllocSizeValid(int64_t estimate_alloc_size); private: - int64_t PaddedAllocationSize(int64_t num_bytes) { + static int64_t PaddedAllocationSize(int64_t num_bytes) { // Round up allocation size to multiple of 8 bytes // to avoid returning temp vectors with unaligned address. //