diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index c4a8dbb568a64a..5ef14743492b95 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -49,6 +49,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_schema.h"
 #include "runtime/thread_context.h"
+#include "util/debug_points.h"
 #include "util/slice.h"
 #include "util/time.h"
 #include "vec/columns/column.h"
@@ -677,6 +678,8 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
 
 Status BetaRowsetWriter::_check_segment_number_limit() {
     size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
+    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+                    { total_segment_num = dp->param("segnum", 1024); });
     if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
         return Status::Error<TOO_MANY_SEGMENTS>(
                 "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index a11363856035b6..eabf0e830d3a3b 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -58,6 +58,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_schema.h"
 #include "runtime/thread_context.h"
+#include "util/debug_points.h"
 #include "util/mem_info.h"
 #include "util/time.h"
 #include "vec/olap/vertical_block_reader.h"
@@ -167,6 +168,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
         }
     }
 
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
     if (raw_rows_read != sum_src_row) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction read row num does not match source. expect read row:{}, actual read "
@@ -174,12 +176,15 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
                 sum_src_row, raw_rows_read);
     }
 
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
     if ((output_rows + merged_rows) != raw_rows_read) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction total row num does not match after merge. expect total row:{}, "
                 "actual total row:{}, (output_rows:{},merged_rows:{})",
                 raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
     }
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
+                    { filtered_rows++; });
     if (filtered_rows != 0) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction should not have filtered rows but actual filtered rows:{}",
diff --git a/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out b/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out
new file mode 100644
index 00000000000000..e25bd2c3f92b47
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
+-- !select_default --
+
+-- !select_default --
+
diff --git a/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out b/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out
new file mode 100644
index 00000000000000..afeab4c41d0905
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
diff --git a/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out b/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out
new file mode 100644
index 00000000000000..afeab4c41d0905
--- /dev/null
+++ b/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 5065f0e3e390ac..11e41e9bcf8346 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -38,6 +38,7 @@ import org.apache.doris.regression.action.HttpCliAction
 import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.Hdfs
 import org.apache.doris.regression.util.SuiteUtils
+import org.apache.doris.regression.util.DebugPoint
 import org.junit.jupiter.api.Assertions
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
@@ -69,12 +70,14 @@
     final List<Future> lazyCheckFutures = new Vector<>()
 
     SuiteCluster cluster
+    DebugPoint debugPoint
 
     Suite(String name, String group, SuiteContext context) {
         this.name = name
         this.group = group
         this.context = context
         this.cluster = null
+        this.debugPoint = new DebugPoint(this)
     }
 
     String getConf(String key, String defaultValue = null) {
@@ -476,7 +479,7 @@
         String s3Url = "http://${s3BucketName}.${s3Endpoint}"
         return s3Url
     }
-    
+
     void scpFiles(String username, String host, String files, String filePath, boolean fromDst=true) {
         String cmd = "scp -r ${username}@${host}:${files} ${filePath}"
         if (!fromDst) {
@@ -487,7 +490,7 @@
         def code = process.waitFor()
         Assert.assertEquals(0, code)
     }
-    
+
     void sshExec(String username, String host, String cmd) {
         String command = "ssh ${username}@${host} '${cmd}'"
         def cmds = ["/bin/bash", "-c", command]
@@ -499,7 +502,7 @@
         assert errMsg.length() == 0: "error occurred!" + errMsg
         assert p.exitValue() == 0
     }
-    
+
     void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP,
             Map<String, String> backendId_to_backendHttpPort) {
         List<List<Object>> backends = sql("show backends");
@@ -509,7 +512,7 @@
             backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
         }
         return;
-    } 
+    }
 
     int getTotalLine(String filePath) {
         def file = new File(filePath)
@@ -693,14 +696,14 @@
             String cleanedSqlStr = sql.replaceAll("\\s*;\\s*\$", "")
             sql = cleanedSqlStr
         }
-        quickRunTest(tag, sql, isOrder) 
+        quickRunTest(tag, sql, isOrder)
    }

    void quickExecute(String tag, PreparedStatement stmt) {
        logger.info("Execute tag: ${tag}, sql: ${stmt}".toString())
-        quickRunTest(tag, stmt) 
+        quickRunTest(tag, stmt)
    }
-    
+
    @Override
    Object invokeMethod(String name, Object args) {
        // qt: quick test
@@ -761,5 +764,9 @@
        // set server side prepared statement url
        return "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + database + "?&useServerPrepStmts=true"
    }
+
+    DebugPoint GetDebugPoint() {
+        return debugPoint
+    }
 
 }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index f6f5002365fb8a..29935caf25697c 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -18,6 +18,8 @@ package org.apache.doris.regression.suite
 
 import org.apache.doris.regression.Config
 import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
 
 import com.google.common.collect.Maps
 import org.slf4j.Logger
@@ -62,13 +64,6 @@ class ListHeader {
 
 }
 
-enum NodeType {
-
-    FE,
-    BE,
-
-}
-
 class ServerNode {
 
     int index
@@ -83,38 +78,23 @@
         node.alive = fields.get(header.indexOf('alive')) == 'true'
     }
 
-    String getHttpAddress() {
-        return 'http://' + host + ':' + httpPort
+    def getHttpAddress() {
+        return [host, httpPort]
     }
 
     void enableDebugPoint(String name, Map<String, String> params = null) {
-        def url = getHttpAddress() + '/api/debug_point/add/' + name
-        if (params != null && params.size() > 0) {
-            url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
-        }
-        def result = Http.http_post(url, null, true)
-        checkHttpResult(result)
+        def (host, port) = getHttpAddress()
+        DebugPoint.enableDebugPoint(host, port, getNodeType(), name, params)
     }
 
     void disableDebugPoint(String name) {
-        def url = getHttpAddress() + '/api/debug_point/remove/' + name
-        def result = Http.http_post(url, null, true)
-        checkHttpResult(result)
+        def (host, port) = getHttpAddress()
+        DebugPoint.disableDebugPoint(host, port, getNodeType(), name)
     }
 
     void clearDebugPoints() {
-        def url = getHttpAddress() + '/api/debug_point/clear'
-        def result = Http.http_post(url, null, true)
-        checkHttpResult(result)
-    }
-
-    private void checkHttpResult(Object result) {
-        def type = getNodeType()
-        if (type == NodeType.FE) {
-            assert result.code == 0 : result.toString()
-        } else if (type == NodeType.BE) {
-            assert result.status == 'OK' : result.toString()
-        }
+        def (host, port) = getHttpAddress()
+        DebugPoint.clearDebugPoints(host, port, getNodeType())
     }
 
     NodeType getNodeType() {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
new file mode 100644
index 00000000000000..c30f7fbbcbe016
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.regression.util
+import org.apache.doris.regression.util.Http
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.suite.Suite
+
+enum NodeType {
+    FE,
+    BE,
+}
+
+class DebugPoint {
+    Suite suite
+
+    DebugPoint(Suite suite) {
+        this.suite = suite
+    }
+
+    /* Enable debug point in regression
+     * Note: set BE config::enable_debug_points = true to take effect
+     * Parameters:
+     *    host:     hostname or ip of target node
+     *    httpPort: http port of target node
+     *    type:     NodeType.BE or NodeType.FE
+     *    name:     debug point name
+     *    params:   timeout, execute, or other customized input params
+     */
+    static def enableDebugPoint(String host, String httpPort, NodeType type, String name, Map<String, String> params = null) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/add/' + name
+        if (params != null && params.size() > 0) {
+            url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
+        }
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    /* Disable debug point in regression
+     * Parameters:
+     *    host:     hostname or ip of target node
+     *    httpPort: http port of target node
+     *    type:     NodeType.BE or NodeType.FE
+     *    name:     debug point name
+     */
+    static def disableDebugPoint(String host, String httpPort, NodeType type, String name) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/remove/' + name
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    /* Disable all debug points in regression
+     * Parameters:
+     *    host:     hostname or ip of target node
+     *    httpPort: http port of target node
+     *    type:     NodeType.BE or NodeType.FE
+     */
+    static def clearDebugPoints(String host, String httpPort, NodeType type) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/clear'
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    def operateDebugPointForAllBEs(Closure closure) {
+        def ipList = [:]
+        def portList = [:]
+        (ipList, portList) = getBEHostAndHTTPPort()
+        ipList.each { beid, ip ->
+            closure.call(ip, portList[beid])
+        }
+    }
+
+    /* Enable a specific debug point for all BE nodes in the cluster */
+    def enableDebugPointForAllBEs(String name, Map<String, String> params = null) {
+        operateDebugPointForAllBEs({ host, port ->
+            println "enable debug point $name for BE $host:$port"
+            enableDebugPoint(host, port, NodeType.BE, name, params)
+        })
+    }
+
+    /* Disable a specific debug point for all BE nodes in the cluster */
+    def disableDebugPointForAllBEs(String name) {
+        operateDebugPointForAllBEs { host, port ->
+            disableDebugPoint(host, port, NodeType.BE, name)
+        }
+    }
+
+    /* Clear all debug points for all BE nodes in the cluster */
+    def clearDebugPointsForAllBEs() {
+        operateDebugPointForAllBEs { host, port ->
+            clearDebugPoints(host, port, NodeType.BE)
+        }
+    }
+
+    def getBEHostAndHTTPPort() {
+        def ipList = [:]
+        def portList = [:]
+        suite.getBackendIpHttpPort(ipList, portList)
+        return [ipList, portList]
+    }
+
+    def getFEHostAndHTTPPort() {
+        assert false : 'not implemented yet'
+    }
+
+    def enableDebugPointForAllFEs(String name, Map<String, String> params = null) {
+        assert false : 'not implemented yet'
+    }
+
+    def disableDebugPointForAllFEs(String name) {
+        assert false : 'not implemented yet'
+    }
+
+    def clearDebugPointsForAllFEs() {
+        assert false : 'not implemented yet'
+    }
+
+    static void checkHttpResult(Object result, NodeType type) {
+        if (type == NodeType.FE) {
+            assert result.code == 0 : result.toString()
+        } else if (type == NodeType.BE) {
+            assert result.status == 'OK' : result.toString()
+        }
+    }
+}
+
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index f022ba2dec8373..df9b08157f409a 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -20,7 +20,7 @@ PPROF_TMPDIR="$DORIS_HOME/log/"
 
 # INFO, WARNING, ERROR, FATAL
 sys_log_level = INFO
-# ports for admin, web, heartbeat service 
+# ports for admin, web, heartbeat service
 be_port = 9161
 webserver_port = 8141
 heartbeat_service_port = 9151
@@ -36,7 +36,7 @@ buffer_pool_limit = 2%
 storage_page_cache_limit = 0%
 disable_storage_page_cache = true
 chunk_reserved_bytes_limit = 134217728
-# Choose one if there are more than one ip except loopback address. 
+# Choose one if there are more than one ip except loopback address.
 # Note that there should at most one ip match this list.
 # If no ip match this rule, will choose one randomly.
 # use CIDR format, e.g. 10.10.10.0/24
@@ -48,7 +48,7 @@ chunk_reserved_bytes_limit = 134217728
 # you can add capacity limit at the end of each root path, seperate by ','
 # eg:
 # /home/disk2/doris, capacity limit is disk capacity, HDD(default)
-# 
+#
 # you also can specify the properties by setting ':', seperate by ','
 # property 'medium' has a higher priority than the extension of path
 #
@@ -61,7 +61,7 @@ chunk_reserved_bytes_limit = 134217728
 # sys_log_verbose_modules = *
 log_buffer_level = -1
 enable_stream_load_record = true
-# palo_cgroups 
+# palo_cgroups
 
 #storage_root_path=/mnt/hdd01/doris.SSD/NON_VEC_RELEASE;/mnt/hdd01/doris.HDD/NON_VEC_RELEASE;/mnt/hdd02/doris.SSD/NON_VEC_RELEASE;/mnt/hdd02/doris.HDD/NON_VEC_RELEASE;/mnt/hdd03/doris.SSD/NON_VEC_RELEASE;/mnt/hdd03/doris.HDD/NON_VEC_RELEASE;/mnt/hdd04/doris.SSD/NON_VEC_RELEASE;/mnt/hdd04/doris.HDD/NON_VEC_RELEASE;/mnt/hdd05/doris.SSD/NON_VEC_RELEASE;/mnt/hdd05/doris.HDD/NON_VEC_RELEASE;/mnt/hdd06/doris.SSD/NON_VEC_RELEASE;/mnt/hdd06/doris.HDD/NON_VEC_RELEASE;
 storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1
@@ -75,3 +75,4 @@ enable_set_in_bitmap_value=true
 enable_feature_binlog=true
 max_sys_mem_available_low_water_mark_bytes=69206016
 user_files_secure_path=/
+enable_debug_points=true
diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy
index fecda4db1fb7d0..e0e7be6ed4ccf0 100644
--- a/regression-test/pipeline/p0/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p0/conf/regression-conf.groovy
@@ -58,7 +58,7 @@ excludeGroups = ""
 excludeSuites = "test_sql_block_rule,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter"
 
 // this directories will not be executed
-excludeDirectories = "workload_manager_p1"
+excludeDirectories = "workload_manager_p1,fault_injection_p0"
 
 customConf1 = "test_custom_conf_value"
diff --git a/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy
new file mode 100644
index 00000000000000..2f601f13d11959
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
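+
+// This suite drives the three segcompaction correctness checks in
+// be/src/olap/rowset/segcompaction.cpp: each run below enables one of the
+// debug points added there, the injected counter mismatch makes the check
+// fail, and the load is expected to end up CANCELLED (empty .out results).
+// Debug points only fire when the BE runs with enable_debug_points=true.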
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+def tableName = "segcompaction_correctness_test"
+def create_table_sql = """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+                `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+                `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+                `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+                `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+                `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+                `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+                `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+                `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+                `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+            )
+            DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "1" );
+        """
+def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+
+suite("test_segcompaction_correctness") {
+    def runLoadWithSegcompaction = {
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = getS3BucketName()
+
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        String backend_id;
+        try {
+            getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+            backend_id = backendId_to_backendIP.keySet()[0]
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def configList = parseJson(out.trim())
+            assert configList instanceof List
+
+            boolean disableAutoCompaction = true
+            for (Object ele in (List) configList) {
+                assert ele instanceof List<String>
+                if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                    disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+                }
+            }
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql "${create_table_sql}"
+
+            def uuid = UUID.randomUUID().toString().replace("-", "0")
+            String columns_str = ("$columns" != "") ? "($columns)" : "";
+
+            sql """
+                LOAD LABEL $uuid (
+                    DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
+                    INTO TABLE ${tableName}
+                    FORMAT AS "ORC"
+                    $columns_str
+                )
+                WITH S3 (
+                    "AWS_ACCESS_KEY" = "$ak",
+                    "AWS_SECRET_KEY" = "$sk",
+                    "AWS_ENDPOINT" = "$endpoint",
+                    "AWS_REGION" = "$region"
+                )
+                properties(
+                    "use_new_load_scan_node" = "true"
+                )
+            """
+
+            def max_try_milli_secs = 3600000
+            String[][] result = ''
+            while (max_try_milli_secs > 0) {
+                result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+                if (result[0][2].equals("FINISHED")) {
+                    logger.info("Load FINISHED " + " $uuid")
+                    break;
+                }
+                if (result[0][2].equals("CANCELLED")) {
+                    logger.info("Load CANCELLED " + " $uuid")
+                    break;
+                }
+                Thread.sleep(1000)
+                max_try_milli_secs -= 1000
+                if (max_try_milli_secs <= 0) {
+                    assertTrue(false, "load Timeout: $uuid")
+                }
+            }
+            assertTrue(result[0][2].equals("CANCELLED"))
+
+            result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+            qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """
+            def tablets = sql """ show tablets from ${tableName}; """
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${tableName}")
+        }
+    }
+
+    // wrong_sum_src_row
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
+    }
+
+    // wrong_merged_rows
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
+    }
+
+    // wrong_filtered_rows
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows")
+    }
+}
+
diff --git a/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy
new file mode 100644
index 00000000000000..b68e324ee98ea3
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+def tableName = "too_many_segments_test"
+def create_table_sql = """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+                `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+                `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+                `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+                `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+                `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+                `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+                `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+                `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+                `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+            )
+            DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "1" );
+        """
+def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+
+suite("test_too_many_segments") { // expects the -238 (TOO_MANY_SEGMENTS) error
+    def runLoadWithTooManySegments = {
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = getS3BucketName()
+
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        String backend_id;
+        try {
+            getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+            backend_id = backendId_to_backendIP.keySet()[0]
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def configList = parseJson(out.trim())
+            assert configList instanceof List
+
+            boolean disableAutoCompaction = true
+            for (Object ele in (List) configList) {
+                assert ele instanceof List<String>
+                if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                    disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+                }
+            }
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql "${create_table_sql}"
+
+            def uuid = UUID.randomUUID().toString().replace("-", "0")
+            String columns_str = ("$columns" != "") ? "($columns)" : "";
+
+            sql """
+                LOAD LABEL $uuid (
+                    DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
+                    INTO TABLE ${tableName}
+                    FORMAT AS "ORC"
+                    $columns_str
+                )
+                WITH S3 (
+                    "AWS_ACCESS_KEY" = "$ak",
+                    "AWS_SECRET_KEY" = "$sk",
+                    "AWS_ENDPOINT" = "$endpoint",
+                    "AWS_REGION" = "$region"
+                )
+                properties(
+                    "use_new_load_scan_node" = "true"
+                )
+            """
+
+            Thread.sleep(2000)
+            GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+                ["segnum":2000])
+
+            def max_try_milli_secs = 3600000
+            String[][] result = ''
+            while (max_try_milli_secs > 0) {
+                result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+                if (result[0][2].equals("FINISHED")) {
+                    logger.info("Load FINISHED " + " $uuid")
+                    break;
+                }
+                if (result[0][2].equals("CANCELLED")) {
+                    logger.info("Load CANCELLED " + " $uuid")
+                    break;
+                }
+                Thread.sleep(1000)
+                max_try_milli_secs -= 1000
+                if (max_try_milli_secs <= 0) {
+                    assertTrue(false, "load Timeout: $uuid")
+                }
+            }
+            assertTrue(result[0][7].contains("-238")) // -238 is TOO_MANY_SEGMENTS
+
+            result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+            qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """
+            def tablets = sql """ show tablets from ${tableName}; """
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${tableName}")
+            GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+        }
+    }
+    runLoadWithTooManySegments()
+}
diff --git a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
index 81fd2f2661fc82..8c3efa5aa591b7 100644
--- a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
+++ b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
@@ -16,7 +16,7 @@
 // under the License.
 
 import org.apache.doris.regression.suite.ClusterOptions
-import org.apache.doris.regression.suite.NodeType
+import org.apache.doris.regression.util.NodeType
 import org.apache.doris.regression.suite.SuiteCluster
 
 class InjectCase {