Skip to content

Commit

Permalink
apacheGH-17211: refresh history for updates to key_hash
Browse files Browse the repository at this point in the history
This commit pulls the latest changes to key_hash.h and implementations
in light_array without the burden of a long development history.

The only change in key_hash.h is the addition of a friend function which
is used in scalar_hash_test.cc.

Changes in light_array.[h,cc] are to accommodate two scenarios: (1) the
use of ArraySpan, which was introduced after light_array was written;
and (2) the need for a KeyColumnArray to allocate data for the purposes
of interpreting (or decoding) the structure of a nested type.

The main reason for the 2nd scenario is that a ListArray may have many
values represented in a single row which should be hashed together;
however, if the ListArray has a nested ListArray or other type, the row
may have further structure. In the simplest interpretation, only the
highest-level structure (the "outer" ListArray) needs to be preserved,
and any further nested structures must be explicitly handled by custom
kernels (or any future options, etc. that are upstreamed into Arrow).

In trying to efficiently interpret complex nested types, ArraySpan can
be useful because it is non-owning, thus the main reason for the 1st
aforementioned scenario.

Although unfinished, any tests added to light_array_test.cc should
accommodate the 2 scenarios above.
  • Loading branch information
drin committed Jan 12, 2024
1 parent 4fce794 commit 5029e24
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 37 deletions.
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/key_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class ARROW_EXPORT Hashing64 {
friend void TestBloomLargeHashHelper(int64_t, int64_t, const std::vector<uint64_t>&,
int64_t, int, T*);
friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
friend void TestHashVarLen(bool should_incr, uint32_t row_count,
const uint32_t* var_offsets, const uint8_t* var_data,
uint64_t* hash_results);

public:
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
Expand Down
259 changes: 226 additions & 33 deletions cpp/src/arrow/compute/light_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length
KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
uint8_t* validity_buffer, uint8_t* fixed_length_buffer,
uint8_t* var_length_buffer, int bit_offset_validity,
int bit_offset_fixed) {
int bit_offset_fixed,
const util::TempVectorStack* alloc) {
metadata_ = metadata;
length_ = length;
buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer;
buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] =
fixed_length_buffer;
buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] =
var_length_buffer;
bit_offset_[kValidityBuffer] = bit_offset_validity;
length_ = length;
buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer;
buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] = fixed_length_buffer;
buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] = var_length_buffer;
bit_offset_[kValidityBuffer] = bit_offset_validity;
bit_offset_[kFixedLengthBuffer] = bit_offset_fixed;
arena_alloc = alloc;
}

KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other,
Expand All @@ -81,6 +81,8 @@ KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const {
sliced.metadata_ = metadata_;
sliced.length_ = length;
uint32_t fixed_size = metadata_.fixed_length;
// TODO: see if this is necessary
// uint32_t fixed_size = !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;

sliced.buffers_[0] =
buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
Expand Down Expand Up @@ -112,42 +114,229 @@ KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const {
return sliced;
}

Result<KeyColumnMetadata> ColumnMetadataFromDataType(
const std::shared_ptr<DataType>& type) {
const bool is_extension = type->id() == Type::EXTENSION;
const std::shared_ptr<DataType>& typ =
is_extension
? arrow::internal::checked_pointer_cast<ExtensionType>(type->GetSharedPtr())
->storage_type()
: type;

if (typ->id() == Type::DICTIONARY) {
Result<KeyColumnMetadata> ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type) {
// "ptype" is the "physical" type
const DataType* ptype = type->GetSharedPtr().get();

// For ExtensionType, use the backing physical type (storage_type() is a shared ptr)
if (ARROW_PREDICT_FALSE(type->id() == Type::EXTENSION)) {
const ExtensionType* ext_type = static_cast<const ExtensionType*>(type);
ptype = ext_type->storage_type().get();
}

if (ptype->id() == Type::DICTIONARY) {
auto bit_width =
arrow::internal::checked_cast<const FixedWidthType&>(*typ).bit_width();
arrow::internal::checked_cast<const FixedWidthType&>(*ptype).bit_width();
ARROW_DCHECK(bit_width % 8 == 0);
return KeyColumnMetadata(true, bit_width / 8);
}
if (typ->id() == Type::BOOL) {
if (ptype->id() == Type::BOOL) {
return KeyColumnMetadata(true, 0);
}
if (is_fixed_width(typ->id())) {
if (is_fixed_width(ptype->id())) {
return KeyColumnMetadata(
true, arrow::internal::checked_cast<const FixedWidthType&>(*typ).bit_width() / 8);
true,
arrow::internal::checked_cast<const FixedWidthType&>(*ptype).bit_width() / 8);
}
if (is_binary_like(typ->id())) {
if (is_binary_like(ptype->id())) {
return KeyColumnMetadata(false, sizeof(uint32_t));
}
if (is_large_binary_like(typ->id())) {
if (is_large_binary_like(ptype->id())) {
return KeyColumnMetadata(false, sizeof(uint64_t));
}
if (typ->id() == Type::NA) {
if (ptype->id() == Type::NA) {
return KeyColumnMetadata(true, 0, true);
}
// Caller attempted to create a KeyColumnArray from an invalid type
return Status::TypeError("Unsupported column data type ", typ->name(),
return Status::TypeError("Unsupported column data type ", ptype->name(),
" used with KeyColumnMetadata");
}

Result<KeyColumnMetadata> ColumnMetadataFromDataType(
const std::shared_ptr<DataType>& type) {
return ColumnMetadataFromDataType(type.get());
}

/**
* Constructs metadata that tells hashing functions how to iterate over the
* KeyColumnArray.
*
* This function assumes ColumnMetadataFromDataType has already failed, which makes this
* function distinct because it should only be called when the input Array is flattened in
* a particular way.
*/
Result<KeyColumnMetadata> ColumnMetadataFromListType(const DataType* type) {
if (type->id() == Type::LIST || type->id() == Type::MAP) {
return KeyColumnMetadata(false, sizeof(uint32_t));
}
else if (type->id() == Type::LARGE_LIST) {
return KeyColumnMetadata(false, sizeof(uint64_t));
}
// Caller attempted to create a KeyColumnArray from an invalid type
return Status::TypeError("Unsupported column data type ", type->name(),
" used with KeyColumnMetadata");
}

Result<KeyColumnMetadata> ColumnMetadataFromListType(
const std::shared_ptr<DataType>& type) {
return ColumnMetadataFromListType(type.get());
}

/**
* Coalesces children of a StructArray into a flattened list of KeyColumnArrays. When
* hashing a StructArray, we want to co-index a list of KeyColumnArrays so that values in
* the same row are combined.
*/
Result<KeyColumnVector> ColumnArraysFromStructArray(const ArraySpan& array_span,
int64_t num_rows) {
KeyColumnVector flattened_spans;
flattened_spans.reserve(array_span.child_data.size());

// Recurse on each child of the ArraySpan in DFS-order
for (size_t child_ndx = 0; child_ndx < array_span.child_data.size(); ++child_ndx) {
auto child_span = array_span.child_data[child_ndx];
ARROW_ASSIGN_OR_RAISE(auto child_keycols,
ColumnArraysFromArraySpan(child_span, num_rows));

flattened_spans.insert(flattened_spans.end(), child_keycols.begin(),
child_keycols.end());
}

return flattened_spans;
}

/**
* Flattens the data in a ListArray into a list of KeyColumnArrays so that each element in
* the ListArray is properly treated as a row value. Due to semantics of nulls in nested
* arrays and their non-impact on a hash value, nulls in nested arrays are dropped when
* flattened. If a list is null, then that row is considered null and is preserved.
*
* The values buffer of a list type should be propagated to the caller as is, but the
* parent offsets and offsets of the current ArraySpan must be coalesced. Essentially, for
* the purposes of hashing, we don't care about internal structure of a row value so we
* flatten the offsets.
*/

// TODO: recurse to: (1) flatten offsets, (2) eventually bottom out and grab var data
template <typename OffsetType>
Result<KeyColumnArray> ColumnArraysFromListArray(const ArraySpan& array_span,
int64_t num_rows,
const OffsetType* parent_offsets) {
// Construct KeyColumnMetadata
ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
ColumnMetadataFromListType(array_span.type));

ARROW_LOG(INFO) << "[ListArray] Child count: "
<< std::to_string(array_span.child_data.size());

// ListArrays have only 1 child
auto child_span = array_span.child_data[0];
uint8_t* buffer_validity = nullptr;
/*
* TODO: Currently unsupported.
* figure out how the validity bitmap affects flattened lists
*if (child_span.GetBuffer(0) != nullptr) {
* buffer_validity = (uint8_t*)child_span.GetBuffer(0)->data();
*}
**/

// For simple lists or lists containing only list types, this should point to the child
// buffer with all of the values
uint8_t* buffer_varlength = nullptr;
if (child_span.num_buffers() > 2 && child_span.GetBuffer(2) != NULLPTR) {
ARROW_LOG(INFO) << "found list array data";
buffer_varlength = (uint8_t*)child_span.GetBuffer(2)->data();
}
else {
ARROW_LOG(INFO) << "child array does not have list array data";
}

// TODO
// Lists get flattened to 1 KeyColumnArray; Maps get flattened to 2 (key and value)
KeyColumnArray column_array =
KeyColumnArray(metadata, child_span.offset + num_rows, buffer_validity,
child_span.GetBuffer(1)->data(), buffer_varlength);

return column_array;
}


Result<KeyColumnArray> ColumnArrayFromArraySpan(const ArraySpan& array_span,
int64_t num_rows) {
ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
ColumnMetadataFromDataType(array_span.type));

uint8_t* buffer_validity = nullptr;
if (array_span.GetBuffer(0) != nullptr) {
buffer_validity = (uint8_t*)array_span.GetBuffer(0)->data();
}

uint8_t* buffer_varlength = nullptr;
if (array_span.num_buffers() > 2 && array_span.GetBuffer(2) != NULLPTR) {
buffer_varlength = (uint8_t*)array_span.GetBuffer(2)->data();
}

KeyColumnArray column_array =
KeyColumnArray(metadata, array_span.offset + num_rows, buffer_validity,
array_span.GetBuffer(1)->data(), buffer_varlength);

return column_array.Slice(array_span.offset, num_rows);
}

Result<KeyColumnVector> ColumnArraysFromArraySpan(const ArraySpan& array_span,
int64_t num_rows) {
KeyColumnVector flattened_spans;
flattened_spans.reserve(1 + array_span.child_data.size());

// Construct a KeyColumnArray from the given ArraySpan
auto keycol_result = ColumnArrayFromArraySpan(array_span, num_rows);
if (keycol_result.ok()) {
flattened_spans.push_back(*keycol_result);
}

// If ArraySpan data type is not supported, check for supported nested types.
else if (is_nested(array_span.type->id())) {
switch (array_span.type->id()) {
case Type::LIST:
case Type::MAP: {
const uint32_t* list_offsets = (const uint32_t*) array_span.GetBuffer(1)->data();
ARROW_ASSIGN_OR_RAISE(auto list_keycol,
ColumnArraysFromListArray(array_span, num_rows,
list_offsets));

flattened_spans.push_back(list_keycol);
break;
}

case Type::LARGE_LIST: {
const uint64_t* list_offsets = (const uint64_t*) array_span.GetBuffer(1)->data();
ARROW_ASSIGN_OR_RAISE(auto list_keycol,
ColumnArraysFromListArray(array_span, num_rows,
list_offsets));

flattened_spans.push_back(list_keycol);
break;
}

case Type::STRUCT: {
ARROW_ASSIGN_OR_RAISE(auto struct_keycols,
ColumnArraysFromStructArray(array_span, num_rows));

flattened_spans.insert(flattened_spans.end(), struct_keycols.begin(),
struct_keycols.end());
break;
}

default:
// unsupported types include: unions, fixed size list
ARROW_WARN_NOT_OK(keycol_result.status(), "Unsupported nested type for hashing");
break;
}
}

return flattened_spans;
}

Result<KeyColumnArray> ColumnArrayFromArrayData(
const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows) {
ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
Expand All @@ -159,12 +348,14 @@ KeyColumnArray ColumnArrayFromArrayDataAndMetadata(
const std::shared_ptr<ArrayData>& array_data, const KeyColumnMetadata& metadata,
int64_t start_row, int64_t num_rows) {
KeyColumnArray column_array = KeyColumnArray(
metadata, array_data->offset + start_row + num_rows,
array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr,
array_data->buffers[1]->data(),
(array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
metadata
,array_data->offset + start_row + num_rows
,array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr
,array_data->buffers[1]->data()
,(array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
? array_data->buffers[2]->data()
: nullptr);
: nullptr
);
return column_array.Slice(array_data->offset + start_row, num_rows);
}

Expand Down Expand Up @@ -199,10 +390,12 @@ Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row,

Status ColumnArraysFromExecBatch(const ExecBatch& batch,
std::vector<KeyColumnArray>* column_arrays) {
return ColumnArraysFromExecBatch(batch, 0, static_cast<int>(batch.length),
column_arrays);
return ColumnArraysFromExecBatch(
batch, 0, static_cast<int>(batch.length), column_arrays
);
}


void ResizableArrayData::Init(const std::shared_ptr<DataType>& data_type,
MemoryPool* pool, int log_num_rows_min) {
#ifndef NDEBUG
Expand Down
24 changes: 20 additions & 4 deletions cpp/src/arrow/compute/light_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class ARROW_EXPORT KeyColumnArray {
public:
/// \brief Create an uninitialized KeyColumnArray
KeyColumnArray() = default;

/// \brief Create a read-only view from buffers
///
/// This is a view only and does not take ownership of the buffers. The lifetime
Expand All @@ -96,10 +97,14 @@ class ARROW_EXPORT KeyColumnArray {
///
/// This is a view only and does not take ownership of the buffers. The lifetime
/// of the buffers must exceed the lifetime of this view
KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
uint8_t* validity_buffer, uint8_t* fixed_length_buffer,
uint8_t* var_length_buffer, int bit_offset_validity = 0,
int bit_offset_fixed = 0);
KeyColumnArray( const KeyColumnMetadata& metadata
,int64_t length
,uint8_t* validity_buffer
,uint8_t* fixed_length_buffer
,uint8_t* var_length_buffer
,int bit_offset_validity = 0
,int bit_offset_fixed = 0
,const util::TempVectorStack *alloc = nullptr);
/// \brief Create a sliced view of `this`
///
/// The number of rows used in offset must be divisible by 8
Expand Down Expand Up @@ -183,6 +188,7 @@ class ARROW_EXPORT KeyColumnArray {
// Starting bit offset within the first byte (between 0 and 7)
// to be used when accessing buffers that store bit vectors.
int bit_offset_[kMaxBuffers - 1];
const util::TempVectorStack* arena_alloc;

bool is_bool_type() const {
return metadata_.is_fixed_length && metadata_.fixed_length == 0 &&
Expand Down Expand Up @@ -220,6 +226,15 @@ class ARROW_EXPORT KeyColumnArray {
ARROW_EXPORT Result<KeyColumnMetadata> ColumnMetadataFromDataType(
const std::shared_ptr<DataType>& type);

ARROW_EXPORT Result<KeyColumnMetadata> ColumnMetadataFromDataType(const DataType* type);

/// \brief Create KeyColumnArray from ArraySpan
///
/// The caller should ensure this is only called on "key" columns.
/// \see ColumnMetadataFromDataType for details
ARROW_EXPORT Result<KeyColumnVector> ColumnArraysFromArraySpan(
const ArraySpan& array_span, int64_t num_rows);

/// \brief Create KeyColumnArray from ArrayData
///
/// If `type` is a dictionary type then this will return the KeyColumnArray for
Expand Down Expand Up @@ -269,6 +284,7 @@ ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t st
ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch,
std::vector<KeyColumnArray>* column_arrays);


/// A lightweight resizable array for "key" columns
///
/// Unlike KeyColumnArray this instance owns its buffers
Expand Down
Loading

0 comments on commit 5029e24

Please sign in to comment.