diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 55b8113379e816..43bcf61c92137c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -348,6 +348,15 @@ public void executeQuery(String originStmt) throws Exception { executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime); executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime); executor.getProfile().getSummaryProfile().parsedByConnectionProcess = true; + // Here we set the MoreStmtExists flag without considering CLIENT_MULTI_STATEMENTS. + // So the master will always set SERVER_MORE_RESULTS_EXISTS when the statement is not the last one. + // When the Follower/Observer received the return result of Master, the Follower/Observer + // will check CLIENT_MULTI_STATEMENTS is set or not. It sends SERVER_MORE_RESULTS_EXISTS back to client + // only when CLIENT_MULTI_STATEMENTS is set. + // See the code below : if (getConnectContext().getMysqlChannel().clientMultiStatements()) + if (i != stmts.size() - 1 && connectType.equals(ConnectType.MYSQL)) { + executor.setMoreStmtExists(true); + } ctx.setExecutor(executor); if (cacheKeyType != null) { @@ -706,6 +715,9 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException result.setQueryId(ctx.queryId()); } result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId()); + if (request.moreResultExists) { + ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS; + } result.setPacket(getResultPacket()); result.setStatus(ctx.getState().toString()); if (ctx.getState().getStateType() == MysqlStateType.OK) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java index 7c87907d6196e7..9dcd379f913f30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java @@ -63,6 +63,7 @@ public class FEOpExecutor { protected int thriftTimeoutMs; protected boolean shouldNotRetry; + protected boolean moreStmtExists = false; public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) { this.feAddr = feAddress; @@ -173,6 +174,7 @@ protected TMasterOpRequest buildStmtForwardParams() throws AnalysisException { params.setStmtId(ctx.getStmtId()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); params.setSessionId(ctx.getSessionId()); + params.setMoreResultExists(moreStmtExists); if (Config.isCloudMode()) { String cluster = ""; @@ -224,6 +226,9 @@ public String getErrMsg() { return result.getErrMessage(); } + public void setMoreStmtExists(boolean moreStmtExists) { + this.moreStmtExists = moreStmtExists; + } public ByteBuffer getOutputPacket() { if (result == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 05450a0a6dc3f4..0f8bb745e9dbc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -201,6 +201,9 @@ public class StmtExecutor { private Boolean isForwardedToMaster = null; // Flag for execute prepare statement, need to use binary protocol resultset private boolean isComStmtExecute = false; + // Set to true if there are more stmt need to execute. + // Mainly for forward to master, so that master can set the mysql server status correctly. + private boolean moreStmtExists = false; // The result schema if "dry_run_query" is true. // Only one column to indicate the real return row numbers. @@ -354,6 +357,14 @@ public boolean isForwardToMaster() { return isForwardedToMaster; } + public boolean isMoreStmtExists() { + return moreStmtExists; + } + + public void setMoreStmtExists(boolean moreStmtExists) { + this.moreStmtExists = moreStmtExists; + } + private boolean shouldForwardToMaster() { if (Env.getCurrentEnv().isMaster()) { return false; @@ -945,6 +956,7 @@ private void forwardToMaster() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId()); } + masterOpExecutor.setMoreStmtExists(moreStmtExists); masterOpExecutor.execute(); if (parsedStmt instanceof LogicalPlanAdapter) { // for nereids command diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 87d8fc8d0cf513..1a801ffd9589c4 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -383,6 +383,7 @@ struct TMasterOpRequest { 29: optional TTxnLoadInfo txnLoadInfo 30: optional TGroupCommitInfo groupCommitInfo 31: optional binary prepareExecuteBuffer + 32: optional bool moreResultExists // Server has more result to send // selectdb cloud 1000: optional string cloud_cluster diff --git a/regression-test/suites/query_p0/test_multiple_stmt.groovy b/regression-test/suites/query_p0/test_multiple_stmt.groovy new file mode 100644 index 00000000000000..4a7e75816d46c7 --- /dev/null +++ b/regression-test/suites/query_p0/test_multiple_stmt.groovy @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_multiple_stmt") { + sql """drop database if exists test_multiple_stmt""" + sql """create database test_multiple_stmt""" + sql """use test_multiple_stmt""" + sql """CREATE TABLE test_multiple_stmt ( + key1 int NOT NULL, + value1 int NOT NULL, + )ENGINE=OLAP + DUPLICATE KEY(`key1`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`key1`) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + def result = sql """insert into test_multiple_stmt values (1, 1); select * from test_multiple_stmt;""" + assertEquals(1, result.size()) + assertEquals(1, result[0][0]) + assertEquals(1, result[0][1]) +} \ No newline at end of file