Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](regression) fault injection for segcompaction test #25709

Merged
merged 4 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -677,6 +678,8 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(

Status BetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
{ total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/time.h"
#include "vec/olap/vertical_block_reader.h"
Expand Down Expand Up @@ -167,19 +168,23 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
}
}

DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
if (raw_rows_read != sum_src_row) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction read row num does not match source. expect read row:{}, actual read "
"row:{}",
sum_src_row, raw_rows_read);
}

DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
if ((output_rows + merged_rows) != raw_rows_read) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction total row num does not match after merge. expect total row:{}, "
"actual total row:{}, (output_rows:{},merged_rows:{})",
raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
}
DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
{ filtered_rows++; });
if (filtered_rows != 0) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction should not have filtered rows but actual filtered rows:{}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --

-- !select_default --

-- !select_default --

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.doris.regression.action.HttpCliAction
import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.Hdfs
import org.apache.doris.regression.util.SuiteUtils
import org.apache.doris.regression.util.DebugPoint
import org.junit.jupiter.api.Assertions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -69,12 +70,14 @@ class Suite implements GroovyInterceptable {
final List<Future> lazyCheckFutures = new Vector<>()

SuiteCluster cluster
DebugPoint debugPoint

Suite(String name, String group, SuiteContext context) {
this.name = name
this.group = group
this.context = context
this.cluster = null
this.debugPoint = new DebugPoint(this)
}

String getConf(String key, String defaultValue = null) {
Expand Down Expand Up @@ -476,7 +479,7 @@ class Suite implements GroovyInterceptable {
String s3Url = "http://${s3BucketName}.${s3Endpoint}"
return s3Url
}

void scpFiles(String username, String host, String files, String filePath, boolean fromDst=true) {
String cmd = "scp -r ${username}@${host}:${files} ${filePath}"
if (!fromDst) {
Expand All @@ -487,7 +490,7 @@ class Suite implements GroovyInterceptable {
def code = process.waitFor()
Assert.assertEquals(0, code)
}

void sshExec(String username, String host, String cmd) {
String command = "ssh ${username}@${host} '${cmd}'"
def cmds = ["/bin/bash", "-c", command]
Expand All @@ -499,7 +502,7 @@ class Suite implements GroovyInterceptable {
assert errMsg.length() == 0: "error occurred!" + errMsg
assert p.exitValue() == 0
}


void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
List<List<Object>> backends = sql("show backends");
Expand All @@ -509,7 +512,7 @@ class Suite implements GroovyInterceptable {
backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
}
return;
}
}

int getTotalLine(String filePath) {
def file = new File(filePath)
Expand Down Expand Up @@ -693,14 +696,14 @@ class Suite implements GroovyInterceptable {
String cleanedSqlStr = sql.replaceAll("\\s*;\\s*\$", "")
sql = cleanedSqlStr
}
quickRunTest(tag, sql, isOrder)
quickRunTest(tag, sql, isOrder)
}

void quickExecute(String tag, PreparedStatement stmt) {
logger.info("Execute tag: ${tag}, sql: ${stmt}".toString())
quickRunTest(tag, stmt)
quickRunTest(tag, stmt)
}

@Override
Object invokeMethod(String name, Object args) {
// qt: quick test
Expand Down Expand Up @@ -761,5 +764,9 @@ class Suite implements GroovyInterceptable {
// set server side prepared statement url
return "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + database + "?&useServerPrepStmts=true"
}

DebugPoint GetDebugPoint() {
return debugPoint
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.doris.regression.suite

import org.apache.doris.regression.Config
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.DebugPoint
import org.apache.doris.regression.util.NodeType

import com.google.common.collect.Maps
import org.slf4j.Logger
Expand Down Expand Up @@ -62,13 +64,6 @@ class ListHeader {

}

enum NodeType {

FE,
BE,

}

class ServerNode {

int index
Expand All @@ -83,38 +78,23 @@ class ServerNode {
node.alive = fields.get(header.indexOf('alive')) == 'true'
}

String getHttpAddress() {
return 'http://' + host + ':' + httpPort
def getHttpAddress() {
return [host, httpPort]
}

void enableDebugPoint(String name, Map<String, String> params = null) {
def url = getHttpAddress() + '/api/debug_point/add/' + name
if (params != null && params.size() > 0) {
url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
}
def result = Http.http_post(url, null, true)
checkHttpResult(result)
def (host, port) = getHttpAddress()
DebugPoint.enableDebugPoint(host, port, getNodeType(), name, params)
}

void disableDebugPoint(String name) {
def url = getHttpAddress() + '/api/debug_point/remove/' + name
def result = Http.http_post(url, null, true)
checkHttpResult(result)
def (host, port) = getHttpAddress()
DebugPoint.disableDebugPoint(host, port, getNodeType(), name)
}

void clearDebugPoints() {
def url = getHttpAddress() + '/api/debug_point/clear'
def result = Http.http_post(url, null, true)
checkHttpResult(result)
}

private void checkHttpResult(Object result) {
def type = getNodeType()
if (type == NodeType.FE) {
assert result.code == 0 : result.toString()
} else if (type == NodeType.BE) {
assert result.status == 'OK' : result.toString()
}
def (host, port) = getHttpAddress()
DebugPoint.clearDebugPoints(host, port, getNodeType())
}

NodeType getNodeType() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.regression.util
import org.apache.doris.regression.util.Http
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.suite.Suite

enum NodeType {
FE,
BE,
}

class DebugPoint {
Suite suite

DebugPoint(Suite suite) {
this.suite = suite
}

/* Enable debug point in regression
* Note: set BE config::enable_debug_points = true to take effect
* Parameters:
* host: hostname or ip of target node
* httpPort: http port of target node
* type: NodeType.BE or NodeType.FE
* name: debug point name
* params: timeout, execute, or other customized input params
*/
static def enableDebugPoint(String host, String httpPort, NodeType type, String name, Map<String, String> params = null) {
def url = 'http://' + host + ':' + httpPort + '/api/debug_point/add/' + name
if (params != null && params.size() > 0) {
url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
}
def result = Http.http_post(url, null, true)
checkHttpResult(result, type)
}

/* Disable debug point in regression
* Parameters:
* host: hostname or ip of target node
* httpPort: http port of target node
* type: NodeType.BE or NodeType.FE
* name: debug point name
*/
static def disableDebugPoint(String host, String httpPort, NodeType type, String name) {
def url = 'http://' + host + ':' + httpPort + '/api/debug_point/remove/' + name
def result = Http.http_post(url, null, true)
checkHttpResult(result, type)
}

/* Disable all debug points in regression
* Parameters:
* host: hostname or ip of target node
* httpPort: http port of target node
* type: NodeType.BE or NodeType.FE
*/
static def clearDebugPoints(String host, String httpPort, NodeType type) {
def url = 'http://' + host + ':' + httpPort + '/api/debug_point/clear'
def result = Http.http_post(url, null, true)
checkHttpResult(result, type)
}

def operateDebugPointForAllBEs(Closure closure) {
def ipList = [:]
def portList = [:]
(ipList, portList) = getBEHostAndHTTPPort()
ipList.each { beid, ip ->
closure.call(ip, portList[beid])
}
}

/* Enable specific debug point for all BE node in cluster */
def enableDebugPointForAllBEs(String name, Map<String, String> params = null) {
operateDebugPointForAllBEs({ host, port ->
println "enable debug point $name for BE $host:$port"
enableDebugPoint(host, port, NodeType.BE, name, params)
})
}

/* Disable specific debug point for all BE node in cluster */
def disableDebugPointForAllBEs(String name) {
operateDebugPointForAllBEs { host, port ->
disableDebugPoint(host, port, NodeType.BE, name)
}
}

/* Disable all debug points for all BE node in cluster */
def clearDebugPointsForAllBEs() {
operateDebugPointForAllBEs { host, port ->
clearDebugPoints(host, port, NodeType.BE)
}
}

def getBEHostAndHTTPPort() {
def ipList = [:]
def portList = [:]
suite.getBackendIpHttpPort(ipList, portList)
return [ipList, portList]
}

def getFEHostAndHTTPPort() {
assert false : 'not implemented yet'
}

def enableDebugPointForAllFEs(String name, Map<String, String> params = null) {
assert false : 'not implemented yet'
}

def disableDebugPointForAllFEs(String name) {
assert false : 'not implemented yet'
}

def clearDebugPointsForAllFEs() {
assert false : 'not implemented yet'
}

static void checkHttpResult(Object result, NodeType type) {
if (type == NodeType.FE) {
assert result.code == 0 : result.toString()
} else if (type == NodeType.BE) {
assert result.status == 'OK' : result.toString()
}
}
}

Loading
Loading