Skip to content

Commit

Permalink
[enhancement](regression) fault injection for segcompaction test (apa…
Browse files Browse the repository at this point in the history
…che#25709)

1. generalized debug point facilities from docker suites for
   fault-injection/stubbing cases
2. add segcompaction fault-injection cases for demonstration
3. add -238 TOO_MANY_SEGMENTS fault-injection case for good
  • Loading branch information
freemandealer authored and wsjz committed Nov 19, 2023
1 parent 93d50a2 commit 808d74a
Show file tree
Hide file tree
Showing 13 changed files with 458 additions and 43 deletions.
3 changes: 3 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -677,6 +678,8 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(

Status BetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
{ total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/time.h"
#include "vec/olap/vertical_block_reader.h"
Expand Down Expand Up @@ -167,19 +168,23 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
}
}

DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
if (raw_rows_read != sum_src_row) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction read row num does not match source. expect read row:{}, actual read "
"row:{}",
sum_src_row, raw_rows_read);
}

DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
if ((output_rows + merged_rows) != raw_rows_read) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction total row num does not match after merge. expect total row:{}, "
"actual total row:{}, (output_rows:{},merged_rows:{})",
raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
}
DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
{ filtered_rows++; });
if (filtered_rows != 0) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction should not have filtered rows but actual filtered rows:{}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --

-- !select_default --

-- !select_default --

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.doris.regression.action.HttpCliAction
import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.Hdfs
import org.apache.doris.regression.util.SuiteUtils
import org.apache.doris.regression.util.DebugPoint
import org.junit.jupiter.api.Assertions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -69,12 +70,14 @@ class Suite implements GroovyInterceptable {
final List<Future> lazyCheckFutures = new Vector<>()

SuiteCluster cluster
DebugPoint debugPoint

Suite(String name, String group, SuiteContext context) {
this.name = name
this.group = group
this.context = context
this.cluster = null
this.debugPoint = new DebugPoint(this)
}

String getConf(String key, String defaultValue = null) {
Expand Down Expand Up @@ -476,7 +479,7 @@ class Suite implements GroovyInterceptable {
String s3Url = "http://${s3BucketName}.${s3Endpoint}"
return s3Url
}

void scpFiles(String username, String host, String files, String filePath, boolean fromDst=true) {
String cmd = "scp -r ${username}@${host}:${files} ${filePath}"
if (!fromDst) {
Expand All @@ -487,7 +490,7 @@ class Suite implements GroovyInterceptable {
def code = process.waitFor()
Assert.assertEquals(0, code)
}

void sshExec(String username, String host, String cmd) {
String command = "ssh ${username}@${host} '${cmd}'"
def cmds = ["/bin/bash", "-c", command]
Expand All @@ -499,7 +502,7 @@ class Suite implements GroovyInterceptable {
assert errMsg.length() == 0: "error occurred!" + errMsg
assert p.exitValue() == 0
}


void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
List<List<Object>> backends = sql("show backends");
Expand All @@ -509,7 +512,7 @@ class Suite implements GroovyInterceptable {
backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
}
return;
}
}

int getTotalLine(String filePath) {
def file = new File(filePath)
Expand Down Expand Up @@ -693,14 +696,14 @@ class Suite implements GroovyInterceptable {
String cleanedSqlStr = sql.replaceAll("\\s*;\\s*\$", "")
sql = cleanedSqlStr
}
quickRunTest(tag, sql, isOrder)
quickRunTest(tag, sql, isOrder)
}

void quickExecute(String tag, PreparedStatement stmt) {
logger.info("Execute tag: ${tag}, sql: ${stmt}".toString())
quickRunTest(tag, stmt)
quickRunTest(tag, stmt)
}

@Override
Object invokeMethod(String name, Object args) {
// qt: quick test
Expand Down Expand Up @@ -761,5 +764,9 @@ class Suite implements GroovyInterceptable {
// set server side prepared statement url
return "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + database + "?&useServerPrepStmts=true"
}

DebugPoint GetDebugPoint() {
return debugPoint
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.doris.regression.suite

import org.apache.doris.regression.Config
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.DebugPoint
import org.apache.doris.regression.util.NodeType

import com.google.common.collect.Maps
import org.slf4j.Logger
Expand Down Expand Up @@ -62,13 +64,6 @@ class ListHeader {

}

enum NodeType {

FE,
BE,

}

class ServerNode {

int index
Expand All @@ -83,38 +78,23 @@ class ServerNode {
node.alive = fields.get(header.indexOf('alive')) == 'true'
}

String getHttpAddress() {
return 'http://' + host + ':' + httpPort
def getHttpAddress() {
return [host, httpPort]
}

void enableDebugPoint(String name, Map<String, String> params = null) {
def url = getHttpAddress() + '/api/debug_point/add/' + name
if (params != null && params.size() > 0) {
url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
}
def result = Http.http_post(url, null, true)
checkHttpResult(result)
def (host, port) = getHttpAddress()
DebugPoint.enableDebugPoint(host, port, getNodeType(), name, params)
}

void disableDebugPoint(String name) {
def url = getHttpAddress() + '/api/debug_point/remove/' + name
def result = Http.http_post(url, null, true)
checkHttpResult(result)
def (host, port) = getHttpAddress()
DebugPoint.disableDebugPoint(host, port, getNodeType(), name)
}

void clearDebugPoints() {
def url = getHttpAddress() + '/api/debug_point/clear'
def result = Http.http_post(url, null, true)
checkHttpResult(result)
}

private void checkHttpResult(Object result) {
def type = getNodeType()
if (type == NodeType.FE) {
assert result.code == 0 : result.toString()
} else if (type == NodeType.BE) {
assert result.status == 'OK' : result.toString()
}
def (host, port) = getHttpAddress()
DebugPoint.clearDebugPoints(host, port, getNodeType())
}

NodeType getNodeType() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.regression.util
import org.apache.doris.regression.util.Http
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.suite.Suite

enum NodeType {
FE,
BE,
}

class DebugPoint {
Suite suite

DebugPoint(Suite suite) {
this.suite = suite
}

/* Enable debug point in regression
* Note: set BE config::enable_debug_points = true to take effect
* Parameters:
* host: hostname or ip of target node
* httpPort: http port of target node
* type: NodeType.BE or NodeType.FE
* name: debug point name
* params: timeout, execute, or other customized input params
*/
static def enableDebugPoint(String host, String httpPort, NodeType type, String name, Map<String, String> params = null) {
def url = 'http://' + host + ':' + httpPort + '/api/debug_point/add/' + name
if (params != null && params.size() > 0) {
url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
}
def result = Http.http_post(url, null, true)
checkHttpResult(result, type)
}

/* Disable debug point in regression
* Parameters:
* host: hostname or ip of target node
* httpPort: http port of target node
* type: NodeType.BE or NodeType.FE
* name: debug point name
*/
static def disableDebugPoint(String host, String httpPort, NodeType type, String name) {
def url = 'http://' + host + ':' + httpPort + '/api/debug_point/remove/' + name
def result = Http.http_post(url, null, true)
checkHttpResult(result, type)
}

/* Disable all debug points in regression
* Parameters:
* host: hostname or ip of target node
* httpPort: http port of target node
* type: NodeType.BE or NodeType.FE
*/
static def clearDebugPoints(String host, String httpPort, NodeType type) {
def url = 'http://' + host + ':' + httpPort + '/api/debug_point/clear'
def result = Http.http_post(url, null, true)
checkHttpResult(result, type)
}

def operateDebugPointForAllBEs(Closure closure) {
def ipList = [:]
def portList = [:]
(ipList, portList) = getBEHostAndHTTPPort()
ipList.each { beid, ip ->
closure.call(ip, portList[beid])
}
}

/* Enable specific debug point for all BE node in cluster */
def enableDebugPointForAllBEs(String name, Map<String, String> params = null) {
operateDebugPointForAllBEs({ host, port ->
println "enable debug point $name for BE $host:$port"
enableDebugPoint(host, port, NodeType.BE, name, params)
})
}

/* Disable specific debug point for all BE node in cluster */
def disableDebugPointForAllBEs(String name) {
operateDebugPointForAllBEs { host, port ->
disableDebugPoint(host, port, NodeType.BE, name)
}
}

/* Disable all debug points for all BE node in cluster */
def clearDebugPointsForAllBEs() {
operateDebugPointForAllBEs { host, port ->
clearDebugPoints(host, port, NodeType.BE)
}
}

def getBEHostAndHTTPPort() {
def ipList = [:]
def portList = [:]
suite.getBackendIpHttpPort(ipList, portList)
return [ipList, portList]
}

def getFEHostAndHTTPPort() {
assert false : 'not implemented yet'
}

def enableDebugPointForAllFEs(String name, Map<String, String> params = null) {
assert false : 'not implemented yet'
}

def disableDebugPointForAllFEs(String name) {
assert false : 'not implemented yet'
}

def clearDebugPointsForAllFEs() {
assert false : 'not implemented yet'
}

static void checkHttpResult(Object result, NodeType type) {
if (type == NodeType.FE) {
assert result.code == 0 : result.toString()
} else if (type == NodeType.BE) {
assert result.status == 'OK' : result.toString()
}
}
}

Loading

0 comments on commit 808d74a

Please sign in to comment.