Skip to content

Commit

Permalink
[fix](regression) Add regression for group commit executed on observe… (
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi authored Nov 10, 2023
1 parent 70fdd1f commit ca47d75
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,7 @@ private void handleInsertStmt() throws Exception {
TransactionStatus txnStatus = TransactionStatus.ABORTED;
String errMsg = "";
TableType tblType = insertStmt.getTargetTable().getType();
boolean isGroupCommit = false;
if (context.isTxnModel()) {
if (insertStmt.getQueryStmt() instanceof SelectStmt) {
if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) {
Expand All @@ -1824,6 +1825,7 @@ private void handleInsertStmt() throws Exception {
label = context.getTxnEntry().getLabel();
txnId = context.getTxnEntry().getTxnConf().getTxnId();
} else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
isGroupCommit = true;
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
Expand Down Expand Up @@ -2013,6 +2015,9 @@ private void handleInsertStmt() throws Exception {
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
if (isGroupCommit) {
sb.append(", 'query_id':'").append(DebugUtil.printId(context.queryId)).append("'");
}
sb.append("}");

context.getState().setOk(loadedRows, filteredRows, sb.toString());
Expand Down
53 changes: 52 additions & 1 deletion regression-test/suites/insert_p0/insert_group_commit_into.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

import com.mysql.cj.jdbc.StatementImpl
import org.codehaus.groovy.runtime.IOGroovyMethods

suite("insert_group_commit_into") {
def dbName = "regression_test_insert_p0"
Expand Down Expand Up @@ -65,6 +66,7 @@ suite("insert_group_commit_into") {
// assertEquals(result, expected_row_count)
assertTrue(serverInfo.contains("'status':'PREPARE'"))
assertTrue(serverInfo.contains("'label':'group_commit_"))
return serverInfo
}

def none_group_commit_insert = { sql, expected_row_count ->
Expand Down Expand Up @@ -211,6 +213,55 @@ suite("insert_group_commit_into") {
// try_sql("DROP TABLE ${table}")
}

// test connect to observer fe
try {
def fes = sql_return_maparray "show frontends"
logger.info("frontends: ${fes}")
if (fes.size() > 1) {
def observer_fe = null
for (def fe : fes) {
if (fe.IsMaster == "false") {
observer_fe = fe
break
}
}
if (observer_fe != null) {
def url = "jdbc:mysql://${observer_fe.Host}:${observer_fe.QueryPort}/"
logger.info("observer url: " + url)
connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = url) {
sql """ set enable_insert_group_commit = true; """
sql """ set enable_nereids_dml = false; """
sql """ set enable_profile= true; """

// 1. insert into
def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
assertTrue(server_info.contains('query_id'))
// get query_id, such as 43f87963586a482a-b0496bcf9e2b5555
def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length()
def query_id = server_info.substring(query_id_index, query_id_index + 33)
logger.info("query_id: " + query_id)
// 2. check profile
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET -u ${context.config.jdbcUser}:${context.config.jdbcPassword} http://${observer_fe.Host}:${observer_fe.HttpPort}")
sb.append("/api/profile?query_id=").append(query_id)
String command = sb.toString()
logger.info(command)
def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def json = parseJson(out)
assertEquals("success", json.msg.toLowerCase())
}
}
} else {
logger.info("only one fe, skip test connect to observer fe")
}
} finally {
}

// table with array type
tableName = "insert_group_commit_into_duplicate_array"
table = dbName + "." + tableName
Expand Down Expand Up @@ -266,4 +317,4 @@ suite("insert_group_commit_into") {
// try_sql("DROP TABLE ${table}")
}
}
}
}

0 comments on commit ca47d75

Please sign in to comment.