Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick](branch-30) pick prs (#41779) (#41623) #44406

Open
wants to merge 2 commits into
base: branch-3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block(
.get_data()
.resize_fill(origin_sz + max_added_rows, 0);
} else {
// TODO: for cross join, maybe we could insert a single row and wrap it in a const column
dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, max_added_rows);
}
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class PipelineTask {

// Returns the `_state` pointer held by this task.
RuntimeState* runtime_state() const { return _state; }

// Returns the raw pointer obtained from `_task_profile.get()`.
RuntimeProfile* get_task_profile() const { return _task_profile.get(); }

// Returns a display name of the form "task<_index>(<_pipeline->_name>)".
std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }

void stop_if_finished() {
Expand Down
9 changes: 9 additions & 0 deletions be/src/udf/udf.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <vector>

#include "runtime/types.h"
#include "util/runtime_profile.h"
#include "vec/common/arena.h"

namespace doris {
Expand Down Expand Up @@ -88,6 +89,12 @@ class FunctionContext {
_jsonb_string_as_string = jsonb_string_as_string;
}

// Stores the given counter pointer in `_udf_execute_timer` so that it can later be
// fetched through get_udf_execute_timer().
void set_udf_execute_timer(RuntimeProfile::Counter* udf_execute_timer) {
_udf_execute_timer = udf_execute_timer;
}

// Returns the counter stored by set_udf_execute_timer(); `_udf_execute_timer` is
// initialized to nullptr, so this is nullptr when no timer was set.
RuntimeProfile::Counter* get_udf_execute_timer() { return _udf_execute_timer; }

// Cast flag: when string_as_jsonb_string is enabled, casting a string to jsonb does not
// parse the string and instead just inserts it as a string literal.
bool string_as_jsonb_string() const { return _string_as_jsonb_string; }
Expand Down Expand Up @@ -176,6 +183,8 @@ class FunctionContext {

std::vector<std::shared_ptr<doris::ColumnPtrWrapper>> _constant_cols;

// Timer used to measure UDF execution time
RuntimeProfile::Counter* _udf_execute_timer = nullptr;
bool _check_overflow_for_decimal = false;

bool _string_as_jsonb_string = false;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,12 @@ void ColumnArray::insert_indices_from(const IColumn& src, const uint32_t* indice
}
}

// Appends `length` copies of row `position` of `src` to this column by calling
// insert_from() once per copy.
void ColumnArray::insert_many_from(const IColumn& src, size_t position, size_t length) {
    // The counter is size_t so that it matches `length` in both signedness and width.
    for (size_t i = 0; i < length; ++i) {
        ColumnArray::insert_from(src, position);
    }
}

ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) const {
if (replicate_offsets.empty()) return clone_empty();

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
size_t byte_size() const override;
size_t allocated_bytes() const override;
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override;
void insert_many_from(const IColumn& src, size_t position, size_t length) override;

ColumnPtr convert_to_full_column_if_const() const override;

Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/columns/column_complex.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ class ColumnComplexType final : public COWHelper<IColumn, ColumnComplexType<T>>
}
}

// Appends `length` copies of the element at `position` of `src` to `data`.
// `src` must be a column of the same concrete type (enforced by assert_cast).
void insert_many_from(const IColumn& src, size_t position, size_t length) override {
    const Self& src_vec = assert_cast<const Self&>(src);
    // `auto` makes `val` a local copy, so appending to `data` below cannot invalidate it.
    auto val = src_vec.get_element(position);
    // The counter is size_t so that it cannot wrap before reaching a large `length`.
    for (size_t i = 0; i < length; ++i) {
        data.emplace_back(val);
    }
}

// Removes the last `n` elements from `data`.
void pop_back(size_t n) override { data.erase(data.end() - n, data.end()); }
// It's impossible to use ComplexType as a key, so we don't have to implement them
[[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/columns/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
s += length;
}

// Appends `length` rows by increasing the counter `s` only; `src` and `position` are
// not read here.
// NOTE(review): presumably `src` is expected to carry the same constant value — confirm
// against callers.
void insert_many_from(const IColumn& src, size_t position, size_t length) override {
s += length;
}

void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) override {
s += (indices_end - indices_begin);
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/columns/column_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ void ColumnDecimal<T>::insert_many_fix_len_data(const char* data_ptr, size_t num
}
}

// Appends `length` copies of the value at `position` of `src` to this column.
// `src` must be a ColumnDecimal of the same type (enforced by assert_cast).
template <typename T>
void ColumnDecimal<T>::insert_many_from(const IColumn& src, size_t position, size_t length) {
    const auto first_new = data.size();
    data.resize(first_new + length);
    // The source data is looked up only after the resize above.
    const auto& src_values = assert_cast<const Self&>(src).get_data();
    auto* fill_begin = data.data() + first_new;
    std::fill(fill_begin, fill_begin + length, src_values[position]);
}

template <typename T>
void ColumnDecimal<T>::insert_range_from(const IColumn& src, size_t start, size_t length) {
const ColumnDecimal& src_vec = assert_cast<const ColumnDecimal&>(src);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class ColumnDecimal final : public COWHelper<IColumn, ColumnDecimal<T>> {
memset(data.data() + old_size, 0, length * sizeof(data[0]));
}

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

// Drops the last `n` elements by shrinking `data` with resize_assume_reserved().
void pop_back(size_t n) override { data.resize_assume_reserved(data.size() - n); }

StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ void ColumnMap::insert_indices_from(const IColumn& src, const uint32_t* indices_
}
}

// Appends `length` copies of row `position` of `src` to this column by calling
// insert_from() once per copy.
void ColumnMap::insert_many_from(const IColumn& src, size_t position, size_t length) {
    // The counter is size_t so that it matches `length` in both signedness and width.
    for (size_t i = 0; i < length; ++i) {
        ColumnMap::insert_from(src, position);
    }
}

StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const {
size_t array_size = size_at(n);
size_t offset = offset_at(n);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) override;

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

void append_data_by_selector(MutableColumnPtr& res,
const IColumn::Selector& selector) const override {
return append_data_by_selector_impl<ColumnMap>(res, selector);
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ void ColumnNullable::insert_many_strings(const StringRef* strings, size_t num) {
}
}

// Appends `length` copies of row `position` of `src` to this column: first to the
// null map column, then to the nested column. `src` must be a ColumnNullable
// (enforced by assert_cast).
void ColumnNullable::insert_many_from(const IColumn& src, size_t position, size_t length) {
    const auto& src_nullable = assert_cast<const ColumnNullable&>(src);
    const auto& src_null_map = src_nullable.get_null_map_column();
    const auto& src_nested = *src_nullable.nested_column;

    get_null_map_column().insert_many_from(src_null_map, position, length);
    get_nested_column().insert_many_from(src_nested, position, length);
}

StringRef ColumnNullable::serialize_value_into_arena(size_t n, Arena& arena,
char const*& begin) const {
const auto& arr = get_null_map_data();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>, public N
void insert(const Field& x) override;
void insert_from(const IColumn& src, size_t n) override;

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

template <typename ColumnType>
void insert_from_with_type(const IColumn& src, size_t n) {
const auto& src_concrete = assert_cast<const ColumnNullable&>(src);
Expand Down
22 changes: 22 additions & 0 deletions be/src/vec/columns/column_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,28 @@ void ColumnStr<T>::insert_range_from(const IColumn& src, size_t start, size_t le
}
}

// Appends `length` copies of the string at `position` of `src` to this column.
// `chars` grows by `length` times the string's size and one offset is written per
// new row. `src` must be a ColumnStr<T> (enforced by assert_cast).
template <typename T>
void ColumnStr<T>::insert_many_from(const IColumn& src, size_t position, size_t length) {
    const auto& src_strings = assert_cast<const ColumnStr<T>&>(src);
    auto [value_ptr, value_len] = src_strings.get_data_at(position);

    // Validate the resulting sizes before growing the character buffer.
    size_t chars_before = chars.size();
    check_chars_length(chars_before + value_len * length, offsets.size() + length);
    chars.resize(chars_before + value_len * length);

    auto rows_before = offsets.size();
    offsets.resize(rows_before + length);

    // Copy the value once per new row; each offset records the end of its copy.
    auto write_pos = chars_before;
    const auto rows_after = rows_before + length;
    for (auto row = rows_before; row < rows_after; ++row) {
        memcpy(&chars[write_pos], value_ptr, value_len);
        write_pos = write_pos + value_len;
        offsets[row] = write_pos;
    }
}

template <typename T>
void ColumnStr<T>::insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {
offsets.push_back(new_size);
}

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

// True when the template parameter T has the same size as uint64_t.
bool is_column_string64() const override { return sizeof(T) == sizeof(uint64_t); }

void insert_from(const IColumn& src_, size_t n) override {
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/columns/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* indic
}
}

// Appends `length` copies of row `position` of `src` to this column by forwarding
// the call to every sub-column, paired with the sub-column of `src` at the same
// index. `src` must be a ColumnStruct (enforced by assert_cast).
void ColumnStruct::insert_many_from(const IColumn& src, size_t position, size_t length) {
    const auto& src_struct = assert_cast<const ColumnStruct&>(src);
    const size_t tuple_size = columns.size();
    for (size_t idx = 0; idx != tuple_size; ++idx) {
        const auto& src_field = src_struct.get_column(idx);
        columns[idx]->insert_many_from(src_field, position, length);
    }
}

void ColumnStruct::insert_range_from(const IColumn& src, size_t start, size_t length) {
const size_t tuple_size = columns.size();
for (size_t i = 0; i < tuple_size; ++i) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
const uint32_t* indices_end) override;

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

// Delegates to append_data_by_selector_impl<ColumnStruct> with the given `res` and
// `selector`.
void append_data_by_selector(MutableColumnPtr& res, const Selector& selector) const override {
return append_data_by_selector_impl<ColumnStruct>(res, selector);
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/columns/column_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,14 @@ size_t ColumnVector<T>::filter(const IColumn::Filter& filter) {
return new_size;
}

// Appends `length` copies of the value at `position` of `src` to this column.
// `src` must be a ColumnVector of the same type (enforced by assert_cast).
template <typename T>
void ColumnVector<T>::insert_many_from(const IColumn& src, size_t position, size_t length) {
    const auto first_new = data.size();
    data.resize(first_new + length);
    // The source data is looked up only after the resize above.
    const auto& src_values = assert_cast<const Self&>(src).get_data();
    auto* fill_begin = data.data() + first_new;
    std::fill(fill_begin, fill_begin + length, src_values[position]);
}

template <typename T>
ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation& perm, size_t limit) const {
size_t size = data.size();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ class ColumnVector final : public COWHelper<IColumn, ColumnVector<T>> {
std::fill(data.data() + old_size, data.data() + old_size + n, val);
}

void insert_many_from(const IColumn& src, size_t position, size_t length) override;

void insert_range_of_integer(T begin, T end) {
if constexpr (std::is_integral_v<T>) {
auto old_size = data.size();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vcase_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Status VCaseExpr::open(RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vcast_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exprs/vectorized_fn_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/config.h"
#include "common/consts.h"
#include "common/status.h"
#include "pipeline/pipeline_task.h"
#include "runtime/runtime_state.h"
#include "udf/udf.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -125,7 +126,7 @@ Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context,
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "pipeline/pipeline_task.h"
#include "runtime/define_primitive_type.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
Expand Down Expand Up @@ -562,7 +563,7 @@ void VExpr::register_function_context(RuntimeState* state, VExprContext* context
_fn_context_index = context->register_function_context(state, _type, arg_types);
}

Status VExpr::init_function_context(VExprContext* context,
Status VExpr::init_function_context(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope,
const FunctionBasePtr& function) const {
FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
Expand All @@ -574,6 +575,12 @@ Status VExpr::init_function_context(VExprContext* context,
constant_cols.push_back(const_col);
}
fn_ctx->set_constant_cols(constant_cols);
} else {
if (function->is_udf_function()) {
auto* timer = ADD_TIMER(state->get_task()->get_task_profile(),
"UDF[" + function->get_name() + "]");
fn_ctx->set_udf_execute_timer(timer);
}
}

if (scope == FunctionContext::FRAGMENT_LOCAL) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ class VExpr {
/// 1. Set constant columns result of function arguments.
/// 2. Call function's prepare() to initialize function state, fragment-local or
/// thread-local according to the input `FunctionStateScope` argument.
Status init_function_context(VExprContext* context, FunctionContext::FunctionStateScope scope,
Status init_function_context(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope,
const FunctionBasePtr& function) const;

/// Helper function to close function context, fragment-local or thread-local according
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vin_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Status VInPredicate::open(RuntimeState* state, VExprContext* context,
for (auto& child : _children) {
RETURN_IF_ERROR(child->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vmatch_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Status VMatchPredicate::open(RuntimeState* state, VExprContext* context,
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
RETURN_IF_ERROR(VExpr::init_function_context(state, context, scope, _function));
if (scope == FunctionContext::THREAD_LOCAL || scope == FunctionContext::FRAGMENT_LOCAL) {
context->fn_context(_fn_context_index)->set_function_state(scope, _inverted_index_ctx);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/functions/function.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class IFunctionBase {

virtual bool is_use_default_implementation_for_constants() const = 0;

/// Whether this function is a UDF. The base implementation returns false; subclasses
/// may override it.
virtual bool is_udf_function() const { return false; }

/// The property of monotonicity for a certain range.
struct Monotonicity {
bool is_monotonic = false; /// Is the function monotonous (nondecreasing or nonincreasing).
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/functions/function_java_udf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));

if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) {
SCOPED_TIMER(context->get_udf_execute_timer());
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>();
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);

Expand Down Expand Up @@ -96,7 +97,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
JniContext* jni_ctx = reinterpret_cast<JniContext*>(
context->get_function_state(FunctionContext::THREAD_LOCAL));

SCOPED_TIMER(context->get_udf_execute_timer());
std::unique_ptr<long[]> input_table;
RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table));
auto input_table_schema = JniConnector::parse_table_schema(&block, arguments, true);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/functions/function_java_udf.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class JavaFunctionCall : public IFunctionBase {

// Always returns true for this function class.
bool is_use_default_implementation_for_constants() const override { return true; }

// Marks this function class as a UDF by returning true.
bool is_udf_function() const override { return true; }

private:
const TFunction& fn_;
const DataTypes _argument_types;
Expand Down
Loading