From 58628d52e17bf153904442dc71ba78d2cf693dac Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 21 Jul 2025 14:52:29 +0800
Subject: [PATCH 01/21] add BE part of FunctionCrc32Internal

---
 be/src/vec/functions/function_string.cpp |  1 +
 be/src/vec/functions/function_string.h   | 41 ++++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/be/src/vec/functions/function_string.cpp b/be/src/vec/functions/function_string.cpp
index 59bc8302f687ce..3b0fde131eb08b 100644
--- a/be/src/vec/functions/function_string.cpp
+++ b/be/src/vec/functions/function_string.cpp
@@ -1408,6 +1408,7 @@ void register_function_string(SimpleFunctionFactory& factory) {
     factory.register_function();
     factory.register_function();
     factory.register_function();
+    factory.register_function<FunctionCrc32Internal>();

     factory.register_alias(FunctionLeft::name, "strleft");
     factory.register_alias(FunctionRight::name, "strright");
diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h
index e029b83afd20fa..acc773d40fe79e 100644
--- a/be/src/vec/functions/function_string.h
+++ b/be/src/vec/functions/function_string.h
@@ -45,6 +45,8 @@
 #include "common/exception.h"
 #include "common/status.h"
 #include "runtime/decimalv2_value.h"
+#include "runtime/primitive_type.h"
+#include "runtime/raw_value.h"
 #include "runtime/string_search.hpp"
 #include "util/sha.h"
 #include "util/string_util.h"
@@ -4933,5 +4935,44 @@ class FunctionXPathString : public IFunction {
         return Status::OK();
     }
 };
+
+// ATTN: for debug only
+// compute crc32 hash value as the same way in `VOlapTablePartitionParam::find_tablets()`
+class FunctionCrc32Internal : public IFunction {
+public:
+    static constexpr auto name = "crc32_internal";
+    static FunctionPtr create() { return std::make_shared<FunctionCrc32Internal>(); }
+    String get_name() const override { return name; }
+    size_t get_number_of_arguments() const override { return 1; }
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
+        return std::make_shared<DataTypeInt64>();
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                        uint32_t result, size_t input_rows_count) const override {
+        DCHECK_EQ(arguments.size(), 1);
+
+        ColumnPtr src_col = block.get_by_position(arguments[0]).column;
+        auto primitive_type = block.get_by_position(arguments[0]).type->get_primitive_type();
+
+        auto res_col = ColumnInt64::create();
+        auto& res_data = res_col->get_data();
+        res_data.resize_fill(input_rows_count, 0);
+
+        for (size_t i = 0; i < input_rows_count; ++i) {
+            uint32_t hash_val = 0;
+            auto val = src_col->get_data_at(i);
+            if (val.data != nullptr) {
+                hash_val = RawValue::zlib_crc32(val.data, val.size, primitive_type, hash_val);
+            } else {
+                hash_val = HashUtil::zlib_crc_hash_null(hash_val);
+            }
+            res_data[i] = hash_val;
+        }
+
+        block.replace_by_position(result, std::move(res_col));
+        return Status::OK();
+    }
+};
 #include "common/compile_check_avoid_end.h"
 } // namespace doris::vectorized

From 252717ba69ec90cc5a2ab2da58ea0b8495b2aa36 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 21 Jul 2025 15:39:45 +0800
Subject: [PATCH 02/21] add FE part of FunctionCrc32Internal

---
 .../doris/catalog/BuiltinScalarFunctions.java |  2 +
 .../functions/scalar/Crc32Internal.java       | 69 +++++++++++++++++++
 .../visitor/ScalarFunctionVisitor.java        |  5 ++
 3 files changed, 76 insertions(+)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java
index 1adeec5fa36b29..6b1159d5a4e056 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java
@@ -136,6 +136,7 @@
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CountEqual;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CountSubstring;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32Internal;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateMap;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateNamedStruct;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateStruct;
@@ -762,6 +763,7 @@ public class BuiltinScalarFunctions implements FunctionHelper {
             scalar(Left.class, "left", "strleft"),
             scalar(Length.class, "length"),
             scalar(Crc32.class, "crc32"),
+            scalar(Crc32Internal.class, "crc32_internal"),
             scalar(Like.class, "like"),
             scalar(Ln.class, "ln", "dlog1"),
             scalar(Locate.class, "locate"),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
new file mode 100644
index 00000000000000..1018299992e014
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.scalar;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
+import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.nereids.types.BigIntType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * for debug only, compute crc32 hash value as the same way in `VOlapTablePartitionParam::find_tablets()`
+ */
+public class Crc32Internal extends ScalarFunction
+        implements UnaryExpression, ExplicitlyCastableSignature, PropagateNullable {
+
+    public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
+            FunctionSignature.ret(BigIntType.INSTANCE).args(AnyDataType.INSTANCE_WITHOUT_INDEX)
+    );
+
+    /**
+     * constructor with 1 argument.
+     */
+    public Crc32Internal(Expression arg) {
+        super("crc32_internal", arg);
+    }
+
+    /**
+     * withChildren.
+     */
+    @Override
+    public Crc32Internal withChildren(List<Expression> children) {
+        Preconditions.checkArgument(children.size() == 1);
+        return new Crc32Internal(children.get(0));
+    }
+
+    @Override
+    public List<FunctionSignature> getSignatures() {
+        return SIGNATURES;
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitCrc32Internal(this, context);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java
index 3b8a0cd9bbccb8..9e35c3fc9eeb00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java
@@ -144,6 +144,7 @@
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CountEqual;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CountSubstring;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32Internal;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateMap;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateNamedStruct;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateStruct;
@@ -1560,6 +1561,10 @@ default R visitCrc32(Crc32 crc32, C context) {
         return visitScalarFunction(crc32, context);
     }

+    default R visitCrc32Internal(Crc32Internal crc32Internal, C context) {
+        return visitScalarFunction(crc32Internal, context);
+    }
+
     default R visitLike(Like like, C context) {
         return visitStringRegexPredicate(like, context);
     }

From 80741b60294a61b67abcc4abafe93a437c283d37 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 21 Jul 2025 16:38:49 +0800
Subject: [PATCH 03/21] varargs

---
 be/src/vec/functions/function_string.h        | 33 ++++++++++++++-----
 .../functions/scalar/Crc32Internal.java       | 14 ++++----
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h
index acc773d40fe79e..85a705e7753b90 100644
--- a/be/src/vec/functions/function_string.h
+++ b/be/src/vec/functions/function_string.h
@@ -45,6 +45,7 @@
 #include "common/exception.h"
 #include "common/status.h"
 #include "runtime/decimalv2_value.h"
+#include "runtime/define_primitive_type.h"
 #include "runtime/primitive_type.h"
 #include "runtime/raw_value.h"
 #include "runtime/string_search.hpp"
@@ -4943,17 +4944,26 @@ class FunctionCrc32Internal : public IFunction {
     static constexpr auto name = "crc32_internal";
     static FunctionPtr create() { return std::make_shared<FunctionCrc32Internal>(); }
     String get_name() const override { return name; }
-    size_t get_number_of_arguments() const override { return 1; }
+    size_t get_number_of_arguments() const override { return 0; }
+    bool is_variadic() const override { return true; }
     DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
         return std::make_shared<DataTypeInt64>();
     }

     Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
                         uint32_t result, size_t input_rows_count) const override {
-        DCHECK_EQ(arguments.size(), 1);
+        DCHECK_GE(arguments.size(), 1);
+
+        auto argument_size = arguments.size();
+        std::vector<ColumnPtr> argument_columns(argument_size);
+        std::vector<PrimitiveType> argument_primitive_types(argument_size);

-        ColumnPtr src_col = block.get_by_position(arguments[0]).column;
-        auto primitive_type = block.get_by_position(arguments[0]).type->get_primitive_type();
+        for (size_t i = 0; i < argument_size; ++i) {
+            argument_columns[i] =
+                    block.get_by_position(arguments[i]).column->convert_to_full_column_if_const();
+            argument_primitive_types[i] =
+                    block.get_by_position(arguments[i]).type->get_primitive_type();
+        }

         auto res_col = ColumnInt64::create();
         auto& res_data = res_col->get_data();
@@ -4961,11 +4971,15 @@ class FunctionCrc32Internal : public IFunction {
         for (size_t i = 0; i < input_rows_count; ++i) {
             uint32_t hash_val = 0;
-            auto val = src_col->get_data_at(i);
-            if (val.data != nullptr) {
-                hash_val = RawValue::zlib_crc32(val.data, val.size, primitive_type, hash_val);
-            } else {
-                hash_val = HashUtil::zlib_crc_hash_null(hash_val);
+            for (size_t j = 0; j < argument_size; ++j) {
+                const auto& column = argument_columns[j];
+                auto primitive_type = argument_primitive_types[j];
+                auto val = column->get_data_at(i);
+                if (val.data != nullptr) {
+                    hash_val = RawValue::zlib_crc32(val.data, val.size, primitive_type, hash_val);
+                } else {
+                    hash_val = HashUtil::zlib_crc_hash_null(hash_val);
+                }
             }
             res_data[i] = hash_val;
         }

@@ -4974,5 +4988,6 @@ class FunctionCrc32Internal : public IFunction {
         return Status::OK();
     }
 };
+
 #include "common/compile_check_avoid_end.h"
 } // namespace doris::vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
index 1018299992e014..91d5c3c3b5df79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
@@ -25,6 +25,7 @@
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.coercion.AnyDataType;
 import org.apache.doris.nereids.types.BigIntType;
+import org.apache.doris.nereids.util.ExpressionUtils;

 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -38,14 +39,14 @@ public class Crc32Internal extends ScalarFunction
         implements UnaryExpression, ExplicitlyCastableSignature, PropagateNullable {

     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
-            FunctionSignature.ret(BigIntType.INSTANCE).args(AnyDataType.INSTANCE_WITHOUT_INDEX)
+            FunctionSignature.ret(BigIntType.INSTANCE).varArgs(AnyDataType.INSTANCE_WITHOUT_INDEX)
     );

     /**
-     * constructor with 1 argument.
+     * constructor with 1 or more arguments.
      */
-    public Crc32Internal(Expression arg) {
-        super("crc32_internal", arg);
+    public Crc32Internal(Expression arg, Expression... varArgs) {
+        super("crc32_internal", ExpressionUtils.mergeArguments(arg, varArgs));
     }

     /**
@@ -53,8 +54,9 @@ public Crc32Internal(Expression arg) {
      */
     @Override
     public Crc32Internal withChildren(List<Expression> children) {
-        Preconditions.checkArgument(children.size() == 1);
-        return new Crc32Internal(children.get(0));
+        Preconditions.checkArgument(children.size() >= 1);
+        return new Crc32Internal(children.get(0),
+                children.subList(1, children.size()).toArray(new Expression[0]));
     }

     @Override

From edf4df858d89469fe3e8ee7597bf7e0362361883 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 21 Jul 2025 17:21:49 +0800
Subject: [PATCH 04/21] tmp check

---
 .../test_crc32_internal.groovy                | 72 +++++++++++++++++++
 1 file changed, 72 insertions(+)
 create mode 100644 regression-test/suites/unique_with_mow_p0/test_crc32_internal.groovy

diff --git a/regression-test/suites/unique_with_mow_p0/test_crc32_internal.groovy b/regression-test/suites/unique_with_mow_p0/test_crc32_internal.groovy
new file mode 100644
index 00000000000000..f857a9e68e0210
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/test_crc32_internal.groovy
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +suite("test_crc32_internal") { + + int dbNum = 0 + int tableNum = 0 + def excludedDbs = ["mysql", "information_schema", "__internal_schema"].toSet() + + def checkTable = { String db, String tblName -> + sql "use ${db};" + def showStmt = sql_return_maparray("show create table ${tblName}")[0]["Create Table"] + boolean hashBucket = showStmt.toUpperCase().contains("DISTRIBUTED BY HASH") + def matcher = showStmt =~ /(?i)DISTRIBUTED BY HASH\s*\((.*?)\)/ + if (!matcher.find()) { return } + def bucketColumns = matcher.group(1) + def tabletStats = sql_return_maparray """ show tablets from ${tblName}; """ + def tabletIdList = tabletStats.collect { it.TabletId }.toList() + def tabletIds = tabletIdList.toSet() + + def matcher2 = showStmt =~ /(?i)BUCKETS\s+(\d+)/ + if (!matcher2.find()) { return } + int bucketNum = matcher2.group(1).toInteger() + if (bucketNum == 0) { return } + logger.info("""===== Begin to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""") + ++tableNum + int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count() + (0..replicaNum-1).each { replica -> + sql "set use_fix_replica=${replica};" + tabletStats.each { it2 -> + def tabletId = it2.TabletId + def res = sql "select crc32_internal(${bucketColumns}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColumns}) % ${bucketNum};" + if (res.size() > 1) { + logger.info("===== check failed, table: ${db}.${tblName}, tabletId: ${tabletId}, replica=${replica}, res.size()=${res.size()}, res=${res}") + assert res.size() == 1 + } + } + sql "set use_fix_replica=-1;" + } + logger.info("""===== Finish to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""") + } + + def checkDb = { String db -> + sql "use ${db};" + def tables = sql "show tables" + ++dbNum + tables.each { checkTable(db, it[0]) } + } + + def allDbs = sql "show databases" + allDbs.each { + def db = it[0] + if (!excludedDbs.contains(db)) { + checkDb(db) + } + } + logger.info("===== finish to check hash bucket tables, db num: ${dbNum}, table num: ${tableNum}") +} From c9e9c5ef0293941141a044fd81d22c84b7bcc542 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 21 Jul 2025 19:20:12 +0800 Subject: [PATCH 05/21] check before quite --- .../org/apache/doris/regression/RegressionTest.groovy | 11 +++++++++-- .../check_hash_bucket_table.groovy} | 4 +++- 2 files changed, 12 insertions(+), 3 deletions(-) rename regression-test/suites/{unique_with_mow_p0/test_crc32_internal.groovy => check_hash_bucket_table/check_hash_bucket_table.groovy} (97%) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/RegressionTest.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/RegressionTest.groovy index 65def1c50f1109..700ff4add8b845 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/RegressionTest.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/RegressionTest.groovy @@ -313,12 +313,19 @@ class RegressionTest { if (!config.withOutLoadData) { log.info('Start to run load scripts') runScripts(config, recorder, directoryFilter, - { fileName -> fileName.substring(0, fileName.lastIndexOf(".")) == "load" }) + { fileName -> { + def name = fileName.substring(0, fileName.lastIndexOf(".")) + return name == "load" && name != 
"check_hash_bucket_table"} }) } log.info('Start to run scripts') runScripts(config, recorder, directoryFilter, - { fileName -> fileName.substring(0, fileName.lastIndexOf(".")) != "load" }) + { fileName -> { + def name = fileName.substring(0, fileName.lastIndexOf(".")) + return name != "load" && name != "check_hash_bucket_table"} }) + log.info('Start to run check hash bucket table scripts') + runScripts(config, recorder, directoryFilter, + { fileName -> fileName.substring(0, fileName.lastIndexOf(".")) == "check_hash_bucket_table" }) return recorder } diff --git a/regression-test/suites/unique_with_mow_p0/test_crc32_internal.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy similarity index 97% rename from regression-test/suites/unique_with_mow_p0/test_crc32_internal.groovy rename to regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy index f857a9e68e0210..7bb885429e48d1 100644 --- a/regression-test/suites/unique_with_mow_p0/test_crc32_internal.groovy +++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. -suite("test_crc32_internal") { +suite("check_hash_bucket_table") { int dbNum = 0 int tableNum = 0 def excludedDbs = ["mysql", "information_schema", "__internal_schema"].toSet() + logger.info("===== begin to check hash bucket tables") + def checkTable = { String db, String tblName -> sql "use ${db};" def showStmt = sql_return_maparray("show create table ${tblName}")[0]["Create Table"] From 606fb5930c9b201437518a0b6a433170e5164692 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 21 Jul 2025 19:46:59 +0800 Subject: [PATCH 06/21] fix checkstyle --- .../trees/expressions/functions/scalar/Crc32Internal.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java index 91d5c3c3b5df79..1becec05c8499c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java @@ -23,8 +23,8 @@ import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable; import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; -import org.apache.doris.nereids.types.coercion.AnyDataType; import org.apache.doris.nereids.types.BigIntType; +import org.apache.doris.nereids.types.coercion.AnyDataType; import org.apache.doris.nereids.util.ExpressionUtils; import com.google.common.base.Preconditions; From 5dfe4af228b6bed733427ebb96ed26c7e64f8c74 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 22 Jul 2025 14:47:54 +0800 Subject: [PATCH 07/21] skip view and bucket=1 tables --- .../check_hash_bucket_table/check_hash_bucket_table.groovy | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy index 7bb885429e48d1..815011d8086836 100644 --- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy +++ 
b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy @@ -37,7 +37,7 @@ suite("check_hash_bucket_table") { def matcher2 = showStmt =~ /(?i)BUCKETS\s+(\d+)/ if (!matcher2.find()) { return } int bucketNum = matcher2.group(1).toInteger() - if (bucketNum == 0) { return } + if (bucketNum <= 1) { return } logger.info("""===== Begin to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""") ++tableNum int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count() @@ -58,9 +58,9 @@ suite("check_hash_bucket_table") { def checkDb = { String db -> sql "use ${db};" - def tables = sql "show tables" + def tables = sql("show full tables").stream().filter{ it[1] == "BASE TABLE" }.collect{ it[0] }.toList() ++dbNum - tables.each { checkTable(db, it[0]) } + tables.each { checkTable(db, it) } } def allDbs = sql "show databases" From 57cf6532f95f900ef09f6c31b42c97ab8c4a71aa Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 23 Jul 2025 16:16:20 +0800 Subject: [PATCH 08/21] fix null values --- be/src/vec/functions/function_string.h | 1 + .../trees/expressions/functions/scalar/Crc32Internal.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h index 85a705e7753b90..dc0628ccf1a9c4 100644 --- a/be/src/vec/functions/function_string.h +++ b/be/src/vec/functions/function_string.h @@ -4946,6 +4946,7 @@ class FunctionCrc32Internal : public IFunction { String get_name() const override { return name; } size_t get_number_of_arguments() const override { return 0; } bool is_variadic() const override { return true; } + bool use_default_implementation_for_nulls() const override { return false; } DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { return std::make_shared(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java index 1becec05c8499c..13eeae8bbca3b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java @@ -19,8 +19,8 @@ import org.apache.doris.catalog.FunctionSignature; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable; import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; -import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable; import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; import org.apache.doris.nereids.types.BigIntType; @@ -36,7 +36,7 @@ * for debug only, compute crc32 hash value as the same way in `VOlapTablePartitionParam::find_tablets()` */ public class Crc32Internal extends ScalarFunction - implements UnaryExpression, ExplicitlyCastableSignature, PropagateNullable { + implements UnaryExpression, ExplicitlyCastableSignature, AlwaysNotNullable { public static final List SIGNATURES = ImmutableList.of( FunctionSignature.ret(BigIntType.INSTANCE).varArgs(AnyDataType.INSTANCE_WITHOUT_INDEX) From 720b602453d3e5849c0ad3ad82be0f4224101296 Mon Sep 17 00:00:00 2001 
From: bobhan1
Date: Wed, 23 Jul 2025 19:37:36 +0800
Subject: [PATCH 09/21] ignore compute error

---
 .../check_hash_bucket_table.groovy            | 22 ++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index 815011d8086836..db65e6d4e22358 100644
--- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -21,7 +21,7 @@ suite("check_hash_bucket_table") {
     int tableNum = 0
     def excludedDbs = ["mysql", "information_schema", "__internal_schema"].toSet()

-    logger.info("===== begin to check hash bucket tables")
+    logger.info("===== [check] begin to check hash bucket tables")

     def checkTable = { String db, String tblName ->
         sql "use ${db};"
@@ -38,22 +38,28 @@ suite("check_hash_bucket_table") {
         if (!matcher2.find()) { return }
         int bucketNum = matcher2.group(1).toInteger()
         if (bucketNum <= 1) { return }
-        logger.info("""===== Begin to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""")
+        logger.info("""===== [check] Begin to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""")
         ++tableNum
         int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count()
         (0..replicaNum-1).each { replica ->
             sql "set use_fix_replica=${replica};"
             tabletStats.each { it2 ->
                 def tabletId = it2.TabletId
-                def res = sql "select crc32_internal(${bucketColumns}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColumns}) % ${bucketNum};"
-                if (res.size() > 1) {
-                    logger.info("===== check failed, table: ${db}.${tblName}, tabletId: ${tabletId}, replica=${replica}, res.size()=${res.size()}, res=${res}")
-                    assert res.size() == 1
+                try {
+                    def res = sql "select crc32_internal(${bucketColumns}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColumns}) % ${bucketNum};"
+                    if (res.size() > 1) {
+                        logger.info("===== [check] check failed, table: ${db}.${tblName}, tabletId: ${tabletId}, replica=${replica}, res.size()=${res.size()}, res=${res}")
+                        assert res.size() == 1
+                    }
+                } catch (AssertionError e) {
+                    throw e
+                } catch (Throwable e) {
+                    logger.info("===== [check] catch exception, table: ${db}.${tblName}, tabletId: ${tabletId}, replica=${replica}, e=${e}")
                 }
             }
             sql "set use_fix_replica=-1;"
         }
-        logger.info("""===== Finish to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""")
+        logger.info("""===== [check] Finish to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""")
     }

     def checkDb = { String db ->
@@ -70,5 +76,5 @@ suite("check_hash_bucket_table") {
             checkDb(db)
         }
     }
-    logger.info("===== finish to check hash bucket tables, db num: ${dbNum}, table num: ${tableNum}")
+    logger.info("===== [check] finish to check hash bucket tables, db num: ${dbNum}, table num: ${tableNum}")
 }

From dd88da8bafe562630009209611cf1f614ce4cacc Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Thu, 24 Jul 2025 11:47:48 +0800
Subject: [PATCH 10/21] add log

---
 be/src/exec/tablet_info.h              | 18 ++++++++++++++++++
 be/src/vec/functions/function_string.h |  8 ++++++++
 2 files changed, 26 insertions(+)

diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 1dceb102c78823..92f00bfc720a5d 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -235,6 +235,7 @@ class VOlapTablePartitionParam {
         compute_function = [this](vectorized::Block* block, uint32_t row,
                                   const VOlapTablePartition& partition) -> uint32_t {
             uint32_t hash_val = 0;
+            std::vector<std::string> msgs;
             for (unsigned short _distributed_slot_loc : _distributed_slot_locs) {
                 auto* slot_desc = _slots[_distributed_slot_loc];
                 auto& column = block->get_by_position(_distributed_slot_loc).column;
@@ -246,7 +247,24 @@ class VOlapTablePartitionParam {
                 } else {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
+                auto type_in_block = block->get_by_position(_distributed_slot_loc).type;
+                msgs.emplace_back(fmt::format(
+                        "[slot_name: {}, type: {}, type_primitive_type: {}, type_in_block: {}, "
+                        "type_in_block_primitive_type: {}]",
+                        slot_desc->col_name(), slot_desc->type()->get_name(),
+                        static_cast<int>(slot_desc->type()->get_primitive_type()),
+                        type_in_block->get_name(),
+                        static_cast<int>(type_in_block->get_primitive_type())));
             }
+            LOG_INFO("[verbose] find_tablets")
+                    .tag("db_id", db_id())
+                    .tag("table_id", table_id())
+                    .tag("types", fmt::format("{}", fmt::join(msgs, ",")))
+                    .tag("row", row)
+                    .tag("hash_val", hash_val)
+                    .tag("partition.num_buckets", partition.num_buckets)
+                    .tag("ret", hash_val % partition.num_buckets)
+                    .tag("row data", block->dump_one_line(row, block->columns()));
             return hash_val % partition.num_buckets;
         };
     } else { // random distribution
diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h
index dc0628ccf1a9c4..30b47b1b9f5654 100644
--- a/be/src/vec/functions/function_string.h
+++ b/be/src/vec/functions/function_string.h
@@ -4964,6 +4964,13 @@ class FunctionCrc32Internal : public IFunction {
                     block.get_by_position(arguments[i]).column->convert_to_full_column_if_const();
             argument_primitive_types[i] =
                     block.get_by_position(arguments[i]).type->get_primitive_type();
+            LOG_INFO("[verbose] crc32_internal")
+                    .tag("rows", input_rows_count)
+                    .tag("slot_name", block.get_by_position(arguments[i]).name)
+                    .tag("type", block.get_by_position(arguments[i]).type->get_name())
+                    .tag("primitive_type",
+                         static_cast<int>(
+                                 block.get_by_position(arguments[i]).type->get_primitive_type()));
         }

         auto res_col = ColumnInt64::create();
@@ -4982,6 +4989,7 @@ class FunctionCrc32Internal : public IFunction {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
             }
+            LOG_INFO("[verbose] crc32_internal data").tag("row", i).tag("hash_val", hash_val);
             res_data[i] = hash_val;
         }

From 25b81fae4e708e9446ef84f71b17829d2a02f0bb Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Thu, 24 Jul 2025 11:47:54 +0800
Subject: [PATCH 11/21] Revert "add log"

This reverts commit 40282c6033dcef9769781bfbc9f8faa21c077727.

---
 be/src/exec/tablet_info.h              | 18 ------------------
 be/src/vec/functions/function_string.h |  8 --------
 2 files changed, 26 deletions(-)

diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 92f00bfc720a5d..1dceb102c78823 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -235,7 +235,6 @@ class VOlapTablePartitionParam {
         compute_function = [this](vectorized::Block* block, uint32_t row,
                                   const VOlapTablePartition& partition) -> uint32_t {
             uint32_t hash_val = 0;
-            std::vector<std::string> msgs;
             for (unsigned short _distributed_slot_loc : _distributed_slot_locs) {
                 auto* slot_desc = _slots[_distributed_slot_loc];
                 auto& column = block->get_by_position(_distributed_slot_loc).column;
@@ -247,24 +246,7 @@ class VOlapTablePartitionParam {
                 } else {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
-                auto type_in_block = block->get_by_position(_distributed_slot_loc).type;
-                msgs.emplace_back(fmt::format(
-                        "[slot_name: {}, type: {}, type_primitive_type: {}, type_in_block: {}, "
-                        "type_in_block_primitive_type: {}]",
-                        slot_desc->col_name(), slot_desc->type()->get_name(),
-                        static_cast<int>(slot_desc->type()->get_primitive_type()),
-                        type_in_block->get_name(),
-                        static_cast<int>(type_in_block->get_primitive_type())));
             }
-            LOG_INFO("[verbose] find_tablets")
-                    .tag("db_id", db_id())
-                    .tag("table_id", table_id())
-                    .tag("types", fmt::format("{}", fmt::join(msgs, ",")))
-                    .tag("row", row)
-                    .tag("hash_val", hash_val)
-                    .tag("partition.num_buckets", partition.num_buckets)
-                    .tag("ret", hash_val % partition.num_buckets)
-                    .tag("row data", block->dump_one_line(row, block->columns()));
             return hash_val % partition.num_buckets;
         };
     } else { // random distribution
diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h
index 30b47b1b9f5654..dc0628ccf1a9c4 100644
--- a/be/src/vec/functions/function_string.h
+++ b/be/src/vec/functions/function_string.h
@@ -4964,13 +4964,6 @@ class FunctionCrc32Internal : public IFunction {
                     block.get_by_position(arguments[i]).column->convert_to_full_column_if_const();
             argument_primitive_types[i] =
                     block.get_by_position(arguments[i]).type->get_primitive_type();
-            LOG_INFO("[verbose] crc32_internal")
-                    .tag("rows", input_rows_count)
-                    .tag("slot_name", block.get_by_position(arguments[i]).name)
-                    .tag("type", block.get_by_position(arguments[i]).type->get_name())
-                    .tag("primitive_type",
-                         static_cast<int>(
-                                 block.get_by_position(arguments[i]).type->get_primitive_type()));
         }

         auto res_col = ColumnInt64::create();
@@ -4989,7 +4982,6 @@ class FunctionCrc32Internal : public IFunction {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
             }
-            LOG_INFO("[verbose] crc32_internal data").tag("row", i).tag("hash_val", hash_val);
             res_data[i] = hash_val;
         }

From 82e26811c9eb6316602837bf164f98ebbff9fc69 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Thu, 24 Jul 2025 11:58:51 +0800
Subject: [PATCH 12/21] skip decimal

---
 .../check_hash_bucket_table.groovy            | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index db65e6d4e22358..6a2f9898191db8 100644
--- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -38,6 +38,18 @@ suite("check_hash_bucket_table") {
         if (!matcher2.find()) { return }
         int bucketNum = matcher2.group(1).toInteger()
         if (bucketNum <= 1) { return }
+
+        def bucketCols = bucketColumns.findAll(/`([^`]+)`/)*.replaceAll(/`/, '')
+        def columnsDetail = sql_return_maparray "desc ${tblName} all;"
+        boolean skip = false
+        for (def col: bucketCols) {
+            def colDetail = columnsDetail.find { it.Field == col }
+            if (colDetail.InternalType.toLowerCase().contains("decimal")) {
+                logger.info("===== [check] skip to check table: ${db}.${tblName} because bucket column: ${col} is ${colDetail.InternalType}")
+                return
+            }
+        }
+
         logger.info("""===== [check] Begin to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""")
         ++tableNum
         int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count()

From edddd73da153a5d1bace2d33eff449a651a366a1 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Thu, 24 Jul 2025 15:45:10 +0800
Subject: [PATCH 13/21] skip async mvs

---
 .../check_hash_bucket_table/check_hash_bucket_table.groovy | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index 6a2f9898191db8..0f9d7bd817fdd3 100644
--- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -78,7 +78,10 @@ suite("check_hash_bucket_table") {
         sql "use ${db};"
         def tables = sql("show full tables").stream().filter{ it[1] == "BASE TABLE" }.collect{ it[0] }.toList()
         ++dbNum
-        tables.each { checkTable(db, it) }
+        def asyncMVs = sql_return_maparray("""select * from mv_infos("database"="${db}");""").collect{ it.Name }.toSet()
+        tables.each {
+            if (!asyncMVs.contains(it)) { checkTable(db, it) }
+        }
     }

     def allDbs = sql "show databases"

From 687e715fabdb727c4f1f17efa3cf32a21a5bde53 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Thu, 24 Jul 2025 19:43:50 +0800
Subject: [PATCH 14/21] fix

---
 .../check_hash_bucket_table.groovy            | 77 +++++++++++--------
 1 file changed, 46 insertions(+), 31 deletions(-)

diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index 0f9d7bd817fdd3..655e28d4db89fe 100644
--- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -19,77 +19,92 @@ suite("check_hash_bucket_table") {

     int dbNum = 0
     int tableNum = 0
+    int partitionNum = 0
     def excludedDbs = ["mysql", "information_schema", "__internal_schema"].toSet()

     logger.info("===== [check] begin to check hash bucket tables")
+    def checkPartition = { String db, String tblName, def info ->
+        int bucketNum = info["Buckets"].toInteger()
+        if (bucketNum <= 1) { return false}

-    def checkTable = { String db, String tblName ->
-        sql "use ${db};"
-        def showStmt = sql_return_maparray("show create table ${tblName}")[0]["Create Table"]
-        boolean hashBucket = showStmt.toUpperCase().contains("DISTRIBUTED BY HASH")
-        def matcher = showStmt =~ /(?i)DISTRIBUTED BY HASH\s*\((.*?)\)/
-        if (!matcher.find()) { return }
-        def bucketColumns = matcher.group(1)
-        def tabletStats = sql_return_maparray """ show tablets from ${tblName}; """
-        def tabletIdList = tabletStats.collect { it.TabletId }.toList()
-        def tabletIds = tabletIdList.toSet()
-
-        def matcher2 = showStmt =~ /(?i)BUCKETS\s+(\d+)/
-        if (!matcher2.find()) { return }
-        int bucketNum = matcher2.group(1).toInteger()
-        if (bucketNum <= 1) { return }
-
-        def bucketCols = bucketColumns.findAll(/`([^`]+)`/)*.replaceAll(/`/, '')
+        def bucketColumns = info["DistributionKey"]
+        if (bucketColumns == "RANDOM") {return false}
         def columnsDetail = sql_return_maparray "desc ${tblName} all;"
-        boolean skip = false
+
+        def bucketCols = bucketColumns.split(",").collect { it.trim() }
         for (def col: bucketCols) {
             def colDetail = columnsDetail.find { it.Field == col }
             if (colDetail.InternalType.toLowerCase().contains("decimal")) {
                 logger.info("===== [check] skip to check table: ${db}.${tblName} because bucket column: ${col} is ${colDetail.InternalType}")
-                return
+                return false
             }
         }
-
-        logger.info("""===== [check] Begin to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""")
-        ++tableNum
+        def partitionName = info["PartitionName"]
+        def tabletIdList = sql_return_maparray(""" show replica status from ${tblName} partition(${partitionName}); """).collect { it.TabletId }.toList()
+        def tabletIds = tabletIdList.toSet()
         int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count()
+        logger.info("""===== [check] Begin to check partition: ${db}.${tblName}, partition name: ${partitionName}, bucket num: ${bucketNum}, replica num: ${replicaNum}, bucket columns: ${bucketColumns}""")
         (0..replicaNum-1).each { replica ->
             sql "set use_fix_replica=${replica};"
-            tabletStats.each { it2 ->
-                def tabletId = it2.TabletId
+            tabletIds.each { it2 ->
+                def tabletId = it2
                 try {
                     def res = sql "select crc32_internal(${bucketColumns}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColumns}) % ${bucketNum};"
                     if (res.size() > 1) {
-                        logger.info("===== [check] check failed, table: ${db}.${tblName}, tabletId: ${tabletId}, replica=${replica}, res.size()=${res.size()}, res=${res}")
+                        logger.info("""===== [check] check failed: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, bucket columns: ${bucketColumns}, res.size()=${res.size()}, res=${res}""")
                         assert res.size() == 1
                     }
                 } catch (AssertionError e) {
                     throw e
                 } catch (Throwable e) {
-                    logger.info("===== [check] catch exception, table: ${db}.${tblName}, tabletId: ${tabletId}, replica=${replica}, e=${e}")
+                    logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, e=${e}")
                 }
             }
             sql "set use_fix_replica=-1;"
         }
-        logger.info("""===== [check] Finish to check table: ${db}.${tblName}, hash bucket: ${hashBucket}, bucket num: ${bucketNum}, replica num: ${tabletStats.size()}, bucket columns: ${bucketColumns}""")
+        logger.info("""===== [check] Finish to check table partition: ${db}.${tblName}, partitionName: ${partitionName}, replica num: ${replicaNum}, bucket num: ${bucketNum}, bucket columns: ${bucketColumns}""")
+        return true
+    }
+
+    def checkTable = { String db, String tblName ->
+        sql "use ${db};"
+        def showStmt = sql_return_maparray("show create table ${tblName}")[0]["Create Table"]
+        def partitionInfo = sql_return_maparray """ show partitions from ${tblName}; """
+        int checkedPartition = 0
+        partitionInfo.each {
+            if (checkPartition(db, tblName, it)) {
+                ++checkedPartition
+            }
+        }
+        logger.info("""===== [check] Finish to check table: ${db}.${tblName}""")
+        partitionNum += checkedPartition
+        return checkedPartition > 0
     }

     def checkDb = { String db ->
         sql "use ${db};"
         def tables = sql("show full tables").stream().filter{ it[1] == "BASE TABLE" }.collect{ it[0] }.toList()
-        ++dbNum
         def asyncMVs = sql_return_maparray("""select * from mv_infos("database"="${db}");""").collect{ it.Name }.toSet()
+        int checkedTable = 0
         tables.each {
-            if (!asyncMVs.contains(it)) { checkTable(db, it) }
+            if (!asyncMVs.contains(it)) {
+                if (checkTable(db, it)) {
+                    checkedTable++
+                }
+            }
         }
+        tableNum += checkedTable
+        return checkedTable > 0
     }

     def allDbs = sql "show databases"
     allDbs.each {
         def db = it[0]
         if (!excludedDbs.contains(db)) {
-            checkDb(db)
+            if (checkDb(db)) {
+                ++dbNum
+            }
         }
     }
-    logger.info("===== [check] finish to check hash bucket tables, db num: ${dbNum}, table num: ${tableNum}")
+    logger.info("===== [check] finish to check hash bucket tables, db num: ${dbNum}, table num: ${tableNum}, partition num: ${partitionNum}")
 }

From 76e3bb5d1cded8a9fd413c40f517acc054b87114 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Fri, 25 Jul 2025 10:18:13 +0800
Subject: [PATCH 15/21] fix

---
 .../check_hash_bucket_table.groovy            | 42 ++++++++++---------
 1 file changed, 23 insertions(+), 19 deletions(-)

diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index 655e28d4db89fe..4a693a542bc8ac 100644
--- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -40,29 +40,33 @@ suite("check_hash_bucket_table") {
             }
         }
         def partitionName = info["PartitionName"]
-        def tabletIdList = sql_return_maparray(""" show replica status from ${tblName} partition(${partitionName}); """).collect { it.TabletId }.toList()
-        def tabletIds = tabletIdList.toSet()
-        int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count()
-        logger.info("""===== [check] Begin to check partition: ${db}.${tblName}, partition name: ${partitionName}, bucket num: ${bucketNum}, replica num: ${replicaNum}, bucket columns: ${bucketColumns}""")
-        (0..replicaNum-1).each { replica ->
-            sql "set use_fix_replica=${replica};"
-            tabletIds.each { it2 ->
-                def tabletId = it2
-                try {
-                    def res = sql "select crc32_internal(${bucketColumns}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColumns}) % ${bucketNum};"
-                    if (res.size() > 1) {
-                        logger.info("""===== [check] check failed: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, bucket columns: ${bucketColumns}, res.size()=${res.size()}, res=${res}""")
-                        assert res.size() == 1
+        try {
+            def tabletIdList = sql_return_maparray(""" show replica status from ${tblName} partition(${partitionName}); """).collect { it.TabletId }.toList()
+            def tabletIds = tabletIdList.toSet()
+            int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count()
+            logger.info("""===== [check] Begin to check partition: ${db}.${tblName}, partition name: ${partitionName}, bucket num: ${bucketNum}, replica num: ${replicaNum}, bucket columns: ${bucketColumns}""")
+            (0..replicaNum-1).each { replica ->
+                sql "set use_fix_replica=${replica};"
+                tabletIds.each { it2 ->
+                    def tabletId = it2
+                    try {
+                        def res = sql "select crc32_internal(${bucketColumns}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColumns}) % ${bucketNum};"
+                        if (res.size() > 1) {
+                            logger.info("""===== [check] check failed: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, bucket columns: ${bucketColumns}, res.size()=${res.size()}, res=${res}""")
+                            assert res.size() == 1
+                        }
+                    } catch (AssertionError e) {
+                        throw e
+                    } catch (Throwable e) {
+                        logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, e=${e}")
                     }
-                } catch (AssertionError e) {
-                    throw e
-                } catch (Throwable e) {
-                    logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, e=${e}")
                 }
+                sql "set use_fix_replica=-1;"
             }
-            sql "set use_fix_replica=-1;"
+            logger.info("""===== [check] Finish to check table partition: ${db}.${tblName}, partitionName: ${partitionName}, replica num: ${replicaNum}, bucket num: ${bucketNum}, bucket columns: ${bucketColumns}""")
+        } catch (Throwable e) {
+            logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, e=${e}")
         }
-        logger.info("""===== [check] Finish to check table partition: ${db}.${tblName}, partitionName: ${partitionName}, replica num: ${replicaNum}, bucket num: ${bucketNum}, bucket columns: ${bucketColumns}""")
         return true
     }

From 9a5a2d3b15d52a5aade58673e44e61a2922145d1 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Fri, 25 Jul 2025 14:43:15 +0800
Subject: [PATCH 16/21] fix

---
 .../check_hash_bucket_table.groovy            | 47 ++++++++++++-------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index 4a693a542bc8ac..5958bf5582f1e6 100644
--- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -15,11 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.

+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
 suite("check_hash_bucket_table") {

-    int dbNum = 0
-    int tableNum = 0
-    int partitionNum = 0
+    AtomicInteger dbNum = new AtomicInteger(0)
+    AtomicInteger tableNum = new AtomicInteger(0)
+    AtomicInteger partitionNum = new AtomicInteger(0)
+    def executor = Executors.newFixedThreadPool(30)
+    def futures = []
+
     def excludedDbs = ["mysql", "information_schema", "__internal_schema"].toSet()

     logger.info("===== [check] begin to check hash bucket tables")
@@ -39,20 +46,22 @@ suite("check_hash_bucket_table") {
                 return false
             }
         }
+
+        def bucketColsStr = bucketCols.collect { "`${it}`" }.join(",")
         def partitionName = info["PartitionName"]
         try {
             def tabletIdList = sql_return_maparray(""" show replica status from ${tblName} partition(${partitionName}); """).collect { it.TabletId }.toList()
             def tabletIds = tabletIdList.toSet()
             int replicaNum = tabletIdList.stream().filter { it == tabletIdList[0] }.count()
-            logger.info("""===== [check] Begin to check partition: ${db}.${tblName}, partition name: ${partitionName}, bucket num: ${bucketNum}, replica num: ${replicaNum}, bucket columns: ${bucketColumns}""")
+            logger.info("""===== [check] Begin to check partition: ${db}.${tblName}, partition name: ${partitionName}, bucket num: ${bucketNum}, replica num: ${replicaNum}, bucket columns: ${bucketColsStr}""")
             (0..replicaNum-1).each { replica ->
                 sql "set use_fix_replica=${replica};"
                 tabletIds.each { it2 ->
                     def tabletId = it2
                     try {
-                        def res = sql "select crc32_internal(${bucketColumns}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColumns}) % ${bucketNum};"
+                        def res = sql "select crc32_internal(${bucketColsStr}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColsStr}) % ${bucketNum};"
                         if (res.size() > 1) {
-                            logger.info("""===== [check] check failed: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, bucket columns: ${bucketColumns}, res.size()=${res.size()}, res=${res}""")
+                            logger.info("""===== [check] check failed: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, bucket columns: ${bucketColsStr}, res.size()=${res.size()}, res=${res}""")
                             assert res.size() == 1
                         }
                     } catch (AssertionError e) {
@@ -63,7 +72,7 @@ suite("check_hash_bucket_table") {
                 }
                 sql "set use_fix_replica=-1;"
             }
-            logger.info("""===== [check] Finish to check table partition: ${db}.${tblName}, partitionName: ${partitionName}, replica num: ${replicaNum}, bucket num: ${bucketNum}, bucket columns: ${bucketColumns}""")
+            logger.info("""===== [check] Finish to check table partition: ${db}.${tblName}, partitionName: ${partitionName}, replica num: ${replicaNum}, bucket num: ${bucketNum}, bucket columns: ${bucketColsStr}""")
         } catch (Throwable e) {
             logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, e=${e}")
         }
@@ -81,34 +90,36 @@ suite("check_hash_bucket_table") {
             }
         }
         logger.info("""===== [check] Finish to check table: ${db}.${tblName}""")
-        partitionNum += checkedPartition
+        partitionNum.addAndGet(checkedPartition)
         return checkedPartition > 0
     }

     def checkDb = { String db ->
         sql "use ${db};"
+        dbNum.incrementAndGet()
         def tables = sql("show full tables").stream().filter{ it[1] == "BASE TABLE" }.collect{ it[0] }.toList()
         def asyncMVs = sql_return_maparray("""select * from mv_infos("database"="${db}");""").collect{ it.Name }.toSet()
-        int checkedTable = 0
         tables.each {
-            if (!asyncMVs.contains(it)) {
-                if (checkTable(db, it)) {
-                    checkedTable++
-                }
+            def tblName = it
+            if (!asyncMVs.contains(tblName)) {
+                futures << executor.submit({
+                    if (checkTable(db, tblName)) {
+                        tableNum.incrementAndGet()
+                    }
+                })
             }
         }
-        tableNum += checkedTable
-        return checkedTable > 0
     }

     def allDbs = sql "show databases"
     allDbs.each {
         def db = it[0]
         if (!excludedDbs.contains(db)) {
-            if (checkDb(db)) {
-                ++dbNum
-            }
+            checkDb(db)
         }
     }
+    futures.each { it.get() }
+    executor.shutdown()
+    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES)
     logger.info("===== [check] finish to check hash bucket tables, db num: ${dbNum}, table num: ${tableNum}, partition num: ${partitionNum}")
 }

From e18561878f1fc4bd196764ae410f5ebddbd9468e Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Fri, 25 Jul 2025 19:00:15 +0800
Subject: [PATCH 17/21] update

---
 .../check_hash_bucket_table/check_hash_bucket_table.groovy | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index 5958bf5582f1e6..e72150d2be174c 100644
--- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -73,6 +73,8 @@ suite("check_hash_bucket_table") {
                 sql "set use_fix_replica=-1;"
             }
             logger.info("""===== [check] Finish to check table partition: ${db}.${tblName}, partitionName: ${partitionName}, replica num: ${replicaNum}, bucket num: ${bucketNum}, bucket columns: ${bucketColsStr}""")
+        } catch (AssertionError e) {
+            throw e
         } catch (Throwable e) {
             logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, e=${e}")
         }

From eb43b116409a8999ba6d6d3eab73c33e8eb3d742 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 28 Jul 2025 11:11:19 +0800
Subject: [PATCH 18/21] disable promotion

---
 .../trees/expressions/functions/scalar/Crc32Internal.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
index 13eeae8bbca3b7..d42cfbeaa20868 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java
@@ -20,6 +20,7 @@
 import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
+import org.apache.doris.nereids.trees.expressions.functions.ComputePrecision;
 import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
@@ -36,7 +37,7 @@
  * for debug only, compute crc32 hash value as the same way in `VOlapTablePartitionParam::find_tablets()`
  */
 public class Crc32Internal extends ScalarFunction
-        implements UnaryExpression, ExplicitlyCastableSignature, AlwaysNotNullable {
+        implements UnaryExpression, ExplicitlyCastableSignature, AlwaysNotNullable, ComputePrecision {

     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(BigIntType.INSTANCE).varArgs(AnyDataType.INSTANCE_WITHOUT_INDEX)
@@ -64,6 +65,11 @@ public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }

+    @Override
+    public FunctionSignature computePrecision(FunctionSignature signature) {
+        return signature;
+    }
+
     @Override
     public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
         return visitor.visitCrc32Internal(this, context);
     }

From d6cfa4061e26cfbec8ca19da44e4ee2b5b86008f Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 28 Jul 2025 14:04:13 +0800
Subject: [PATCH 19/21] Reapply "add log"

This reverts commit 6f6ac91eccdc68bec84d0480237e464da87214b3.

---
 be/src/exec/tablet_info.h              | 18 ++++++++++++++++++
 be/src/vec/functions/function_string.h |  8 ++++++++
 2 files changed, 26 insertions(+)

diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 1dceb102c78823..92f00bfc720a5d 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -235,6 +235,7 @@ class VOlapTablePartitionParam {
         compute_function = [this](vectorized::Block* block, uint32_t row,
                                   const VOlapTablePartition& partition) -> uint32_t {
             uint32_t hash_val = 0;
+            std::vector<std::string> msgs;
             for (unsigned short _distributed_slot_loc : _distributed_slot_locs) {
                 auto* slot_desc = _slots[_distributed_slot_loc];
                 auto& column = block->get_by_position(_distributed_slot_loc).column;
@@ -246,7 +247,24 @@ class VOlapTablePartitionParam {
                 } else {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
+                auto type_in_block = block->get_by_position(_distributed_slot_loc).type;
+                msgs.emplace_back(fmt::format(
+                        "[slot_name: {}, type: {}, type_primitive_type: {}, type_in_block: {}, "
+                        "type_in_block_primitive_type: {}]",
+                        slot_desc->col_name(), slot_desc->type()->get_name(),
+                        static_cast<int>(slot_desc->type()->get_primitive_type()),
+                        type_in_block->get_name(),
+                        static_cast<int>(type_in_block->get_primitive_type())));
             }
+            LOG_INFO("[verbose] find_tablets")
+                    .tag("db_id", db_id())
+                    .tag("table_id", table_id())
+                    .tag("types", fmt::format("{}", fmt::join(msgs, ",")))
+                    .tag("row", row)
+                    .tag("hash_val", hash_val)
+                    .tag("partition.num_buckets", partition.num_buckets)
+                    .tag("ret", hash_val % partition.num_buckets)
+                    .tag("row data", block->dump_one_line(row, block->columns()));
             return hash_val % partition.num_buckets;
         };
     } else { // random distribution
diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h
index dc0628ccf1a9c4..30b47b1b9f5654 100644
--- a/be/src/vec/functions/function_string.h
+++ b/be/src/vec/functions/function_string.h
@@ -4964,6 +4964,13 @@ class FunctionCrc32Internal : public IFunction {
                     block.get_by_position(arguments[i]).column->convert_to_full_column_if_const();
             argument_primitive_types[i] =
                     block.get_by_position(arguments[i]).type->get_primitive_type();
+            LOG_INFO("[verbose] crc32_internal")
+                    .tag("rows", input_rows_count)
+                    .tag("slot_name", block.get_by_position(arguments[i]).name)
+                    .tag("type", block.get_by_position(arguments[i]).type->get_name())
+                    .tag("primitive_type",
+                         static_cast<int>(
+                                 block.get_by_position(arguments[i]).type->get_primitive_type()));
         }

         auto res_col = ColumnInt64::create();
@@ -4982,6 +4989,7 @@ class FunctionCrc32Internal : public IFunction {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
             }
+            LOG_INFO("[verbose] crc32_internal data").tag("row", i).tag("hash_val", hash_val);
             res_data[i] = hash_val;
         }

From 477ab9a8a3a3e2599d1a40426cd4ad8d60625d6f Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 28 Jul 2025 14:13:22 +0800
Subject: [PATCH 20/21] don't skip decimalv3

---
 .../check_hash_bucket_table.groovy            |  9 ---------
 1 file changed, 9 deletions(-)

diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index e72150d2be174c..e1b38057da32c6 100644
--- a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -37,16 +37,7 @@ suite("check_hash_bucket_table") {
         def bucketColumns = info["DistributionKey"]
         if (bucketColumns == "RANDOM") {return false}
-        def columnsDetail = sql_return_maparray "desc ${tblName} all;"
         def bucketCols = bucketColumns.split(",").collect { it.trim() }
-        for (def col: bucketCols) {
-            def colDetail = columnsDetail.find { it.Field == col }
-            if (colDetail.InternalType.toLowerCase().contains("decimal")) {
-                logger.info("===== [check] skip to check table: ${db}.${tblName} because bucket column: ${col} is ${colDetail.InternalType}")
-                return false
-            }
-        }
-
         def bucketColsStr = bucketCols.collect { "`${it}`" }.join(",")
         def partitionName = info["PartitionName"]
         try {

From 160a09c0baa63dfeab01217bf0a1cc92eea284b4 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 28 Jul 2025 14:13:35 +0800
Subject: [PATCH 21/21] Revert "Reapply "add log""

This reverts commit 8cfbba6bfa0be148464cd810dbb8047332a8aecf.

---
 be/src/exec/tablet_info.h              | 18 ------------------
 be/src/vec/functions/function_string.h |  8 --------
 2 files changed, 26 deletions(-)

diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 92f00bfc720a5d..1dceb102c78823 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -235,7 +235,6 @@ class VOlapTablePartitionParam {
         compute_function = [this](vectorized::Block* block, uint32_t row,
                                   const VOlapTablePartition& partition) -> uint32_t {
             uint32_t hash_val = 0;
-            std::vector<std::string> msgs;
             for (unsigned short _distributed_slot_loc : _distributed_slot_locs) {
                 auto* slot_desc = _slots[_distributed_slot_loc];
                 auto& column = block->get_by_position(_distributed_slot_loc).column;
@@ -247,24 +246,7 @@ class VOlapTablePartitionParam {
                 } else {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
-                auto type_in_block = block->get_by_position(_distributed_slot_loc).type;
-                msgs.emplace_back(fmt::format(
-                        "[slot_name: {}, type: {}, type_primitive_type: {}, type_in_block: {}, "
-                        "type_in_block_primitive_type: {}]",
-                        slot_desc->col_name(), slot_desc->type()->get_name(),
-                        static_cast<int>(slot_desc->type()->get_primitive_type()),
-                        type_in_block->get_name(),
-                        static_cast<int>(type_in_block->get_primitive_type())));
             }
-            LOG_INFO("[verbose] find_tablets")
-                    .tag("db_id", db_id())
-                    .tag("table_id", table_id())
-                    .tag("types", fmt::format("{}", fmt::join(msgs, ",")))
-                    .tag("row", row)
-                    .tag("hash_val", hash_val)
-                    .tag("partition.num_buckets", partition.num_buckets)
-                    .tag("ret", hash_val % partition.num_buckets)
-                    .tag("row data", block->dump_one_line(row, block->columns()));
             return hash_val % partition.num_buckets;
         };
     } else { // random distribution
diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h
index 30b47b1b9f5654..dc0628ccf1a9c4 100644
--- a/be/src/vec/functions/function_string.h
+++ b/be/src/vec/functions/function_string.h
@@ -4964,13 +4964,6 @@ class FunctionCrc32Internal : public IFunction {
                     block.get_by_position(arguments[i]).column->convert_to_full_column_if_const();
             argument_primitive_types[i] =
                     block.get_by_position(arguments[i]).type->get_primitive_type();
-            LOG_INFO("[verbose] crc32_internal")
-                    .tag("rows", input_rows_count)
-                    .tag("slot_name", block.get_by_position(arguments[i]).name)
-                    .tag("type", block.get_by_position(arguments[i]).type->get_name())
-                    .tag("primitive_type",
-                         static_cast<int>(
-                                 block.get_by_position(arguments[i]).type->get_primitive_type()));
         }

         auto res_col = ColumnInt64::create();
@@ -4989,7 +4982,6 @@ class FunctionCrc32Internal : public IFunction {
                     hash_val = HashUtil::zlib_crc_hash_null(hash_val);
                 }
             }
-            LOG_INFO("[verbose] crc32_internal data").tag("row", i).tag("hash_val", hash_val);
             res_data[i] = hash_val;
         }