diff --git a/be/src/vec/functions/function_string.cpp b/be/src/vec/functions/function_string.cpp index 97d8079985d12b..d1041b7b0adb40 100644 --- a/be/src/vec/functions/function_string.cpp +++ b/be/src/vec/functions/function_string.cpp @@ -1381,6 +1381,7 @@ void register_function_string(SimpleFunctionFactory& factory) { factory.register_function(); factory.register_function(); factory.register_function(); + factory.register_function(); factory.register_alias(FunctionLeft::name, "strleft"); factory.register_alias(FunctionRight::name, "strright"); diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h index 97c950d97b71be..2e59dce22effd0 100644 --- a/be/src/vec/functions/function_string.h +++ b/be/src/vec/functions/function_string.h @@ -49,6 +49,9 @@ #include "gutil/strings/numbers.h" #include "gutil/strings/substitute.h" #include "runtime/decimalv2_value.h" +#include "runtime/define_primitive_type.h" +#include "runtime/primitive_type.h" +#include "runtime/raw_value.h" #include "runtime/string_search.hpp" #include "util/sha.h" #include "util/string_util.h" @@ -5021,4 +5024,56 @@ class FunctionXPathString : public IFunction { } }; +// ATTN: for debug only +// compute crc32 hash value as the same way in `VOlapTablePartitionParam::find_tablets()` +class FunctionCrc32Internal : public IFunction { +public: + static constexpr auto name = "crc32_internal"; + static FunctionPtr create() { return std::make_shared(); } + String get_name() const override { return name; } + size_t get_number_of_arguments() const override { return 0; } + bool is_variadic() const override { return true; } + bool use_default_implementation_for_nulls() const override { return false; } + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return std::make_shared(); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count) const override { + 
DCHECK_GE(arguments.size(), 1); + + auto argument_size = arguments.size(); + std::vector argument_columns(argument_size); + std::vector argument_primitive_types(argument_size); + + for (size_t i = 0; i < argument_size; ++i) { + argument_columns[i] = + block.get_by_position(arguments[i]).column->convert_to_full_column_if_const(); + argument_primitive_types[i] = + block.get_by_position(arguments[i]).type->get_type_as_type_descriptor().type; + } + + auto res_col = ColumnInt64::create(); + auto& res_data = res_col->get_data(); + res_data.resize_fill(input_rows_count, 0); + + for (size_t i = 0; i < input_rows_count; ++i) { + uint32_t hash_val = 0; + for (size_t j = 0; j < argument_size; ++j) { + const auto& column = argument_columns[j]; + auto primitive_type = argument_primitive_types[j]; + auto val = column->get_data_at(i); + if (val.data != nullptr) { + hash_val = RawValue::zlib_crc32(val.data, val.size, primitive_type, hash_val); + } else { + hash_val = HashUtil::zlib_crc_hash_null(hash_val); + } + } + res_data[i] = hash_val; + } + + block.replace_by_position(result, std::move(res_col)); + return Status::OK(); + } +}; } // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java index ea49acefd67502..579f3148dd5ae5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java @@ -129,6 +129,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Cot; import org.apache.doris.nereids.trees.expressions.functions.scalar.CountEqual; import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32Internal; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateMap; import 
org.apache.doris.nereids.trees.expressions.functions.scalar.CreateNamedStruct; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateStruct; @@ -771,6 +772,7 @@ public class BuiltinScalarFunctions implements FunctionHelper { scalar(Left.class, "left", "strleft"), scalar(Length.class, "length"), scalar(Crc32.class, "crc32"), + scalar(Crc32Internal.class, "crc32_internal"), scalar(Like.class, "like"), scalar(Ln.class, "ln", "dlog1"), scalar(Locate.class, "locate"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java new file mode 100644 index 00000000000000..d42cfbeaa20868 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32Internal.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.expressions.functions.scalar; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable; +import org.apache.doris.nereids.trees.expressions.functions.ComputePrecision; +import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; +import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.BigIntType; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.nereids.util.ExpressionUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * for debug only, compute crc32 hash value as the same way in `VOlapTablePartitionParam::find_tablets()` + */ +public class Crc32Internal extends ScalarFunction + implements UnaryExpression, ExplicitlyCastableSignature, AlwaysNotNullable, ComputePrecision { + + public static final List SIGNATURES = ImmutableList.of( + FunctionSignature.ret(BigIntType.INSTANCE).varArgs(AnyDataType.INSTANCE_WITHOUT_INDEX) + ); + + /** + * constructor with 1 or more arguments. + */ + public Crc32Internal(Expression arg, Expression... varArgs) { + super("crc32_internal", ExpressionUtils.mergeArguments(arg, varArgs)); + } + + /** + * withChildren. 
+ */ + @Override + public Crc32Internal withChildren(List children) { + Preconditions.checkArgument(children.size() >= 1); + return new Crc32Internal(children.get(0), + children.subList(1, children.size()).toArray(new Expression[0])); + } + + @Override + public List getSignatures() { + return SIGNATURES; + } + + @Override + public FunctionSignature computePrecision(FunctionSignature signature) { + return signature; + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitCrc32Internal(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java index 0c49e99362f25f..c1725cd1658211 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java @@ -137,6 +137,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Cot; import org.apache.doris.nereids.trees.expressions.functions.scalar.CountEqual; import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32Internal; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateMap; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateNamedStruct; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateStruct; @@ -1546,6 +1547,10 @@ default R visitCrc32(Crc32 crc32, C context) { return visitScalarFunction(crc32, context); } + default R visitCrc32Internal(Crc32Internal crc32Internal, C context) { + return visitScalarFunction(crc32Internal, context); + } + default R visitLike(Like like, C context) { return visitStringRegexPredicate(like, context); } diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/RegressionTest.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/RegressionTest.groovy
index 65def1c50f1109..700ff4add8b845 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/RegressionTest.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/RegressionTest.groovy
@@ -313,12 +313,19 @@ class RegressionTest {
         if (!config.withOutLoadData) {
             log.info('Start to run load scripts')
             runScripts(config, recorder, directoryFilter,
-                    { fileName -> fileName.substring(0, fileName.lastIndexOf(".")) == "load" })
+                    { fileName -> {
+                        def name = fileName.substring(0, fileName.lastIndexOf("."))
+                        return name == "load" && name != "check_hash_bucket_table"} })
         }
 
         log.info('Start to run scripts')
         runScripts(config, recorder, directoryFilter,
-                { fileName -> fileName.substring(0, fileName.lastIndexOf(".")) != "load" })
+                { fileName -> {
+                    def name = fileName.substring(0, fileName.lastIndexOf("."))
+                    return name != "load" && name != "check_hash_bucket_table"} })
+        log.info('Start to run check hash bucket table scripts')
+        runScripts(config, recorder, directoryFilter,
+                { fileName -> fileName.substring(0, fileName.lastIndexOf(".")) == "check_hash_bucket_table" })
 
         return recorder
     }
diff --git a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
new file mode 100644
index 00000000000000..e1b38057da32c6
--- /dev/null
+++ b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
+// Debug-only consistency check for hash-distributed tables: within each tablet, every row
+// must yield the same value of `crc32_internal(<bucket columns>) % <bucket num>`.
+suite("check_hash_bucket_table") {
+
+    AtomicInteger dbNum = new AtomicInteger(0)
+    AtomicInteger tableNum = new AtomicInteger(0)
+    AtomicInteger partitionNum = new AtomicInteger(0)
+    def executor = Executors.newFixedThreadPool(30)
+    def futures = []
+
+    def excludedDbs = ["mysql", "information_schema", "__internal_schema"].toSet()
+
+    logger.info("===== [check] begin to check hash bucket tables")
+
+    // Checks every tablet of one partition (`info` is a row of `show partitions`).
+    // Returns false when the partition is skipped (at most one bucket, or RANDOM distribution),
+    // true otherwise. Non-assertion errors are logged and do not fail the check.
+    def checkPartition = { String db, String tblName, def info ->
+        int bucketNum = info["Buckets"].toInteger()
+        if (bucketNum <= 1) { return false }
+
+        def bucketColumns = info["DistributionKey"]
+        if (bucketColumns == "RANDOM") { return false }
+        def bucketColsStr = bucketColumns.split(",").collect { "`${it.trim()}`" }.join(",")
+        def partitionName = info["PartitionName"]
+        try {
+            def tabletIdList = sql_return_maparray(""" show replica status from ${tblName} partition(${partitionName}); """).collect { it.TabletId }
+            def tabletIds = tabletIdList.toSet()
+            // The replica count is taken to be the number of rows sharing the first tablet id.
+            int replicaNum = tabletIdList.count { it == tabletIdList[0] }
+            logger.info("""===== [check] Begin to check partition: ${db}.${tblName}, partition name: ${partitionName}, bucket num: ${bucketNum}, replica num: ${replicaNum}, bucket columns: ${bucketColsStr}""")
+            // Exclusive range: the inclusive `0..replicaNum-1` would visit 0 and -1 when replicaNum is 0.
+            (0..<replicaNum).each { replica ->
+                try {
+                    sql "set use_fix_replica=${replica};"
+                    tabletIds.each { tabletId ->
+                        try {
+                            def res = sql "select crc32_internal(${bucketColsStr}) % ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by crc32_internal(${bucketColsStr}) % ${bucketNum};"
+                            if (res.size() > 1) {
+                                logger.info("""===== [check] check failed: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, bucket columns: ${bucketColsStr}, res.size()=${res.size()}, res=${res}""")
+                                assert res.size() == 1
+                            }
+                        } catch (AssertionError e) {
+                            throw e
+                        } catch (Throwable e) {
+                            logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId}, e=${e}")
+                        }
+                    }
+                } finally {
+                    // Restore the default even when an assertion aborts the tablet loop.
+                    sql "set use_fix_replica=-1;"
+                }
+            }
+            logger.info("""===== [check] Finish to check table partition: ${db}.${tblName}, partitionName: ${partitionName}, replica num: ${replicaNum}, bucket num: ${bucketNum}, bucket columns: ${bucketColsStr}""")
+        } catch (AssertionError e) {
+            throw e
+        } catch (Throwable e) {
+            logger.info("===== [check] catch exception, table: ${db}.${tblName}, partition name: ${partitionName}, e=${e}")
+        }
+        return true
+    }
+
+    // Checks all partitions of one table; returns true if at least one partition was checked.
+    def checkTable = { String db, String tblName ->
+        // NOTE(review): the unqualified table names used below depend on this `use`; confirm each
+        // worker thread has its own session so concurrent tasks cannot switch databases under it.
+        sql "use ${db};"
+        def partitionInfo = sql_return_maparray """ show partitions from ${tblName}; """
+        int checkedPartition = 0
+        partitionInfo.each {
+            if (checkPartition(db, tblName, it)) {
+                ++checkedPartition
+            }
+        }
+        logger.info("""===== [check] Finish to check table: ${db}.${tblName}""")
+        partitionNum.addAndGet(checkedPartition)
+        return checkedPartition > 0
+    }
+
+    // Submits one check task per base table of `db`, skipping names reported by `mv_infos`.
+    def checkDb = { String db ->
+        sql "use ${db};"
+        dbNum.incrementAndGet()
+        def tables = sql("show full tables").findAll { it[1] == "BASE TABLE" }.collect { it[0] }
+        def asyncMVs = sql_return_maparray("""select * from mv_infos("database"="${db}");""").collect{ it.Name }.toSet()
+        tables.each { tblName ->
+            if (!asyncMVs.contains(tblName)) {
+                futures << executor.submit({
+                    if (checkTable(db, tblName)) {
+                        tableNum.incrementAndGet()
+                    }
+                })
+            }
+        }
+    }
+
+    try {
+        def allDbs = sql "show databases"
+        allDbs.each {
+            def db = it[0]
+            if (!excludedDbs.contains(db)) {
+                checkDb(db)
+            }
+        }
+        // Rethrows (wrapped) the first failure raised inside a worker task.
+        futures.each { it.get() }
+    } finally {
+        // Stop accepting work on every path so the pool threads are released even on failure.
+        executor.shutdown()
+    }
+    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES)
+    logger.info("===== [check] finish to check hash bucket tables, db num: ${dbNum}, table num: ${tableNum}, partition num: ${partitionNum}")
+}