Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-53]Partial fix Q24a/Q24b tail SHJ task materialization performance issue #54

Merged
merged 2 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 168 additions & 11 deletions cpp/src/codegen/arrow_compute/ext/array_appender.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <cstdint>
#include <vector>

#include "codegen/arrow_compute/ext/array_item_index.h"

namespace sparkcolumnarplugin {
namespace codegen {
namespace arrowcompute {
Expand All @@ -46,6 +48,15 @@ class AppenderBase {
return arrow::Status::NotImplemented("AppenderBase Append is abstract.");
}

virtual arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id,
int repeated) {
return arrow::Status::NotImplemented("AppenderBase Append is abstract.");
}

virtual arrow::Status Append(const std::vector<ArrayItemIndex>& index_list) {
return arrow::Status::NotImplemented("AppenderBase Append is abstract.");
}

virtual arrow::Status AppendNull() {
return arrow::Status::NotImplemented("AppenderBase AppendNull is abstract.");
}
Expand All @@ -67,10 +78,14 @@ template <typename DataType, typename Enable = void>
class ArrayAppender {};

template <typename T>
using enable_if_not_boolean = std::enable_if_t<!arrow::is_boolean_type<T>::value>;
using is_number_or_date = std::integral_constant<bool, arrow::is_number_type<T>::value ||
arrow::is_date_type<T>::value>;

template <typename DataType, typename R = void>
using enable_if_number_or_date = std::enable_if_t<is_number_or_date<DataType>::value, R>;

template <typename DataType>
class ArrayAppender<DataType, enable_if_not_boolean<DataType>> : public AppenderBase {
class ArrayAppender<DataType, enable_if_number_or_date<DataType>> : public AppenderBase {
public:
ArrayAppender(arrow::compute::FunctionContext* ctx, AppenderType type = left)
: ctx_(ctx), type_(type) {
Expand All @@ -85,27 +100,139 @@ class ArrayAppender<DataType, enable_if_not_boolean<DataType>> : public Appender
arrow::Status AddArray(const std::shared_ptr<arrow::Array>& arr) override {
auto typed_arr_ = std::dynamic_pointer_cast<ArrayType_>(arr);
cached_arr_.emplace_back(typed_arr_);
if (typed_arr_->null_count() > 0) has_null_ = true;
return arrow::Status::OK();
}

arrow::Status PopArray() override {
cached_arr_.pop_back();
has_null_ = false;
return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override {
if (!cached_arr_[array_id]->IsNull(item_id)) {
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
RETURN_NOT_OK(builder_->Append(cached_arr_[array_id]->GetView(item_id)));
}
return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id,
int repeated) override {
if (repeated == 0) return arrow::Status::OK();
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNulls(repeated));
} else {
auto val = cached_arr_[array_id]->GetView(item_id);
return builder_->Append(cached_arr_[array_id]->GetView(item_id));
std::vector<CType> values;
values.resize(repeated, val);
RETURN_NOT_OK(builder_->AppendValues(values.data(), repeated));
}
return arrow::Status::OK();
}

arrow::Status Append(const std::vector<ArrayItemIndex>& index_list) {
for (auto tmp : index_list) {
if (has_null_ && cached_arr_[tmp.array_id]->IsNull(tmp.id)) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
RETURN_NOT_OK(builder_->Append(cached_arr_[tmp.array_id]->GetView(tmp.id)));
}
}
return arrow::Status::OK();
}

arrow::Status AppendNull() override { return builder_->AppendNull(); }

arrow::Status Finish(std::shared_ptr<arrow::Array>* out_) override {
auto status = builder_->Finish(out_);
return status;
}

arrow::Status Reset() override {
builder_->Reset();
return arrow::Status::OK();
}

private:
using BuilderType_ = typename arrow::TypeTraits<DataType>::BuilderType;
using ArrayType_ = typename arrow::TypeTraits<DataType>::ArrayType;
using CType = typename arrow::TypeTraits<DataType>::CType;
std::unique_ptr<BuilderType_> builder_;
std::vector<std::shared_ptr<ArrayType_>> cached_arr_;
arrow::compute::FunctionContext* ctx_;
AppenderType type_;
bool has_null_ = false;
};

template <typename DataType>
class ArrayAppender<DataType, arrow::enable_if_string_like<DataType>>
: public AppenderBase {
public:
ArrayAppender(arrow::compute::FunctionContext* ctx, AppenderType type = left)
: ctx_(ctx), type_(type) {
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::MakeBuilder(ctx_->memory_pool(), arrow::TypeTraits<DataType>::type_singleton(),
&array_builder);
builder_.reset(arrow::internal::checked_cast<BuilderType_*>(array_builder.release()));
}
~ArrayAppender() {}

AppenderType GetType() override { return type_; }
arrow::Status AddArray(const std::shared_ptr<arrow::Array>& arr) override {
auto typed_arr_ = std::dynamic_pointer_cast<ArrayType_>(arr);
cached_arr_.emplace_back(typed_arr_);
if (typed_arr_->null_count() > 0) has_null_ = true;
return arrow::Status::OK();
}

arrow::Status PopArray() override {
cached_arr_.pop_back();
has_null_ = false;
return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override {
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
RETURN_NOT_OK(builder_->Append(cached_arr_[array_id]->GetView(item_id)));
}
return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id,
int repeated) override {
if (repeated == 0) return arrow::Status::OK();
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNulls(repeated));
} else {
return builder_->AppendNull();
auto val = cached_arr_[array_id]->GetView(item_id);
for (int i = 0; i < repeated; i++) {
RETURN_NOT_OK(builder_->Append(val));
}
}
return arrow::Status::OK();
}

arrow::Status Append(const std::vector<ArrayItemIndex>& index_list) {
for (auto tmp : index_list) {
if (has_null_ && cached_arr_[tmp.array_id]->IsNull(tmp.id)) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
RETURN_NOT_OK(builder_->Append(cached_arr_[tmp.array_id]->GetView(tmp.id)));
}
}
return arrow::Status::OK();
}

arrow::Status AppendNull() override { return builder_->AppendNull(); }

arrow::Status Finish(std::shared_ptr<arrow::Array>* out_) override {
return builder_->Finish(out_);
auto status = builder_->Finish(out_);
return status;
}

arrow::Status Reset() override {
Expand All @@ -120,6 +247,7 @@ class ArrayAppender<DataType, enable_if_not_boolean<DataType>> : public Appender
std::vector<std::shared_ptr<ArrayType_>> cached_arr_;
arrow::compute::FunctionContext* ctx_;
AppenderType type_;
bool has_null_ = false;
};

template <typename DataType>
Expand All @@ -138,29 +266,57 @@ class ArrayAppender<DataType, arrow::enable_if_boolean<DataType>> : public Appen
arrow::Status AddArray(const std::shared_ptr<arrow::Array>& arr) override {
auto typed_arr_ = std::dynamic_pointer_cast<ArrayType_>(arr);
cached_arr_.emplace_back(typed_arr_);
if (typed_arr_->null_count() > 0) has_null_ = true;
return arrow::Status::OK();
}

arrow::Status PopArray() override {
cached_arr_.pop_back();
has_null_ = false;
return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override {
if (!cached_arr_[array_id]->IsNull(item_id)) {
auto val = cached_arr_[array_id]->GetView(item_id);
return builder_->Append(cached_arr_[array_id]->GetView(item_id));
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
return builder_->AppendNull();
RETURN_NOT_OK(builder_->Append(cached_arr_[array_id]->GetView(item_id)));
}
return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id,
int repeated) override {
if (repeated == 0) return arrow::Status::OK();
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNulls(repeated));
} else {
auto val = cached_arr_[array_id]->GetView(item_id);
for (int i = 0; i < repeated; i++) {
RETURN_NOT_OK(builder_->Append(val));
}
}
return arrow::Status::OK();
}

arrow::Status Append(const std::vector<ArrayItemIndex>& index_list) {
for (auto tmp : index_list) {
if (has_null_ && cached_arr_[tmp.array_id]->IsNull(tmp.id)) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
RETURN_NOT_OK(builder_->Append(cached_arr_[tmp.array_id]->GetView(tmp.id)));
}
}
return arrow::Status::OK();
}

arrow::Status AppendNull() override { return builder_->AppendNull(); }

arrow::Status AppendExistence(bool is_exist) { return builder_->Append(is_exist); }

arrow::Status Finish(std::shared_ptr<arrow::Array>* out_) override {
return builder_->Finish(out_);
auto status = builder_->Finish(out_);
return status;
}

arrow::Status Reset() override {
Expand All @@ -175,6 +331,7 @@ class ArrayAppender<DataType, arrow::enable_if_boolean<DataType>> : public Appen
std::vector<std::shared_ptr<ArrayType_>> cached_arr_;
arrow::compute::FunctionContext* ctx_;
AppenderType type_;
bool has_null_ = false;
};

#define PROCESS_SUPPORTED_TYPES(PROCESS) \
Expand Down
61 changes: 29 additions & 32 deletions cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ class ConditionedProbeKernel::Impl {
: hash_relation_(hash_relation), appender_list_(appender_list) {}
uint64_t Evaluate(std::shared_ptr<arrow::Array> key_array,
const arrow::ArrayVector& key_payloads) override {
struct timespec start, end;
auto typed_key_array = std::dynamic_pointer_cast<ArrayType>(key_array);
std::vector<std::shared_ptr<UnsafeArray>> payloads;
int i = 0;
Expand Down Expand Up @@ -705,16 +706,15 @@ class ConditionedProbeKernel::Impl {
if (index == -1) {
continue;
}
for (auto tmp : hash_relation_->GetItemListByIndex(index)) {
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(tmp.array_id, tmp.id));
} else {
THROW_NOT_OK(appender->Append(0, i));
}
auto index_list = hash_relation_->GetItemListByIndex(index);
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(index_list));
} else {
THROW_NOT_OK(appender->Append(0, i, index_list.size()));
}
out_length += 1;
}
out_length += index_list.size();
}
return out_length;
}
Expand Down Expand Up @@ -831,16 +831,15 @@ class ConditionedProbeKernel::Impl {
out_length += 1;
continue;
}
for (auto tmp : hash_relation_->GetItemListByIndex(index)) {
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(tmp.array_id, tmp.id));
} else {
THROW_NOT_OK(appender->Append(0, i));
}
auto index_list = hash_relation_->GetItemListByIndex(index);
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(index_list));
} else {
THROW_NOT_OK(appender->Append(0, i, index_list.size()));
}
out_length += 1;
}
out_length += index_list.size();
}
return out_length;
}
Expand Down Expand Up @@ -1228,16 +1227,15 @@ class ConditionedProbeKernel::Impl {
if (index == -1) {
continue;
}
for (auto tmp : typed_hash_relation_->GetItemListByIndex(index)) {
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(tmp.array_id, tmp.id));
} else {
THROW_NOT_OK(appender->Append(0, i));
}
auto index_list = typed_hash_relation_->GetItemListByIndex(index);
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(index_list));
} else {
THROW_NOT_OK(appender->Append(0, i, index_list.size()));
}
out_length += 1;
}
out_length += index_list.size();
}
return out_length;
}
Expand Down Expand Up @@ -1278,16 +1276,15 @@ class ConditionedProbeKernel::Impl {
out_length += 1;
continue;
}
for (auto tmp : typed_hash_relation_->GetItemListByIndex(index)) {
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(tmp.array_id, tmp.id));
} else {
THROW_NOT_OK(appender->Append(0, i));
}
auto index_list = typed_hash_relation_->GetItemListByIndex(index);
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(index_list));
} else {
THROW_NOT_OK(appender->Append(0, i, index_list.size()));
}
out_length += 1;
}
out_length += index_list.size();
}
return out_length;
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/precompile/type_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ template <typename T>
using is_number_like_type =
std::integral_constant<bool, is_number_like<T>::value || is_date_type<T>::value>;

template <typename T>
using enable_if_boolean = std::enable_if_t<is_boolean_type<T>::value>;

template <typename T>
using enable_if_number = std::enable_if_t<is_number_like_type<T>::value>;

Expand Down
Loading