diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index c2e102b2d31c7c..d83b1a394e6388 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -24,6 +24,7 @@ set(VEC_FILES aggregate_functions/aggregate_function_null.cpp aggregate_functions/aggregate_function_sum.cpp aggregate_functions/aggregate_function_min_max.cpp + aggregate_functions/aggregate_function_avg.cpp columns/collator.cpp columns/column.cpp columns/column_const.cpp diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.cpp b/be/src/vec/aggregate_functions/aggregate_function_avg.cpp new file mode 100644 index 00000000000000..579f69866162b9 --- /dev/null +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.cpp @@ -0,0 +1,50 @@ +#include "vec/aggregate_functions/aggregate_function_simple_factory.h" +#include "vec/aggregate_functions/aggregate_function_avg.h" +#include "vec/aggregate_functions/helpers.h" +#include "vec/aggregate_functions/factory_helpers.h" + +namespace doris::vectorized +{ + +namespace +{ + +template +struct Avg +{ + using FieldType = std::conditional_t, Decimal128, NearestFieldType>; + using Function = AggregateFunctionAvg>; +}; + +template +using AggregateFuncAvg = typename Avg::Function; + +AggregateFunctionPtr createAggregateFunctionAvg(const std::string & name, const DataTypes & argument_types, const Array & parameters) +{ + assertNoParameters(name, parameters); + assertUnary(name, argument_types); + + AggregateFunctionPtr res; + DataTypePtr data_type = argument_types[0]; + if (isDecimal(data_type)) + res.reset(createWithDecimalType(*data_type, *data_type, argument_types)); + else + res.reset(createWithNumericType(*data_type, argument_types)); + + if (!res) + throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + return res; +} + +} + +//void registerAggregateFunctionAvg(AggregateFunctionFactory & factory) +//{ +// factory.registerFunction("avg", createAggregateFunctionAvg, AggregateFunctionFactory::CaseInsensitive); +//} + +void registerAggregateFunctionAvg(AggregateFunctionSimpleFactory& factory) { + factory.registerFunction("avg", createAggregateFunctionAvg); +} +} diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h new file mode 100644 index 00000000000000..711c311f1fe61a --- /dev/null +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h @@ -0,0 +1,115 @@ +#pragma once + +#include "vec/data_types/data_types_number.h" +#include "vec/data_types/data_types_decimal.h" +#include "vec/columns/columns_number.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/io/io_helper.h" + +namespace doris::vectorized +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +template +struct AggregateFunctionAvgData +{ + T sum = 0; + UInt64 count = 0; + + template + ResultT NO_SANITIZE_UNDEFINED result() const + { + if constexpr (std::is_floating_point_v) + if constexpr (std::numeric_limits::is_iec559) + return static_cast(sum) / count; /// allow division by zero + + if (!count) + throw Exception("AggregateFunctionAvg with zero values", ErrorCodes::LOGICAL_ERROR); + return static_cast(sum) / count; + } + + void write(std::ostream& buf) const { + writeBinary(sum, buf); + writeBinary(count, buf); + } + + void read(std::istream& buf) { + readBinary(sum, buf); + readBinary(count, buf); + } +}; + + +/// Calculates arithmetic mean of numbers. +template +class AggregateFunctionAvg final : public IAggregateFunctionDataHelper> +{ +public: + using ResultType = std::conditional_t, Decimal128, Float64>; + using ResultDataType = std::conditional_t, DataTypeDecimal, DataTypeNumber>; + using ColVecType = std::conditional_t, ColumnDecimal, ColumnVector>; + using ColVecResult = std::conditional_t, ColumnDecimal, ColumnVector>; + + /// ctor for native types + AggregateFunctionAvg(const DataTypes & argument_types_) + : IAggregateFunctionDataHelper>(argument_types_, {}) + , scale(0) + {} + + /// ctor for Decimals + AggregateFunctionAvg(const IDataType & data_type, const DataTypes & argument_types_) + : IAggregateFunctionDataHelper>(argument_types_, {}) + , scale(getDecimalScale(data_type)) + {} + + String getName() const override { return "avg"; } + + DataTypePtr getReturnType() const override + { + if constexpr (IsDecimalNumber) + return std::make_shared(ResultDataType::maxPrecision(), scale); + else + return std::make_shared(); + } + + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override + { + const auto & column = static_cast(*columns[0]); + this->data(place).sum += column.getData()[row_num]; + ++this->data(place).count; + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override + { + this->data(place).sum += this->data(rhs).sum; + this->data(place).count += this->data(rhs).count; + } + + void serialize(ConstAggregateDataPtr place, std::ostream& buf) const override + { + this->data(place).write(buf); + } + + void deserialize(AggregateDataPtr place, std::istream& buf, Arena *) const override + { + this->data(place).read(buf); + } + + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + { + auto & column = static_cast(to); + column.getData().push_back(this->data(place).template result()); + } + + const char * getHeaderFilePath() const override { return __FILE__; } + +private: + UInt32 scale; +}; + + +} diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.cpp b/be/src/vec/aggregate_functions/aggregate_function_null.cpp index d411e90443663d..495c47cbbffbca 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_null.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_null.cpp @@ -104,6 +104,7 @@ void registerAggregateFunctionCombinatorNull(AggregateFunctionSimpleFactory& fac factory.registerFunction("sum", creator, true); factory.registerFunction("max", creator, true); factory.registerFunction("min", creator, true); + factory.registerFunction("avg", creator, true); } } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h index a2ebd155784fa5..6f721cc05ed971 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h +++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h @@ -33,6 +33,7 @@ class AggregateFunctionSimpleFactory; void registerAggregateFunctionSum(AggregateFunctionSimpleFactory& factory); void registerAggregateFunctionCombinatorNull(AggregateFunctionSimpleFactory& factory); void registerAggregateFunctionMinMax(AggregateFunctionSimpleFactory& factory); +void registerAggregateFunctionAvg(AggregateFunctionSimpleFactory& factory); using DataTypePtr = std::shared_ptr; using DataTypes = std::vector; @@ -82,6 +83,7 @@ class AggregateFunctionSimpleFactory { std::call_once(oc, [&]() { registerAggregateFunctionSum(instance); registerAggregateFunctionMinMax(instance); + registerAggregateFunctionAvg(instance); registerAggregateFunctionCombinatorNull(instance); }); return instance; diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index c30c29841799ef..ab2fa262b89b87 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -159,13 +159,34 @@ class ColumnNullable final : public COWHelper { void checkConsistency() const; bool has_null() const { - auto begin = getNullMapData().begin(); - auto end = getNullMapData().end(); - while (begin < end) { - if (*begin != 0) { - return *begin; + size_t size = getNullMapData().size(); + const UInt8* null_pos = getNullMapData().data(); + const UInt8* null_pos_end = getNullMapData().data() + size; + #ifdef __SSE2__ + /** A slightly more optimized version. + * Based on the assumption that often pieces of consecutive values + * completely pass or do not pass the filter. + * Therefore, we will optimistically check the parts of `SIMD_BYTES` values. + */ + static constexpr size_t SIMD_BYTES = 16; + const __m128i zero16 = _mm_setzero_si128(); + const UInt8* null_end_sse = null_pos + size / SIMD_BYTES * SIMD_BYTES; + + while (null_pos < null_end_sse) { + int mask = _mm_movemask_epi8(_mm_cmpgt_epi8( + _mm_loadu_si128(reinterpret_cast(null_pos)), zero16)); + + if (0 != mask) { + return true; } - ++begin; + null_pos += SIMD_BYTES; + } +#endif + while (null_pos < null_pos_end) { + if (*null_pos != 0) { + return true; + } + null_pos++; } return false; } diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 774428037ee50f..cb4cc916d91ddb 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -116,7 +116,7 @@ struct ReceiveQueueSortCursorImpl : public SortCursorImpl { desc[i].direction = is_asc_order[i] ? 1 : -1; desc[i].nulls_direction = nulls_first[i] ? 1 : -1; } - has_next_block(); + _is_eof = !has_next_block(); } bool has_next_block() override { diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 48158924c886f5..740c7d5f2a0af8 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -47,10 +47,10 @@ struct AggregationMethodSerialized { Data data; - AggregationMethodSerialized() {} + AggregationMethodSerialized() = default; template - AggregationMethodSerialized(const Other& other) : data(other.data) {} + explicit AggregationMethodSerialized(const Other& other) : data(other.data) {} using State = ColumnsHashing::HashMethodSerialized; @@ -153,6 +153,7 @@ class AggregationNode : public ::doris::ExecNode { using vectorized_execute = std::function; using vectorized_get_result = std::function; + struct executor { vectorized_execute execute; vectorized_get_result get_result; diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index c13b2dba2ff750..3a0ff9ce302163 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -124,7 +124,7 @@ Status VSortNode::sort_input(RuntimeState* state) { do { Block block; RETURN_IF_ERROR(child(0)->get_next(state, &block, &eos)); - if (!eos && block.rows() != 0) { + if ( block.rows() != 0) { RETURN_IF_ERROR(pretreat_block(block)); _sorted_blocks.emplace_back(std::move(block)); RETURN_IF_CANCELLED(state); diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h index c508a5a1fa03a5..25386286c07a5e 100644 --- a/be/src/vec/exec/vsort_node.h +++ b/be/src/vec/exec/vsort_node.h @@ -28,15 +28,9 @@ namespace doris { namespace vectorized { -// Node that implements a full sort of its input with a fixed memory budget, spilling -// to disk if the input is larger than available memory. -// Uses SpillSorter and BufferedBlockMgr for the external sort implementation. -// Input rows to SortNode are materialized by the SpillSorter into a single tuple -// using the expressions specified in _sort_exec_exprs. -// In get_next(), SortNode passes in the output batch to the sorter instance created -// in open() to fill it with sorted rows. -// If a merge phase was performed in the sort, sorted rows are deep copied into -// the output batch. Otherwise, the sorter instance owns the sorted data. +// Node that implements a full sort of its input with a fixed memory budget +// In open() the input Block to VSortNode will sort firstly, using the expressions specified in _sort_exec_exprs. +// In get_next(), VSortNode do the merge sort to gather data to a new block // support spill to disk in the future class VSortNode : public doris::ExecNode { @@ -75,7 +69,7 @@ class VSortNode : public doris::ExecNode { // Number of rows to skip. int64_t _offset; - // Expressions and parameters used for tuple materialization and tuple comparison. + // Expressions and parameters used for build _sort_description VSortExecExprs _vsort_exec_exprs; std::vector _is_asc_order; std::vector _nulls_first; @@ -85,11 +79,9 @@ class VSortNode : public doris::ExecNode { std::vector _sorted_blocks; std::priority_queue _priority_queue; + // TODO: Not using now, maybe should be delete // Keeps track of the number of rows skipped for handling _offset. int64_t _num_rows_skipped; - - // END: Members that must be reset() - ///////////////////////////////////////// }; } diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 4d1942f0b5a0a3..782a361f20f1e3 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -31,17 +31,12 @@ class RuntimeProfile; namespace vectorized { class Block; -// VSortedRunMerger is used to merge multiple sorted runs of tuples. A run is a sorted -// sequence of row batches, which are fetched from a BlockSupplier function object. +// VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is a sorted +// sequence of blocks, which are fetched from a BlockSupplier function object. // Merging is implemented using a binary min-heap that maintains the run with the next -// tuple in sorted order at the top of the heap. +// rows in sorted order at the top of the heap. // -// Merged batches of rows are retrieved from VSortedRunMerger via calls to get_next(). -// The merger is constructed with a boolean flag deep_copy_input. -// If true, sorted output rows are deep copied into the data pool of the output batch. -// If false, get_next() only copies tuple pointers (TupleRows) into the output batch, -// and transfers resource ownership from the input batches to the output batch when -// an input batch is processed. +// Merged block of rows are retrieved from VSortedRunMerger via calls to get_next(). class VSortedRunMerger { public: // Function that returns the next block of rows from an input sorted run. The batch @@ -57,10 +52,10 @@ class VSortedRunMerger { // the priority queue. Status prepare(const std::vector& input_runs, bool parallel = false); - // Return the next batch of sorted rows from this merger. + // Return the next block of sorted rows from this merger. Status get_next(Block* output_block, bool *eos); - // Only Child class implement this Method, Return the next batch of sorted rows from this merger. + // Do not support now virtual Status get_batch(RowBatch **output_batch) { return Status::InternalError("no support method get_batch(RowBatch** output_batch)"); } @@ -80,9 +75,6 @@ class VSortedRunMerger { Block _empty_block; - // Pool of BatchedRowSupplier instances. - ObjectPool _pool; - // Times calls to get_next(). RuntimeProfile::Counter *_get_next_timer;