apacheGH-17211: refresh history for scalar_hash kernel
This commit ports the latest state of the scalar_hash kernels without pulling in their long development history. The kernel is an element-wise function that uses an xxHash-like algorithm; it prioritizes speed and is not suitable for cryptographic purposes. The function is implemented by the `FastHashScalar` struct, which is templated on the output type (assumed to be either UInt32 or UInt64, though this is not validated at the moment).

The benchmarks in scalar_hash_benchmark.cc use hashing_benchmark.cc (in cpp/src/arrow/util/) as a reference, but cover only various input types and the key hashing functions (from key_hash.h). The tests in scalar_hash_test.cc use a simplified version of hashing based on what is implemented in key_hash.cc. The idea is that the high-level entry points for high-level types should eventually reduce to an expected application of the low-level hash functions on simple data types; the tests perform this exact comparison. At the moment the tests pass for simple cases, but they do not work for nested types with non-trivial row layouts (e.g. ListTypes).

Issue: ARROW-8991
Issue: apacheGH-17211
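For orientation, the kernel is exposed as a compute function named "hash_64" and is invoked through the standard compute entry point, as the benchmarks below do. The following is only a minimal usage sketch: the builder-based input array is illustrative, the helper name HashInt64Example is invented, and it assumes the function has been added to the default function registry.

#include <memory>

#include "arrow/array/builder_primitive.h"
#include "arrow/compute/api.h"
#include "arrow/result.h"

// Hash each element of a simple Int64 array with the new "hash_64" function.
arrow::Result<arrow::Datum> HashInt64Example() {
  arrow::Int64Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> values, builder.Finish());

  // Produces one 64-bit hash per valid input row; null rows emit null output.
  return arrow::compute::CallFunction("hash_64", {values});
}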
Showing 3 changed files with 590 additions and 0 deletions.
scalar_hash.cc (new file, +138 lines):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/**
 * @file scalar_hash.cc
 * @brief Element-wise (scalar) kernels for hashing values.
 */

#include <algorithm>

#include "arrow/array/array_base.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/compute/key_hash.h"
#include "arrow/compute/util.h"
#include "arrow/compute/kernels/common_internal.h"
#include "arrow/compute/light_array.h"
#include "arrow/result.h"

namespace arrow {
namespace compute {
namespace internal {

// Define symbols visible within `arrow::compute::internal` in this file;
// these symbols are not visible outside of this file.
namespace {

// Function documentation
const FunctionDoc hash_64_doc{
    "Construct a hash for every element of the input argument",
    ("An element-wise function that uses an xxHash-like algorithm.\n"
     "This function is not suitable for cryptographic purposes.\n"
     "Hash results are 64-bit and emitted for each valid row.\n"
     "Null (or invalid) rows emit a null in the output."),
    {"hash_input"}};

// ------------------------------
// Kernel implementations
// It is expected that HashArrowType is either UInt32Type or UInt64Type (default)
template <typename HashArrowType = UInt64Type>
struct FastHashScalar {
  using OutputCType = typename TypeTraits<HashArrowType>::CType;
  using KeyColumnArrayVec = std::vector<KeyColumnArray>;

  // Internal wrapper functions to resolve Hashing32 vs Hashing64 using parameter types
  static void FastHashMultiColumn(KeyColumnArrayVec& cols, LightContext* ctx,
                                  uint32_t* hashes) {
    Hashing32::HashMultiColumn(cols, ctx, hashes);
  }

  static void FastHashMultiColumn(KeyColumnArrayVec& cols, LightContext* ctx,
                                  uint64_t* hashes) {
    Hashing64::HashMultiColumn(cols, ctx, hashes);
  }

  static Status Exec(KernelContext* ctx, const ExecSpan& input_arg, ExecResult* out) {
    if (input_arg.num_values() != 1 || !input_arg[0].is_array()) {
      return Status::Invalid("FastHash currently supports a single array input");
    }
    ArraySpan hash_input = input_arg[0].array;

    auto exec_ctx = default_exec_context();
    if (ctx && ctx->exec_context()) {
      exec_ctx = ctx->exec_context();
    }

    // Initialize stack-based memory allocator used by Hashing32 and Hashing64
    util::TempVectorStack stack_memallocator;
    ARROW_RETURN_NOT_OK(
        stack_memallocator.Init(exec_ctx->memory_pool(),
                                3 * sizeof(int32_t) * util::MiniBatch::kMiniBatchLength));

    // Prepare context used by Hashing32 and Hashing64
    LightContext hash_ctx;
    hash_ctx.hardware_flags = exec_ctx->cpu_info()->hardware_flags();
    hash_ctx.stack = &stack_memallocator;

    // Construct vector<KeyColumnArray> from input ArraySpan; this essentially
    // flattens the input array span, lifting nested Array buffers into a single level
    ARROW_ASSIGN_OR_RAISE(KeyColumnArrayVec input_keycols,
                          ColumnArraysFromArraySpan(hash_input, hash_input.length));

    // Call the hashing function, overloaded based on OutputCType
    ArraySpan* result_span = out->array_span_mutable();
    FastHashMultiColumn(input_keycols, &hash_ctx, result_span->GetValues<OutputCType>(1));

    return Status::OK();
  }
};

// ------------------------------
// Function construction and kernel registration
std::shared_ptr<ScalarFunction> RegisterKernelsFastHash64() {
  // Create function instance
  auto fn_hash_64 =
      std::make_shared<ScalarFunction>("hash_64", Arity::Unary(), hash_64_doc);

  // Associate kernel with function
  for (auto& simple_inputtype : PrimitiveTypes()) {
    DCHECK_OK(fn_hash_64->AddKernel({InputType(simple_inputtype)}, OutputType(uint64()),
                                    FastHashScalar<UInt64Type>::Exec));
  }

  for (const auto nested_type :
       {Type::STRUCT, Type::DENSE_UNION, Type::SPARSE_UNION, Type::LIST,
        Type::FIXED_SIZE_LIST, Type::MAP, Type::DICTIONARY}) {
    DCHECK_OK(fn_hash_64->AddKernel({InputType(nested_type)}, OutputType(uint64()),
                                    FastHashScalar<UInt64Type>::Exec));
  }

  // Return function to be registered
  return fn_hash_64;
}

}  // namespace

void RegisterScalarHash(FunctionRegistry* registry) {
  auto fn_scalarhash64 = RegisterKernelsFastHash64();
  DCHECK_OK(registry->AddFunction(std::move(fn_scalarhash64)));
}

}  // namespace internal
}  // namespace compute
}  // namespace arrow
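Since `FastHashScalar` is templated on the output type and already carries a `Hashing32` overload, a 32-bit variant could be registered with the same pattern. The commit does not add one; the sketch below is purely hypothetical (the names "hash_32", hash_32_doc, and RegisterKernelsFastHash32 are invented for illustration) and would have to sit alongside RegisterKernelsFastHash64 in the same anonymous namespace:

// Hypothetical 32-bit counterpart, not part of this commit: the same
// registration pattern, but with UInt32Type output and a uint32() result type.
std::shared_ptr<ScalarFunction> RegisterKernelsFastHash32() {
  const FunctionDoc hash_32_doc{
      "Construct a 32-bit hash for every element of the input argument",
      "An element-wise function that uses an xxHash-like algorithm.",
      {"hash_input"}};

  auto fn_hash_32 =
      std::make_shared<ScalarFunction>("hash_32", Arity::Unary(), hash_32_doc);

  for (auto& simple_inputtype : PrimitiveTypes()) {
    DCHECK_OK(fn_hash_32->AddKernel({InputType(simple_inputtype)}, OutputType(uint32()),
                                    FastHashScalar<UInt32Type>::Exec));
  }

  return fn_hash_32;
}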
scalar_hash_benchmark.cc (new file, +189 lines):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cstdint>
#include <limits>
#include <random>
#include <string>
#include <vector>

#include "benchmark/benchmark.h"

#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/hashing.h"

#include "arrow/array/array_nested.h"
#include "arrow/compute/exec.h"

namespace arrow {
namespace internal {

// ------------------------------
// Anonymous namespace with global params

namespace {
// copied from scalar_string_benchmark
constexpr auto kSeed = 0x94378165;
constexpr double null_prob = 0.2;

static random::RandomArrayGenerator hashing_rng(kSeed);
}  // namespace

// ------------------------------
// Convenience functions

static Result<std::shared_ptr<StructArray>> MakeStructArray(int64_t n_values,
                                                             int32_t min_strlen,
                                                             int32_t max_strlen) {
  auto vals_first = hashing_rng.Int64(n_values, 0, std::numeric_limits<int64_t>::max());
  auto vals_second = hashing_rng.String(n_values, min_strlen, max_strlen, null_prob);
  auto vals_third = hashing_rng.Int64(n_values, 0, std::numeric_limits<int64_t>::max());

  return arrow::StructArray::Make(
      arrow::ArrayVector{vals_first, vals_second, vals_third},
      arrow::FieldVector{arrow::field("first", arrow::int64()),
                         arrow::field("second", arrow::utf8()),
                         arrow::field("third", arrow::int64())});
}

// ------------------------------
// Benchmark implementations

static void Hash64Int64(benchmark::State& state) {  // NOLINT non-const reference
  auto test_vals = hashing_rng.Int64(10000, 0, std::numeric_limits<int64_t>::max());

  while (state.KeepRunning()) {
    ASSERT_OK_AND_ASSIGN(Datum hash_result,
                         compute::CallFunction("hash_64", {test_vals}));
    benchmark::DoNotOptimize(hash_result);
  }

  state.SetBytesProcessed(state.iterations() * test_vals->length() * sizeof(int64_t));
  state.SetItemsProcessed(state.iterations() * test_vals->length());
}

static void Hash64StructSmallStrings(
    benchmark::State& state) {  // NOLINT non-const reference
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
                       MakeStructArray(10000, 2, 20));

  // 2nd column (index 1) is a string column, which has offset type of int32_t
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
                       values_array->GetFlattenedField(1));
  std::shared_ptr<StringArray> str_vals =
      std::static_pointer_cast<StringArray>(values_second);
  int32_t total_string_size = str_vals->total_values_length();

  while (state.KeepRunning()) {
    ASSERT_OK_AND_ASSIGN(Datum hash_result,
                         compute::CallFunction("hash_64", {values_array}));
    benchmark::DoNotOptimize(hash_result);
  }

  state.SetBytesProcessed(state.iterations() *
                          ((values_array->length() * sizeof(int64_t)) +
                           (total_string_size) +
                           (values_array->length() * sizeof(int64_t))));
  state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64StructMediumStrings(
    benchmark::State& state) {  // NOLINT non-const reference
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
                       MakeStructArray(10000, 20, 120));

  // 2nd column (index 1) is a string column, which has offset type of int32_t
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
                       values_array->GetFlattenedField(1));
  std::shared_ptr<StringArray> str_vals =
      std::static_pointer_cast<StringArray>(values_second);
  int32_t total_string_size = str_vals->total_values_length();

  while (state.KeepRunning()) {
    ASSERT_OK_AND_ASSIGN(Datum hash_result,
                         compute::CallFunction("hash_64", {values_array}));
    benchmark::DoNotOptimize(hash_result);
  }

  state.SetBytesProcessed(state.iterations() *
                          ((values_array->length() * sizeof(int64_t)) +
                           (total_string_size) +
                           (values_array->length() * sizeof(int64_t))));
  state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64StructLargeStrings(
    benchmark::State& state) {  // NOLINT non-const reference
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
                       MakeStructArray(10000, 120, 2000));

  // 2nd column (index 1) is a string column, which has offset type of int32_t
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
                       values_array->GetFlattenedField(1));
  std::shared_ptr<StringArray> str_vals =
      std::static_pointer_cast<StringArray>(values_second);
  int32_t total_string_size = str_vals->total_values_length();

  while (state.KeepRunning()) {
    ASSERT_OK_AND_ASSIGN(Datum hash_result,
                         compute::CallFunction("hash_64", {values_array}));
    benchmark::DoNotOptimize(hash_result);
  }

  state.SetBytesProcessed(state.iterations() *
                          ((values_array->length() * sizeof(int64_t)) +
                           (total_string_size) +
                           (values_array->length() * sizeof(int64_t))));
  state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64Map(benchmark::State& state) {  // NOLINT non-const reference
  constexpr int64_t test_size = 10000;
  auto test_keys = hashing_rng.String(test_size, 2, 20, /*null_probability=*/0);
  auto test_vals = hashing_rng.Int64(test_size, 0, std::numeric_limits<int64_t>::max());
  auto test_keyvals = hashing_rng.Map(test_keys, test_vals, test_size);

  auto key_arr = std::static_pointer_cast<StringArray>(test_keys);
  int32_t total_key_size = key_arr->total_values_length();
  int32_t total_val_size = test_size * sizeof(int64_t);

  while (state.KeepRunning()) {
    ASSERT_OK_AND_ASSIGN(Datum hash_result,
                         compute::CallFunction("hash_64", {test_keyvals}));
    benchmark::DoNotOptimize(hash_result);
  }

  state.SetBytesProcessed(state.iterations() * (total_key_size + total_val_size));
  state.SetItemsProcessed(state.iterations() * 2 * test_size);
}

// ------------------------------
// Benchmark declarations

// Uses "FastHash" compute functions (wraps KeyHash functions)
BENCHMARK(Hash64Int64);

BENCHMARK(Hash64StructSmallStrings);
BENCHMARK(Hash64StructMediumStrings);
BENCHMARK(Hash64StructLargeStrings);

BENCHMARK(Hash64Map);

}  // namespace internal
}  // namespace arrow
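The third changed file, scalar_hash_test.cc, is not shown here. As a rough illustration of the comparison strategy the commit message describes, the hypothetical sketch below hashes one column both through the high-level "hash_64" function and directly through the low-level Hashing64 path the kernel uses. It mirrors the setup in FastHashScalar::Exec, the helper name CompareHash64AgainstKeyHash is invented, and it assumes the fragment sits where the kernel's helpers (ColumnArraysFromArraySpan, LightContext, util::TempVectorStack) are visible, e.g. inside namespace arrow::compute with the same includes as scalar_hash.cc plus <vector>.

// Hypothetical sketch, not the contents of scalar_hash_test.cc: compare the
// output of the "hash_64" compute function against a direct Hashing64 call.
Status CompareHash64AgainstKeyHash(const std::shared_ptr<Array>& values) {
  // High-level path: the registered "hash_64" compute function.
  ARROW_ASSIGN_OR_RAISE(Datum hashed, CallFunction("hash_64", {values}));

  // Low-level path: the same setup FastHashScalar::Exec performs.
  auto* exec_ctx = default_exec_context();
  util::TempVectorStack stack_memallocator;
  ARROW_RETURN_NOT_OK(
      stack_memallocator.Init(exec_ctx->memory_pool(),
                              3 * sizeof(int32_t) * util::MiniBatch::kMiniBatchLength));

  LightContext hash_ctx;
  hash_ctx.hardware_flags = exec_ctx->cpu_info()->hardware_flags();
  hash_ctx.stack = &stack_memallocator;

  // Flatten the input column the same way the kernel does, then hash it.
  ArraySpan input_span(*values->data());
  ARROW_ASSIGN_OR_RAISE(std::vector<KeyColumnArray> input_keycols,
                        ColumnArraysFromArraySpan(input_span, input_span.length));
  std::vector<uint64_t> expected(values->length());
  Hashing64::HashMultiColumn(input_keycols, &hash_ctx, expected.data());

  // An element-wise comparison of `hashed` against `expected` would follow here.
  return Status::OK();
}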