diff --git a/cpp/src/codegen/arrow_compute/ext/code_generator_base.h b/cpp/src/codegen/arrow_compute/ext/code_generator_base.h index dd3df0894..1f8da6162 100644 --- a/cpp/src/codegen/arrow_compute/ext/code_generator_base.h +++ b/cpp/src/codegen/arrow_compute/ext/code_generator_base.h @@ -17,6 +17,7 @@ #pragma once #include "codegen/common/result_iterator.h" +#include "precompile/array.h" class SortRelation; class GandivaProjector; @@ -45,6 +46,10 @@ class CodeGenBase { return arrow::Status::NotImplemented("CodeGenBase Finish is an abstract interface."); } + virtual arrow::Status FinishInternal(std::shared_ptr* out) { + return arrow::Status::NotImplemented("CodeGenBase FinishInternal is an abstract interface."); + } + virtual arrow::Status MakeResultIterator( std::shared_ptr schema, std::shared_ptr>* out) { diff --git a/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc b/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc index b8297dc9a..aab22c419 100644 --- a/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc +++ b/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc @@ -1844,6 +1844,500 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl { }; }; +/////////////// SortMultiplekeyCodegenKernel //////////////// +class SortMultiplekeyCodegenKernel : public SortArraysToIndicesKernel::Impl { + public: + SortMultiplekeyCodegenKernel(arrow::compute::FunctionContext* ctx, + std::shared_ptr result_schema, + std::shared_ptr key_projector, + std::vector> projected_types, + std::vector> key_field_list, + std::vector sort_directions, + std::vector nulls_order, + bool NaN_check) + : ctx_(ctx), + nulls_order_(nulls_order), + sort_directions_(sort_directions), + result_schema_(result_schema), + key_projector_(key_projector), + key_field_list_(key_field_list), + projected_types_(projected_types), + NaN_check_(NaN_check) { + #ifdef DEBUG + std::cout << "use SortMultiplekeyCodegenKernel" << std::endl; + #endif + for (auto field : key_field_list) { + auto indices = result_schema->GetAllFieldIndices(field->name()); + if (indices.size() != 1) { + std::cout << "[ERROR] SortMultiplekeyCodegenKernel can't find key " + << field->ToString() << " from " << result_schema->ToString() + << std::endl; + throw; + } + key_index_list_.push_back(indices[0]); + } + col_num_ = result_schema->num_fields(); + if (!key_projector) { + auto status = LoadJITFunction(key_field_list); + if (!status.ok()) { + std::cout << "LoadJITFunction failed, msg is " << status.message() << std::endl; + throw; + } + } else { + int i = 0; + for (auto type : projected_types) { + auto field = arrow::field(std::to_string(i), type); + projected_field_list_.push_back(field); + i++; + } + auto status = LoadJITFunction(projected_field_list_); + if (!status.ok()) { + std::cout << "LoadJITFunction failed, msg is " << status.message() << std::endl; + throw; + } + } + } + ~SortMultiplekeyCodegenKernel(){} + + arrow::Status LoadJITFunction( + std::vector> key_field_list) { + // generate ddl signature + std::stringstream func_args_ss; + for (int i = 0; i < sort_directions_.size(); i++) { + func_args_ss << "[Sorter]" << (nulls_order_[i] ? "nulls_first" : "nulls_last") + << "|" << (sort_directions_[i] ? "asc" : "desc"); + } + for (int i = 0; i < key_field_list.size(); i++) { + auto field = key_field_list[i]; + func_args_ss << "[Type]" << field->type()->ToString(); + } + +#ifdef DEBUG + std::cout << "func_args_ss is " << func_args_ss.str() << std::endl; +#endif + + std::stringstream signature_ss; + signature_ss << std::hex << std::hash{}(func_args_ss.str()); + signature_ = signature_ss.str(); + + auto file_lock = FileSpinLock(); + auto status = LoadLibrary(signature_, ctx_, &sorter_); + if (!status.ok()) { + // process + auto codes = ProduceCodes(); + // compile codes + RETURN_NOT_OK(CompileCodes(codes, signature_)); + RETURN_NOT_OK(LoadLibrary(signature_, ctx_, &sorter_)); + } + FileSpinUnLock(file_lock); + return arrow::Status::OK(); + } + + arrow::Status Evaluate(const ArrayList& in) override { + num_batches_++; + if (cached_.size() <= col_num_) { + cached_.resize(col_num_ + 1); + } + for (int i = 0; i < col_num_; i++) { + cached_[i].push_back(in[i]); + } + if (!key_projector_) { + ArrayList key_cols; + for (auto idx : key_index_list_) { + key_cols.push_back(in[idx]); + } + sorter_->Evaluate(key_cols); + } else { + std::vector> projected_batch; + // do projection here, and the projected arrays are used for comparison + auto length = in.size() > 0 ? in[0]->length() : 0; + auto in_batch = arrow::RecordBatch::Make(result_schema_, length, in); + RETURN_NOT_OK( + key_projector_->Evaluate(*in_batch, ctx_->memory_pool(), &projected_batch)); + sorter_->Evaluate(projected_batch); + } + items_total_ += in[0]->length(); + length_list_.push_back(in[0]->length()); + return arrow::Status::OK(); + } + + arrow::Status MakeResultIterator( + std::shared_ptr schema, + std::shared_ptr>* out) override { + std::shared_ptr indices_out; + RETURN_NOT_OK(sorter_->FinishInternal(&indices_out)); + *out = std::make_shared(ctx_, schema, indices_out, cached_); + return arrow::Status::OK(); + } + + private: + std::shared_ptr sorter_; + std::vector cached_; + arrow::compute::FunctionContext* ctx_; + std::shared_ptr result_schema_; + std::shared_ptr key_projector_; + std::vector> key_field_list_; + std::vector> projected_types_; + std::vector> projected_field_list_; + std::vector length_list_; + uint64_t num_batches_ = 0; + uint64_t items_total_ = 0; + std::vector key_index_list_; + std::vector nulls_order_; + std::vector sort_directions_; + bool NaN_check_; + int col_num_; + + class TypedSorterCodeGenImpl { + public: + TypedSorterCodeGenImpl(std::string indice, std::shared_ptr data_type) + : indice_(indice), data_type_(data_type) {} + std::string GetCachedVariablesDefine() { + std::stringstream ss; + ss << "using ArrayType_" << indice_ << " = " << GetTypeString(data_type_, "Array") + << ";" << std::endl; + ss << "std::vector> cached_" << indice_ + << "_;" << std::endl; + return ss.str(); + } + + private: + std::string indice_; + std::shared_ptr data_type_; + }; + + std::string ProduceCodes() { + int indice = 0; + std::vector> key_typed_codegen_list; + if (key_projector_) { + for (auto field : projected_field_list_) { + auto codegen = std::make_shared( + std::to_string(indice), field->type()); + key_typed_codegen_list.push_back(codegen); + indice++; + } + } else { + for (auto field : key_field_list_) { + auto codegen = std::make_shared( + std::to_string(indice), field->type()); + key_typed_codegen_list.push_back(codegen); + indice++; + } + } + + std::string cached_insert_str = GetCachedInsert(); + + std::string comp_func_str = GetCompFunction(); + + std::string sort_func_str = GetSortFunction(); + + std::string cached_variables_define_str = + GetCachedVariablesDefine(key_typed_codegen_list); + + return BaseCodes() + R"( +#include + +#include +#include + +#include "codegen/arrow_compute/ext/array_item_index.h" +#include "codegen/common/sort_relation.h" +#include "precompile/builder.h" +#include "precompile/type.h" +#include "third_party/ska_sort.hpp" +#include "third_party/timsort.hpp" +using namespace sparkcolumnarplugin::precompile; + +class TypedSorterImpl : public CodeGenBase { + public: + TypedSorterImpl(arrow::compute::FunctionContext* ctx) : ctx_(ctx) {} + + arrow::Status Evaluate(const ArrayList& in) override { + num_batches_++; + )" + cached_insert_str + + R"( + auto cur = cached_0_[cached_0_.size() - 1]; + items_total_ += cur->length(); + length_list_.push_back(cur->length()); + + return arrow::Status::OK(); + } + + arrow::Status FinishInternal(std::shared_ptr* out) { + // we should support nulls first and nulls last here + // we should also support desc and asc here + )" + comp_func_str + + R"( + // initiate buffer for all arrays + std::shared_ptr indices_buf; + int64_t buf_size = items_total_ * sizeof(ArrayItemIndexS); + RETURN_NOT_OK(arrow::AllocateBuffer(ctx_->memory_pool(), buf_size, &indices_buf)); + + ArrayItemIndexS* indices_begin = + reinterpret_cast(indices_buf->mutable_data()); + ArrayItemIndexS* indices_end = indices_begin + items_total_; + + int64_t indices_i = 0; + for (int array_id = 0; array_id < num_batches_; array_id++) { + for (int64_t i = 0; i < length_list_[array_id]; i++) { + (indices_begin + indices_i)->array_id = array_id; + (indices_begin + indices_i)->id = i; + indices_i++; + } + } + + )" + sort_func_str + + R"( + std::shared_ptr out_type; + RETURN_NOT_OK(MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int32_t), &out_type)); + RETURN_NOT_OK(MakeFixedSizeBinaryArray(out_type, items_total_, indices_buf, out)); + return arrow::Status::OK(); + } + + private: + )" + cached_variables_define_str + + R"( + std::vector length_list_; + arrow::compute::FunctionContext* ctx_; + uint64_t num_batches_ = 0; + uint64_t items_total_ = 0; +}; + +extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, + std::shared_ptr* out) { + *out = std::make_shared(ctx); +} + + )"; + } + std::string GetCachedInsert() { + std::stringstream ss; + for (int i = 0; i < key_index_list_.size(); i++) { + ss << "cached_" << i << "_.push_back(std::make_shared(in[" << i << "]));" << std::endl; + } + return ss.str(); + } + std::string GetSortFunction() { + return "gfx::timsort(indices_begin, indices_begin + " + "items_total_, " + "comp);"; + } + std::string GetCompFunction() { + std::stringstream ss; + bool projected; + if (key_projector_) { + projected = true; + } else { + projected = false; + } + ss << "auto comp = [this](ArrayItemIndexS x, ArrayItemIndexS y) {" + << GetCompFunction_(0, projected, key_field_list_, + projected_types_, sort_directions_, nulls_order_) + << "};"; + return ss.str(); + } + std::string GetCompFunction_( + int cur_key_idx, bool projected, + const std::vector>& key_field_list, + const std::vector>& projected_types, + const std::vector& sort_directions, const std::vector& nulls_order) { + std::string comp_str; + bool asc = sort_directions[cur_key_idx]; + bool nulls_first = nulls_order[cur_key_idx]; + std::shared_ptr data_type; + std::string array; + // if projected, use projected batch to compare, and use projected type + array = "cached_"; + if (projected) { + data_type = projected_types[cur_key_idx]; + } else { + data_type = key_field_list[cur_key_idx]->type(); + } + + auto x_num_value = + array + std::to_string(cur_key_idx) + "_[x.array_id]->GetView(x.id)"; + auto x_str_value = + array + std::to_string(cur_key_idx) + "_[x.array_id]->GetString(x.id)"; + auto y_num_value = + array + std::to_string(cur_key_idx) + "_[y.array_id]->GetView(y.id)"; + auto y_str_value = + array + std::to_string(cur_key_idx) + "_[y.array_id]->GetString(y.id)"; + auto is_x_null = array + std::to_string(cur_key_idx) + "_[x.array_id]->IsNull(x.id)"; + auto is_y_null = array + std::to_string(cur_key_idx) + "_[y.array_id]->IsNull(y.id)"; + auto is_x_nan = "std::isnan(" + x_num_value + ")"; + auto is_y_nan = "std::isnan(" + y_num_value + ")"; + + // Multiple keys sorting w/ nulls first/last is supported. + std::stringstream ss; + // We need to determine the position of nulls. + ss << "if (" << is_x_null << ") {\n"; + // If value accessed from x is null, return true to make nulls first. + if (nulls_first) { + ss << "return true;\n}"; + } else { + ss << "return false;\n}"; + } + // If value accessed from y is null, return false to make nulls first. + ss << " else if (" << is_y_null << ") {\n"; + if (nulls_first) { + ss << "return false;\n}"; + } else { + ss << "return true;\n}"; + } + // If datatype is floating, we need to do partition for NaN if NaN check is enabled + if (data_type->id() == arrow::Type::DOUBLE || data_type->id() == arrow::Type::FLOAT) { + if (NaN_check_) { + ss << "else if (" << is_x_nan << ") {\n"; + if (asc) { + ss << "return false;\n}"; + } else { + ss << "return true;\n}"; + } + ss << "else if (" << is_y_nan << ") {\n"; + if (asc) { + ss << "return true;\n}"; + } else { + ss << "return false;\n}"; + } + } + } + + // If values accessed from x and y are both not null + ss << " else {\n"; + + // Multiple keys sorting w/ different ordering is supported. + // For string type of data, GetString should be used instead of GetView. + if (asc) { + if (data_type->id() == arrow::Type::STRING) { + ss << "return " << x_str_value << " < " << y_str_value << ";\n}\n"; + } else { + ss << "return " << x_num_value << " < " << y_num_value << ";\n}\n"; + } + } else { + if (data_type->id() == arrow::Type::STRING) { + ss << "return " << x_str_value << " > " << y_str_value << ";\n}\n"; + } else { + ss << "return " << x_num_value << " > " << y_num_value << ";\n}\n"; + } + } + comp_str = ss.str(); + if ((cur_key_idx + 1) == sort_directions.size()) { + return comp_str; + } + // clear the contents of stringstream + ss.str(std::string()); + if (data_type->id() == arrow::Type::STRING) { + ss << "if ((" << is_x_null << " && " << is_y_null << ") || (" << x_str_value + << " == " << y_str_value << ")) {"; + } else { + if (NaN_check_ && (data_type->id() == arrow::Type::DOUBLE || + data_type->id() == arrow::Type::FLOAT)) { + // need to check NaN + ss << "if ((" << is_x_null << " && " << is_y_null << ") || (" << is_x_nan + << " && " << is_y_nan << ") || (" << x_num_value << " == " << y_num_value + << ")) {"; + } else { + ss << "if ((" << is_x_null << " && " << is_y_null << ") || (" << x_num_value + << " == " << y_num_value << ")) {"; + } + } + ss << GetCompFunction_(cur_key_idx + 1, projected, key_field_list, + projected_types, sort_directions, nulls_order) + << "} else { " << comp_str << "}"; + return ss.str(); + } + std::string GetCachedVariablesDefine( + std::vector> shuffle_typed_codegen_list) { + std::stringstream ss; + for (auto codegen : shuffle_typed_codegen_list) { + ss << codegen->GetCachedVariablesDefine() << std::endl; + } + return ss.str(); + } + + class SorterResultIterator : public ResultIterator { + public: + SorterResultIterator(arrow::compute::FunctionContext* ctx, + std::shared_ptr schema, + std::shared_ptr indices_in, + std::vector& cached) + : ctx_(ctx), + schema_(schema), + indices_in_cache_(indices_in), + total_length_(indices_in->length()), + cached_in_(cached) { + col_num_ = schema->num_fields(); + indices_begin_ = (ArrayItemIndexS*)indices_in->value_data(); + // appender_type won't be used + AppenderBase::AppenderType appender_type = AppenderBase::left; + for (int i = 0; i < col_num_; i++) { + auto field = schema->field(i); + std::shared_ptr appender; + MakeAppender(ctx_, field->type(), appender_type, &appender); + appender_list_.push_back(appender); + } + for (int i = 0; i < col_num_; i++) { + arrow::ArrayVector array_vector = cached_in_[i]; + int array_num = array_vector.size(); + for (int array_id = 0; array_id < array_num; array_id++) { + auto arr = array_vector[array_id]; + appender_list_[i]->AddArray(arr); + } + } + batch_size_ = GetBatchSize(); + } + ~SorterResultIterator(){} + + std::string ToString() override { return "SortArraysToIndicesResultIterator"; } + + bool HasNext() override { + if (offset_ >= total_length_) { + return false; + } + return true; + } + + arrow::Status Next(std::shared_ptr* out) { + auto length = (total_length_ - offset_) > batch_size_ ? batch_size_ + : (total_length_ - offset_); + uint64_t count = 0; + for (int i = 0; i < col_num_; i++) { + while (count < length) { + auto item = indices_begin_ + offset_ + count++; + RETURN_NOT_OK(appender_list_[i]->Append(item->array_id, item->id)); + } + count = 0; + } + offset_ += length; + ArrayList arrays; + for (int i = 0; i < col_num_; i++) { + std::shared_ptr out_array; + RETURN_NOT_OK(appender_list_[i]->Finish(&out_array)); + arrays.push_back(out_array); + appender_list_[i]->Reset(); + } + + *out = arrow::RecordBatch::Make(schema_, length, arrays); + return arrow::Status::OK(); + } + + private: + uint64_t offset_ = 0; + const uint64_t total_length_; + std::shared_ptr schema_; + arrow::compute::FunctionContext* ctx_; + uint64_t batch_size_; + int col_num_; + ArrayItemIndexS* indices_begin_; + std::vector cached_in_; + std::vector> type_list_; + std::vector> appender_list_; + std::vector> array_list_; + std::shared_ptr indices_in_cache_; + }; +}; + arrow::Status SortArraysToIndicesKernel::Make( arrow::compute::FunctionContext* ctx, std::shared_ptr result_schema, @@ -1988,17 +2482,12 @@ SortArraysToIndicesKernel::SortArraysToIndicesKernel( } else { if (do_codegen) { // Will use Sort Codegen for multiple-key sort - impl_.reset(new Impl(ctx, result_schema, key_projector, projected_types, - key_field_list, sort_directions, nulls_order, NaN_check)); - auto status = impl_->LoadJITFunction(key_field_list, result_schema); - if (!status.ok()) { - std::cout << "LoadJITFunction failed, msg is " << status.message() << std::endl; - throw; - } + impl_.reset(new SortMultiplekeyCodegenKernel(ctx, result_schema, key_projector, + projected_types, key_field_list, sort_directions, nulls_order, NaN_check)); } else { // Will use Sort without Codegen for multiple-key sort impl_.reset(new SortMultiplekeyKernel(ctx, result_schema, key_projector, - projected_types, key_field_list, sort_directions, nulls_order, NaN_check)); + projected_types, key_field_list, sort_directions, nulls_order, NaN_check)); } } kernel_name_ = "SortArraysToIndicesKernel";