Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-53]Partial fix Q24a/Q24b tail SHJ task materialization performan…
Browse files Browse the repository at this point in the history
…ce issue (#54)

* Partial fix Q24a/Q24b tail SHJ task materialization performance issue

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Split the sort and join append APIs into variants with and without the `repeated` argument

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
  • Loading branch information
xuechendi authored Jan 25, 2021
1 parent a9ae3d8 commit 14012ec
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 91 deletions.
179 changes: 168 additions & 11 deletions cpp/src/codegen/arrow_compute/ext/array_appender.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <cstdint>
#include <vector>

#include "codegen/arrow_compute/ext/array_item_index.h"

namespace sparkcolumnarplugin {
namespace codegen {
namespace arrowcompute {
Expand All @@ -46,6 +48,15 @@ class AppenderBase {
return arrow::Status::NotImplemented("AppenderBase Append is abstract.");
}

// Appends the element at (array_id, item_id) `repeated` times.
// The base implementation does no work and always returns NotImplemented;
// the ArrayAppender specializations provide the real behaviour.
virtual arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id,
int repeated) {
return arrow::Status::NotImplemented("AppenderBase Append is abstract.");
}

// Appends one output element per entry of index_list.
// The base implementation does no work and always returns NotImplemented.
virtual arrow::Status Append(const std::vector<ArrayItemIndex>& index_list) {
return arrow::Status::NotImplemented("AppenderBase Append is abstract.");
}

// Appends a single null element.
// The base implementation does no work and always returns NotImplemented.
virtual arrow::Status AppendNull() {
return arrow::Status::NotImplemented("AppenderBase AppendNull is abstract.");
}
Expand All @@ -67,10 +78,14 @@ template <typename DataType, typename Enable = void>
class ArrayAppender {};

template <typename T>
using enable_if_not_boolean = std::enable_if_t<!arrow::is_boolean_type<T>::value>;
using is_number_or_date = std::integral_constant<bool, arrow::is_number_type<T>::value ||
arrow::is_date_type<T>::value>;

// SFINAE helper: names R (void by default) only when
// is_number_or_date<DataType>::value is true; otherwise substitution fails.
template <typename DataType, typename R = void>
using enable_if_number_or_date = std::enable_if_t<is_number_or_date<DataType>::value, R>;

template <typename DataType>
class ArrayAppender<DataType, enable_if_not_boolean<DataType>> : public AppenderBase {
class ArrayAppender<DataType, enable_if_number_or_date<DataType>> : public AppenderBase {
public:
ArrayAppender(arrow::compute::FunctionContext* ctx, AppenderType type = left)
: ctx_(ctx), type_(type) {
Expand All @@ -85,27 +100,139 @@ class ArrayAppender<DataType, enable_if_not_boolean<DataType>> : public Appender
// Registers `arr` as a source array for later Append calls; its position in
// the cache is the array_id that Append uses to address it.
// Returns Invalid when `arr` is not an ArrayType_ (this includes a null
// pointer) instead of dereferencing a failed cast.
arrow::Status AddArray(const std::shared_ptr<arrow::Array>& arr) override {
  auto typed_arr = std::dynamic_pointer_cast<ArrayType_>(arr);
  if (typed_arr == nullptr) {
    return arrow::Status::Invalid(
        "ArrayAppender AddArray received an array of unexpected type.");
  }
  // has_null_ is sticky: once any cached array carries nulls, Append has to
  // test IsNull for every element.
  if (typed_arr->null_count() > 0) has_null_ = true;
  cached_arr_.emplace_back(typed_arr);
  return arrow::Status::OK();
}

// Drops the most recently added array and recomputes has_null_ from the
// arrays that remain, so nulls in earlier arrays are still honoured by Append.
// Returns Invalid when there is no cached array to pop.
arrow::Status PopArray() override {
  if (cached_arr_.empty()) {
    return arrow::Status::Invalid(
        "ArrayAppender PopArray called with no cached array.");
  }
  cached_arr_.pop_back();
  // Clearing the flag unconditionally would let Append skip the IsNull test
  // for arrays that are still cached and do contain nulls.
  has_null_ = false;
  for (const auto& remaining : cached_arr_) {
    if (remaining->null_count() > 0) {
      has_null_ = true;
      break;
    }
  }
  return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override {
if (!cached_arr_[array_id]->IsNull(item_id)) {
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
RETURN_NOT_OK(builder_->Append(cached_arr_[array_id]->GetView(item_id)));
}
return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id,
int repeated) override {
if (repeated == 0) return arrow::Status::OK();
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNulls(repeated));
} else {
auto val = cached_arr_[array_id]->GetView(item_id);
return builder_->Append(cached_arr_[array_id]->GetView(item_id));
std::vector<CType> values;
values.resize(repeated, val);
RETURN_NOT_OK(builder_->AppendValues(values.data(), repeated));
}
return arrow::Status::OK();
}

// Appends one element per entry of index_list, in order: entry.array_id picks
// the cached array and entry.id the slot inside it. A null slot becomes a null
// output element. Stops at the first failing builder call and returns its
// status.
arrow::Status Append(const std::vector<ArrayItemIndex>& index_list) override {
  // Iterate by const reference to avoid copying every ArrayItemIndex.
  for (const auto& item : index_list) {
    const auto& source = cached_arr_[item.array_id];
    if (has_null_ && source->IsNull(item.id)) {
      RETURN_NOT_OK(builder_->AppendNull());
    } else {
      RETURN_NOT_OK(builder_->Append(source->GetView(item.id)));
    }
  }
  return arrow::Status::OK();
}

// Appends a single null to the builder and returns the builder's status.
arrow::Status AppendNull() override { return builder_->AppendNull(); }

// Finishes the builder, storing the resulting array in *out_, and forwards the
// builder's status to the caller.
arrow::Status Finish(std::shared_ptr<arrow::Array>* out_) override {
  return builder_->Finish(out_);
}

// Calls Reset() on the underlying builder and returns OK.
// cached_arr_ and has_null_ are left untouched.
arrow::Status Reset() override {
builder_->Reset();
return arrow::Status::OK();
}

private:
// Arrow builder, array and C value types that correspond to DataType.
using BuilderType_ = typename arrow::TypeTraits<DataType>::BuilderType;
using ArrayType_ = typename arrow::TypeTraits<DataType>::ArrayType;
using CType = typename arrow::TypeTraits<DataType>::CType;
// Builder that receives every appended value or null.
std::unique_ptr<BuilderType_> builder_;
// Source arrays in AddArray order; Append indexes this vector by array_id.
std::vector<std::shared_ptr<ArrayType_>> cached_arr_;
// Function context passed to the constructor.
arrow::compute::FunctionContext* ctx_;
// Appender side passed to the constructor (defaults to `left`).
AppenderType type_;
// Set once an added array reports null_count() > 0; while false, Append skips
// the per-element IsNull test.
bool has_null_ = false;
};

template <typename DataType>
class ArrayAppender<DataType, arrow::enable_if_string_like<DataType>>
: public AppenderBase {
public:
// Creates an appender for a string-like DataType: asks arrow::MakeBuilder for a
// builder of DataType's type singleton on ctx's memory pool, then takes
// ownership of it as a BuilderType_.
// NOTE(review): the Status returned by MakeBuilder is discarded; if it fails,
// array_builder presumably stays empty and builder_ ends up null — confirm and
// consider surfacing the error.
ArrayAppender(arrow::compute::FunctionContext* ctx, AppenderType type = left)
: ctx_(ctx), type_(type) {
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::MakeBuilder(ctx_->memory_pool(), arrow::TypeTraits<DataType>::type_singleton(),
&array_builder);
builder_.reset(arrow::internal::checked_cast<BuilderType_*>(array_builder.release()));
}
// Destructor with an empty body; no explicit cleanup is performed here.
~ArrayAppender() {}

// Returns the appender side (type_) that was given to the constructor.
AppenderType GetType() override { return type_; }
// Registers `arr` as a source array for later Append calls; its position in
// the cache is the array_id that Append uses to address it.
// Returns Invalid when `arr` is not an ArrayType_ (this includes a null
// pointer) instead of dereferencing a failed cast.
arrow::Status AddArray(const std::shared_ptr<arrow::Array>& arr) override {
  auto typed_arr = std::dynamic_pointer_cast<ArrayType_>(arr);
  if (typed_arr == nullptr) {
    return arrow::Status::Invalid(
        "ArrayAppender AddArray received an array of unexpected type.");
  }
  // has_null_ is sticky: once any cached array carries nulls, Append has to
  // test IsNull for every element.
  if (typed_arr->null_count() > 0) has_null_ = true;
  cached_arr_.emplace_back(typed_arr);
  return arrow::Status::OK();
}

// Drops the most recently added array and recomputes has_null_ from the
// arrays that remain, so nulls in earlier arrays are still honoured by Append.
// Returns Invalid when there is no cached array to pop.
arrow::Status PopArray() override {
  if (cached_arr_.empty()) {
    return arrow::Status::Invalid(
        "ArrayAppender PopArray called with no cached array.");
  }
  cached_arr_.pop_back();
  // Clearing the flag unconditionally would let Append skip the IsNull test
  // for arrays that are still cached and do contain nulls.
  has_null_ = false;
  for (const auto& remaining : cached_arr_) {
    if (remaining->null_count() > 0) {
      has_null_ = true;
      break;
    }
  }
  return arrow::Status::OK();
}

// Appends the element at (array_id, item_id) once: a null when has_null_ is set
// and the slot is null, otherwise the slot's value. Returns the builder's
// status.
arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override {
  const auto& source = cached_arr_[array_id];
  const bool slot_is_null = has_null_ && source->IsNull(item_id);
  if (slot_is_null) {
    return builder_->AppendNull();
  }
  return builder_->Append(source->GetView(item_id));
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id,
int repeated) override {
if (repeated == 0) return arrow::Status::OK();
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNulls(repeated));
} else {
return builder_->AppendNull();
auto val = cached_arr_[array_id]->GetView(item_id);
for (int i = 0; i < repeated; i++) {
RETURN_NOT_OK(builder_->Append(val));
}
}
return arrow::Status::OK();
}

// Appends one element per entry of index_list, in order: entry.array_id picks
// the cached array and entry.id the slot inside it. A null slot becomes a null
// output element. Stops at the first failing builder call and returns its
// status.
arrow::Status Append(const std::vector<ArrayItemIndex>& index_list) override {
  // Iterate by const reference to avoid copying every ArrayItemIndex.
  for (const auto& item : index_list) {
    const auto& source = cached_arr_[item.array_id];
    if (has_null_ && source->IsNull(item.id)) {
      RETURN_NOT_OK(builder_->AppendNull());
    } else {
      RETURN_NOT_OK(builder_->Append(source->GetView(item.id)));
    }
  }
  return arrow::Status::OK();
}

// Appends a single null to the builder and returns the builder's status.
arrow::Status AppendNull() override { return builder_->AppendNull(); }

arrow::Status Finish(std::shared_ptr<arrow::Array>* out_) override {
return builder_->Finish(out_);
auto status = builder_->Finish(out_);
return status;
}

arrow::Status Reset() override {
Expand All @@ -120,6 +247,7 @@ class ArrayAppender<DataType, enable_if_not_boolean<DataType>> : public Appender
std::vector<std::shared_ptr<ArrayType_>> cached_arr_;
arrow::compute::FunctionContext* ctx_;
AppenderType type_;
bool has_null_ = false;
};

template <typename DataType>
Expand All @@ -138,29 +266,57 @@ class ArrayAppender<DataType, arrow::enable_if_boolean<DataType>> : public Appen
// Registers `arr` as a source array for later Append calls; its position in
// the cache is the array_id that Append uses to address it.
// Returns Invalid when `arr` is not an ArrayType_ (this includes a null
// pointer) instead of dereferencing a failed cast.
arrow::Status AddArray(const std::shared_ptr<arrow::Array>& arr) override {
  auto typed_arr = std::dynamic_pointer_cast<ArrayType_>(arr);
  if (typed_arr == nullptr) {
    return arrow::Status::Invalid(
        "ArrayAppender AddArray received an array of unexpected type.");
  }
  // has_null_ is sticky: once any cached array carries nulls, Append has to
  // test IsNull for every element.
  if (typed_arr->null_count() > 0) has_null_ = true;
  cached_arr_.emplace_back(typed_arr);
  return arrow::Status::OK();
}

// Drops the most recently added array and recomputes has_null_ from the
// arrays that remain, so nulls in earlier arrays are still honoured by Append.
// Returns Invalid when there is no cached array to pop.
arrow::Status PopArray() override {
  if (cached_arr_.empty()) {
    return arrow::Status::Invalid(
        "ArrayAppender PopArray called with no cached array.");
  }
  cached_arr_.pop_back();
  // Clearing the flag unconditionally would let Append skip the IsNull test
  // for arrays that are still cached and do contain nulls.
  has_null_ = false;
  for (const auto& remaining : cached_arr_) {
    if (remaining->null_count() > 0) {
      has_null_ = true;
      break;
    }
  }
  return arrow::Status::OK();
}

arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id) override {
if (!cached_arr_[array_id]->IsNull(item_id)) {
auto val = cached_arr_[array_id]->GetView(item_id);
return builder_->Append(cached_arr_[array_id]->GetView(item_id));
if (has_null_ && cached_arr_[array_id]->IsNull(item_id)) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
return builder_->AppendNull();
RETURN_NOT_OK(builder_->Append(cached_arr_[array_id]->GetView(item_id)));
}
return arrow::Status::OK();
}

// Appends the element at (array_id, item_id) `repeated` times: `repeated`
// nulls when has_null_ is set and the slot is null, otherwise `repeated`
// copies of the slot's value. A non-positive `repeated` appends nothing.
arrow::Status Append(const uint16_t& array_id, const uint16_t& item_id,
                     int repeated) override {
  // Guard negatives as well as zero so a negative count never reaches
  // AppendNulls.
  if (repeated <= 0) return arrow::Status::OK();
  const auto& source = cached_arr_[array_id];
  if (has_null_ && source->IsNull(item_id)) {
    return builder_->AppendNulls(repeated);
  }
  // Reserve once up front so the loop below does not grow the buffers
  // incrementally.
  RETURN_NOT_OK(builder_->Reserve(repeated));
  const auto val = source->GetView(item_id);
  for (int i = 0; i < repeated; i++) {
    RETURN_NOT_OK(builder_->Append(val));
  }
  return arrow::Status::OK();
}

// Appends one element per entry of index_list, in order: entry.array_id picks
// the cached array and entry.id the slot inside it. A null slot becomes a null
// output element. Stops at the first failing builder call and returns its
// status.
arrow::Status Append(const std::vector<ArrayItemIndex>& index_list) override {
  // Iterate by const reference to avoid copying every ArrayItemIndex.
  for (const auto& item : index_list) {
    const auto& source = cached_arr_[item.array_id];
    if (has_null_ && source->IsNull(item.id)) {
      RETURN_NOT_OK(builder_->AppendNull());
    } else {
      RETURN_NOT_OK(builder_->Append(source->GetView(item.id)));
    }
  }
  return arrow::Status::OK();
}

// Appends a single null to the builder and returns the builder's status.
arrow::Status AppendNull() override { return builder_->AppendNull(); }

// Appends is_exist to the builder as a boolean value and returns the builder's
// status. Not part of AppenderBase (no `override`).
arrow::Status AppendExistence(bool is_exist) { return builder_->Append(is_exist); }

arrow::Status Finish(std::shared_ptr<arrow::Array>* out_) override {
return builder_->Finish(out_);
auto status = builder_->Finish(out_);
return status;
}

arrow::Status Reset() override {
Expand All @@ -175,6 +331,7 @@ class ArrayAppender<DataType, arrow::enable_if_boolean<DataType>> : public Appen
std::vector<std::shared_ptr<ArrayType_>> cached_arr_;
arrow::compute::FunctionContext* ctx_;
AppenderType type_;
bool has_null_ = false;
};

#define PROCESS_SUPPORTED_TYPES(PROCESS) \
Expand Down
61 changes: 29 additions & 32 deletions cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ class ConditionedProbeKernel::Impl {
: hash_relation_(hash_relation), appender_list_(appender_list) {}
uint64_t Evaluate(std::shared_ptr<arrow::Array> key_array,
const arrow::ArrayVector& key_payloads) override {
struct timespec start, end;
auto typed_key_array = std::dynamic_pointer_cast<ArrayType>(key_array);
std::vector<std::shared_ptr<UnsafeArray>> payloads;
int i = 0;
Expand Down Expand Up @@ -705,16 +706,15 @@ class ConditionedProbeKernel::Impl {
if (index == -1) {
continue;
}
for (auto tmp : hash_relation_->GetItemListByIndex(index)) {
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(tmp.array_id, tmp.id));
} else {
THROW_NOT_OK(appender->Append(0, i));
}
auto index_list = hash_relation_->GetItemListByIndex(index);
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(index_list));
} else {
THROW_NOT_OK(appender->Append(0, i, index_list.size()));
}
out_length += 1;
}
out_length += index_list.size();
}
return out_length;
}
Expand Down Expand Up @@ -831,16 +831,15 @@ class ConditionedProbeKernel::Impl {
out_length += 1;
continue;
}
for (auto tmp : hash_relation_->GetItemListByIndex(index)) {
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(tmp.array_id, tmp.id));
} else {
THROW_NOT_OK(appender->Append(0, i));
}
auto index_list = hash_relation_->GetItemListByIndex(index);
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(index_list));
} else {
THROW_NOT_OK(appender->Append(0, i, index_list.size()));
}
out_length += 1;
}
out_length += index_list.size();
}
return out_length;
}
Expand Down Expand Up @@ -1228,16 +1227,15 @@ class ConditionedProbeKernel::Impl {
if (index == -1) {
continue;
}
for (auto tmp : typed_hash_relation_->GetItemListByIndex(index)) {
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(tmp.array_id, tmp.id));
} else {
THROW_NOT_OK(appender->Append(0, i));
}
auto index_list = typed_hash_relation_->GetItemListByIndex(index);
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(index_list));
} else {
THROW_NOT_OK(appender->Append(0, i, index_list.size()));
}
out_length += 1;
}
out_length += index_list.size();
}
return out_length;
}
Expand Down Expand Up @@ -1278,16 +1276,15 @@ class ConditionedProbeKernel::Impl {
out_length += 1;
continue;
}
for (auto tmp : typed_hash_relation_->GetItemListByIndex(index)) {
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(tmp.array_id, tmp.id));
} else {
THROW_NOT_OK(appender->Append(0, i));
}
auto index_list = typed_hash_relation_->GetItemListByIndex(index);
for (auto appender : appender_list_) {
if (appender->GetType() == AppenderBase::left) {
THROW_NOT_OK(appender->Append(index_list));
} else {
THROW_NOT_OK(appender->Append(0, i, index_list.size()));
}
out_length += 1;
}
out_length += index_list.size();
}
return out_length;
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/precompile/type_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ template <typename T>
using is_number_like_type =
std::integral_constant<bool, is_number_like<T>::value || is_date_type<T>::value>;

// SFINAE helper: well-formed (void) only when is_boolean_type<T>::value is
// true; otherwise substitution fails.
template <typename T>
using enable_if_boolean = std::enable_if_t<is_boolean_type<T>::value>;

// SFINAE helper: well-formed (void) only when is_number_like_type<T>::value is
// true, i.e. T is number-like or a date type; otherwise substitution fails.
template <typename T>
using enable_if_number = std::enable_if_t<is_number_like_type<T>::value>;

Expand Down
Loading

0 comments on commit 14012ec

Please sign in to comment.