From a788f067be3b5bde003090bc6764cfeb6864bd8c Mon Sep 17 00:00:00 2001 From: Aldrin Montana Date: Fri, 12 Jan 2024 10:44:36 -0800 Subject: [PATCH] GH-17211: refresh history for updates to key_hash This commit pulls the latest changes to key_hash.h and implementations in light_array without the burden of a long development history. The only change in key_hash.h is the addition of a friend function which is used in scalar_hash_test.cc. Changes in light_array.[h,cc] are to accommodate two scenarios: (1) the use of ArraySpan, which was introduced after light_array was written; and (2) the need for a KeyColumnArray to allocate data for the purposes of interpreting (or decoding) the structure of a nested type. The main reason for the 2nd scenario is that a ListArray may have many values represented in a single row which should be hashed together; however, if the ListArray has a nested ListArray or other type, the row may have further structure. In the simplest interpretation, only the highest-level structure (the "outer" ListArray) needs to be preserved, and any further nested structures must be explicitly handled by custom kernels (or any future options, etc. that are upstreamed into Arrow). In trying to efficiently interpret complex nested types, ArraySpan can be useful because it is non-owning, thus the main reason for the 1st aforementioned scenario. Although unfinished, any tests added to light_array_test.cc should accommodate the 2 scenarios above. --- cpp/src/arrow/compute/key_hash.h | 3 + cpp/src/arrow/compute/light_array.cc | 259 +++++++++++++++++++--- cpp/src/arrow/compute/light_array.h | 24 +- cpp/src/arrow/compute/light_array_test.cc | 34 +++ 4 files changed, 283 insertions(+), 37 deletions(-) diff --git a/cpp/src/arrow/compute/key_hash.h b/cpp/src/arrow/compute/key_hash.h index 1173df5ed103e..ebcd2c02fc1bd 100644 --- a/cpp/src/arrow/compute/key_hash.h +++ b/cpp/src/arrow/compute/key_hash.h @@ -156,6 +156,9 @@ class ARROW_EXPORT Hashing64 { friend void TestBloomLargeHashHelper(int64_t, int64_t, const std::vector&, int64_t, int, T*); friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool); + friend void TestHashVarLen(bool should_incr, uint32_t row_count, + const uint32_t* var_offsets, const uint8_t* var_data, + uint64_t* hash_results); public: static void HashMultiColumn(const std::vector& cols, LightContext* ctx, diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index b225e04b05cea..d7ca58e40e84e 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -48,16 +48,16 @@ KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* validity_buffer, uint8_t* fixed_length_buffer, uint8_t* var_length_buffer, int bit_offset_validity, - int bit_offset_fixed) { + int bit_offset_fixed, + const util::TempVectorStack* alloc) { metadata_ = metadata; - length_ = length; - buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer; - buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] = - fixed_length_buffer; - buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] = - var_length_buffer; - bit_offset_[kValidityBuffer] = bit_offset_validity; + length_ = length; + buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer; + buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] = fixed_length_buffer; + buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] = var_length_buffer; + bit_offset_[kValidityBuffer] = bit_offset_validity; bit_offset_[kFixedLengthBuffer] = bit_offset_fixed; + arena_alloc = alloc; } KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other, @@ -83,6 +83,8 @@ KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const { sliced.metadata_ = metadata_; sliced.length_ = length; uint32_t fixed_size = metadata_.fixed_length; + // TODO: see if this is necessary + // uint32_t fixed_size = !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length; sliced.buffers_[0] = buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr; @@ -114,42 +116,229 @@ KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const { return sliced; } -Result ColumnMetadataFromDataType( - const std::shared_ptr& type) { - const bool is_extension = type->id() == Type::EXTENSION; - const std::shared_ptr& typ = - is_extension - ? arrow::internal::checked_pointer_cast(type->GetSharedPtr()) - ->storage_type() - : type; - - if (typ->id() == Type::DICTIONARY) { +Result ColumnMetadataFromDataType(const std::shared_ptr& type) { + // "ptype" is the "physical" type + const DataType* ptype = type->GetSharedPtr().get(); + + // For ExtensionType, use the backing physical type (storage_type() is a shared ptr) + if (ARROW_PREDICT_FALSE(type->id() == Type::EXTENSION)) { + const ExtensionType* ext_type = static_cast(type); + ptype = ext_type->storage_type().get(); + } + + if (ptype->id() == Type::DICTIONARY) { auto bit_width = - arrow::internal::checked_cast(*typ).bit_width(); + arrow::internal::checked_cast(*ptype).bit_width(); ARROW_DCHECK(bit_width % 8 == 0); return KeyColumnMetadata(true, bit_width / 8); } - if (typ->id() == Type::BOOL) { + if (ptype->id() == Type::BOOL) { return KeyColumnMetadata(true, 0); } - if (is_fixed_width(typ->id())) { + if (is_fixed_width(ptype->id())) { return KeyColumnMetadata( - true, arrow::internal::checked_cast(*typ).bit_width() / 8); + true, + arrow::internal::checked_cast(*ptype).bit_width() / 8); } - if (is_binary_like(typ->id())) { + if (is_binary_like(ptype->id())) { return KeyColumnMetadata(false, sizeof(uint32_t)); } - if (is_large_binary_like(typ->id())) { + if (is_large_binary_like(ptype->id())) { return KeyColumnMetadata(false, sizeof(uint64_t)); } - if (typ->id() == Type::NA) { + if (ptype->id() == Type::NA) { return KeyColumnMetadata(true, 0, true); } // Caller attempted to create a KeyColumnArray from an invalid type - return Status::TypeError("Unsupported column data type ", typ->name(), + return Status::TypeError("Unsupported column data type ", ptype->name(), + " used with KeyColumnMetadata"); +} + +Result ColumnMetadataFromDataType( + const std::shared_ptr& type) { + return ColumnMetadataFromDataType(type.get()); +} + +/** + * Constructs metadata that tells hashing functions how to iterate over the + * KeyColumnArray. + * + * This function assumes ColumnMetadataFromDataType has already failed, which makes this + * function distinct because it should only be called when the input Array is flattened in + * a particular way. + */ +Result ColumnMetadataFromListType(const DataType* type) { + if (type->id() == Type::LIST || type->id() == Type::MAP) { + return KeyColumnMetadata(false, sizeof(uint32_t)); + } + else if (type->id() == Type::LARGE_LIST) { + return KeyColumnMetadata(false, sizeof(uint64_t)); + } + // Caller attempted to create a KeyColumnArray from an invalid type + return Status::TypeError("Unsupported column data type ", type->name(), " used with KeyColumnMetadata"); } +Result ColumnMetadataFromListType( + const std::shared_ptr& type) { + return ColumnMetadataFromListType(type.get()); +} + +/** + * Coalesces children of a StructArray into a flattened list of KeyColumnArrays. When + * hashing a StructArray, we want to co-index a list of KeyColumnArrays so that values in + * the same row are combined. + */ +Result ColumnArraysFromStructArray(const ArraySpan& array_span, + int64_t num_rows) { + KeyColumnVector flattened_spans; + flattened_spans.reserve(array_span.child_data.size()); + + // Recurse on each child of the ArraySpan in DFS-order + for (size_t child_ndx = 0; child_ndx < array_span.child_data.size(); ++child_ndx) { + auto child_span = array_span.child_data[child_ndx]; + ARROW_ASSIGN_OR_RAISE(auto child_keycols, + ColumnArraysFromArraySpan(child_span, num_rows)); + + flattened_spans.insert(flattened_spans.end(), child_keycols.begin(), + child_keycols.end()); + } + + return flattened_spans; +} + +/** + * Flattens the data in a ListArray into a list of KeyColumnArrays so that each element in + * the ListArray is properly treated as a row value. Due to semantics of nulls in nested + * arrays and their non-impact on a hash value, nulls in nested arrays are dropped when + * flattened. If a list is null, then that row is considered null and is preserved. + * + * The values buffer of a list type should be propagated to the caller as is, but the + * parent offsets and offsets of the current ArraySpan must be coalesced. Essentially, for + * the purposes of hashing, we don't care about internal structure of a row value so we + * flatten the offsets. + */ + +// TODO: recurse to: (1) flatten offsets, (2) eventually bottom out and grab var data +template +Result ColumnArraysFromListArray(const ArraySpan& array_span, + int64_t num_rows, + const OffsetType* parent_offsets) { + // Construct KeyColumnMetadata + ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata, + ColumnMetadataFromListType(array_span.type)); + + ARROW_LOG(INFO) << "[ListArray] Child count: " + << std::to_string(array_span.child_data.size()); + + // ListArrays have only 1 child + auto child_span = array_span.child_data[0]; + uint8_t* buffer_validity = nullptr; + /* + * TODO: Currently unsupported. + * figure out how the validity bitmap affects flattened lists + *if (child_span.GetBuffer(0) != nullptr) { + * buffer_validity = (uint8_t*)child_span.GetBuffer(0)->data(); + *} + **/ + + // For simple lists or lists containing only list types, this should point to the child + // buffer with all of the values + uint8_t* buffer_varlength = nullptr; + if (child_span.num_buffers() > 2 && child_span.GetBuffer(2) != NULLPTR) { + ARROW_LOG(INFO) << "found list array data"; + buffer_varlength = (uint8_t*)child_span.GetBuffer(2)->data(); + } + else { + ARROW_LOG(INFO) << "child array does not have list array data"; + } + + // TODO + // Lists get flattened to 1 KeyColumnArray; Maps get flattened to 2 (key and value) + KeyColumnArray column_array = + KeyColumnArray(metadata, child_span.offset + num_rows, buffer_validity, + child_span.GetBuffer(1)->data(), buffer_varlength); + + return column_array; +} + + +Result ColumnArrayFromArraySpan(const ArraySpan& array_span, + int64_t num_rows) { + ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata, + ColumnMetadataFromDataType(array_span.type)); + + uint8_t* buffer_validity = nullptr; + if (array_span.GetBuffer(0) != nullptr) { + buffer_validity = (uint8_t*)array_span.GetBuffer(0)->data(); + } + + uint8_t* buffer_varlength = nullptr; + if (array_span.num_buffers() > 2 && array_span.GetBuffer(2) != NULLPTR) { + buffer_varlength = (uint8_t*)array_span.GetBuffer(2)->data(); + } + + KeyColumnArray column_array = + KeyColumnArray(metadata, array_span.offset + num_rows, buffer_validity, + array_span.GetBuffer(1)->data(), buffer_varlength); + + return column_array.Slice(array_span.offset, num_rows); +} + +Result ColumnArraysFromArraySpan(const ArraySpan& array_span, + int64_t num_rows) { + KeyColumnVector flattened_spans; + flattened_spans.reserve(1 + array_span.child_data.size()); + + // Construct a KeyColumnArray from the given ArraySpan + auto keycol_result = ColumnArrayFromArraySpan(array_span, num_rows); + if (keycol_result.ok()) { + flattened_spans.push_back(*keycol_result); + } + + // If ArraySpan data type is not supported, check for supported nested types. + else if (is_nested(array_span.type->id())) { + switch (array_span.type->id()) { + case Type::LIST: + case Type::MAP: { + const uint32_t* list_offsets = (const uint32_t*) array_span.GetBuffer(1)->data(); + ARROW_ASSIGN_OR_RAISE(auto list_keycol, + ColumnArraysFromListArray(array_span, num_rows, + list_offsets)); + + flattened_spans.push_back(list_keycol); + break; + } + + case Type::LARGE_LIST: { + const uint64_t* list_offsets = (const uint64_t*) array_span.GetBuffer(1)->data(); + ARROW_ASSIGN_OR_RAISE(auto list_keycol, + ColumnArraysFromListArray(array_span, num_rows, + list_offsets)); + + flattened_spans.push_back(list_keycol); + break; + } + + case Type::STRUCT: { + ARROW_ASSIGN_OR_RAISE(auto struct_keycols, + ColumnArraysFromStructArray(array_span, num_rows)); + + flattened_spans.insert(flattened_spans.end(), struct_keycols.begin(), + struct_keycols.end()); + break; + } + + default: + // unsupported types include: unions, fixed size list + ARROW_WARN_NOT_OK(keycol_result.status(), "Unsupported nested type for hashing"); + break; + } + } + + return flattened_spans; +} + Result ColumnArrayFromArrayData( const std::shared_ptr& array_data, int64_t start_row, int64_t num_rows) { ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata, @@ -161,12 +350,14 @@ KeyColumnArray ColumnArrayFromArrayDataAndMetadata( const std::shared_ptr& array_data, const KeyColumnMetadata& metadata, int64_t start_row, int64_t num_rows) { KeyColumnArray column_array = KeyColumnArray( - metadata, array_data->offset + start_row + num_rows, - array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr, - array_data->buffers[1]->data(), - (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR) + metadata + ,array_data->offset + start_row + num_rows + ,array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr + ,array_data->buffers[1]->data() + ,(array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR) ? array_data->buffers[2]->data() - : nullptr); + : nullptr + ); return column_array.Slice(array_data->offset + start_row, num_rows); } @@ -201,10 +392,12 @@ Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row, Status ColumnArraysFromExecBatch(const ExecBatch& batch, std::vector* column_arrays) { - return ColumnArraysFromExecBatch(batch, 0, static_cast(batch.length), - column_arrays); + return ColumnArraysFromExecBatch( + batch, 0, static_cast(batch.length), column_arrays + ); } + void ResizableArrayData::Init(const std::shared_ptr& data_type, MemoryPool* pool, int log_num_rows_min) { #ifndef NDEBUG diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index 67de71bf56c92..1a2224d3cc83c 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -84,6 +84,7 @@ class ARROW_EXPORT KeyColumnArray { public: /// \brief Create an uninitialized KeyColumnArray KeyColumnArray() = default; + /// \brief Create a read-only view from buffers /// /// This is a view only and does not take ownership of the buffers. The lifetime @@ -96,10 +97,14 @@ class ARROW_EXPORT KeyColumnArray { /// /// This is a view only and does not take ownership of the buffers. The lifetime /// of the buffers must exceed the lifetime of this view - KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, - uint8_t* validity_buffer, uint8_t* fixed_length_buffer, - uint8_t* var_length_buffer, int bit_offset_validity = 0, - int bit_offset_fixed = 0); + KeyColumnArray( const KeyColumnMetadata& metadata + ,int64_t length + ,uint8_t* validity_buffer + ,uint8_t* fixed_length_buffer + ,uint8_t* var_length_buffer + ,int bit_offset_validity = 0 + ,int bit_offset_fixed = 0 + ,const util::TempVectorStack *alloc = nullptr); /// \brief Create a sliced view of `this` /// /// The number of rows used in offset must be divisible by 8 @@ -183,6 +188,7 @@ class ARROW_EXPORT KeyColumnArray { // Starting bit offset within the first byte (between 0 and 7) // to be used when accessing buffers that store bit vectors. int bit_offset_[kMaxBuffers - 1]; + const util::TempVectorStack* arena_alloc; bool is_bool_type() const { return metadata_.is_fixed_length && metadata_.fixed_length == 0 && @@ -220,6 +226,15 @@ class ARROW_EXPORT KeyColumnArray { ARROW_EXPORT Result ColumnMetadataFromDataType( const std::shared_ptr& type); +ARROW_EXPORT Result ColumnMetadataFromDataType(const DataType* type); + +/// \brief Create KeyColumnArray from ArraySpan +/// +/// The caller should ensure this is only called on "key" columns. +/// \see ColumnMetadataFromDataType for details +ARROW_EXPORT Result ColumnArraysFromArraySpan( + const ArraySpan& array_span, int64_t num_rows); + /// \brief Create KeyColumnArray from ArrayData /// /// If `type` is a dictionary type then this will return the KeyColumnArray for @@ -269,6 +284,7 @@ ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t st ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, std::vector* column_arrays); + /// A lightweight resizable array for "key" columns /// /// Unlike KeyColumnArray this instance owns its buffers diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index ecc5f3ad37931..7e73c7ac7e783 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -286,6 +286,40 @@ TEST(KeyColumnArray, SliceBinaryTest) { GenericTestSlice(large_binary(), json_test_strings, testCases); } +TEST(KeyColumnArray, TempAllocForHashing) { + std::unique_ptr pool = MemoryPool::CreateDefault(); + + for (const auto& type : kSampleFixedDataTypes) { + ARROW_SCOPED_TRACE("Type: ", type->ToString()); + /* TODO + int row_count = 12; + int byte_width = + arrow::internal::checked_pointer_cast(type)->bit_width() / 8; + + { + KeyColumnPseudoSpan column_span {pool.get()}; + ARROW_ASSIGN_OR_RAISE(uint8_t* buf_data, + column_span.CreateBuffer(1, byte_width * row_count)); + + // TODO: create a test that uses a KeyColumnArray constructed from a + // KeyColumnPseudoSpan + KeyColumnMetadata metadata = ColumnMetadataFromDataType(boolean()).ValueOrDie(); + + KeyColumnArray column_view { + column_span.CreateView( + + int min_bytes_needed_for_values = byte_width; + int min_bytes_needed_for_validity = 1; + int min_bytes_needed = min_bytes_needed_for_values + min_bytes_needed_for_validity; + ASSERT_LT(min_bytes_needed, pool->bytes_allocated()); + ASSERT_GT(min_bytes_needed * 2, pool->bytes_allocated()); + } + */ + // After array is destroyed buffers should be freed + ASSERT_EQ(0, pool->bytes_allocated()); + } +} + TEST(ResizableArrayData, Basic) { std::unique_ptr pool = MemoryPool::CreateDefault(); for (const auto& type : kSampleFixedDataTypes) {