Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
}
ctx.addPreparedStatementContext(name,
new PreparedStatementContext(this, ctx, ctx.getStatementContext(), name));
if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE) {
if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE && !ctx.isProxy()) {
executor.sendStmtPrepareOK(Integer.parseInt(name), labels);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,16 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.xnio.StreamConnection;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -248,6 +251,10 @@ public enum ConnectType {
// it's default thread-safe
private boolean isProxy = false;

@Getter
@Setter
private ByteBuffer prepareExecuteBuffer;

private MysqlHandshakePacket mysqlHandshakePacket;

public void setUserQueryTimeout(int queryTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
Expand All @@ -60,6 +61,7 @@
import org.apache.doris.nereids.parser.SqlDialectHelper;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.proto.Data;
Expand All @@ -83,6 +85,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -647,7 +650,24 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
UUID uuid = UUID.randomUUID();
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
}
executor.queryRetry(queryId);
if (request.isSetPrepareExecuteBuffer()) {
ctx.setCommand(MysqlCommand.COM_STMT_PREPARE);
executor.execute();
ctx.setCommand(MysqlCommand.COM_STMT_EXECUTE);
String preparedStmtId = executor.getPrepareStmtName();
PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(preparedStmtId);
if (preparedStatementContext == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Something error, just support nereids preparedStmtId:{}", preparedStmtId);
}
throw new RuntimeException("Prepare failed when proxy execute");
}
handleExecute(preparedStatementContext.command, Long.parseLong(preparedStmtId),
preparedStatementContext,
ByteBuffer.wrap(request.getPrepareExecuteBuffer()).order(ByteOrder.LITTLE_ENDIAN), queryId);
} else {
executor.queryRetry(queryId);
}
} catch (IOException e) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", e);
Expand Down Expand Up @@ -717,4 +737,10 @@ private Map<String, LiteralExpr> userVariableFromThrift(Map<String, TExprNode> t
throw new TException(e.getMessage());
}
}


protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx,
ByteBuffer packetBuf, TUniqueId queryId) {
throw new NotSupportedException("Just MysqlConnectProcessor support execute");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprNode;
Expand Down Expand Up @@ -234,6 +235,12 @@ private TMasterOpRequest buildStmtForwardParams() throws AnalysisException {
if (ctx.isTxnModel()) {
params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver());
}

if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
if (null != ctx.getPrepareExecuteBuffer()) {
params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer());
}
}
return params;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.doris.nereids.trees.plans.PlaceholderId;
import org.apache.doris.nereids.trees.plans.commands.ExecuteCommand;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -104,14 +105,31 @@ private void debugPacket() {
}
}

private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx) {
private String getHexStr(ByteBuffer packetBuf) {
byte[] bytes = packetBuf.array();
StringBuilder hex = new StringBuilder();
for (int i = packetBuf.position(); i < packetBuf.limit(); ++i) {
hex.append(String.format("%02X ", bytes[i]));
}
return hex.toString();
}

@Override
protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx,
ByteBuffer packetBuf, TUniqueId queryId) {
int paramCount = prepareCommand.placeholderCount();
LOG.debug("execute prepared statement {}, paramCount {}", stmtId, paramCount);
// null bitmap
String stmtStr = "";
try {
StatementContext statementContext = prepCtx.statementContext;
if (paramCount > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("execute param buf: {}, array: {}", packetBuf, getHexStr(packetBuf));
}
if (!ctx.isProxy()) {
ctx.setPrepareExecuteBuffer(packetBuf.duplicate());
}
byte[] nullbitmapData = new byte[(paramCount + 7) / 8];
packetBuf.get(nullbitmapData);
// new_params_bind_flag
Expand Down Expand Up @@ -152,7 +170,11 @@ private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedS
stmt.setOrigStmt(prepareCommand.getOriginalStmt());
executor = new StmtExecutor(ctx, stmt);
ctx.setExecutor(executor);
executor.execute();
if (null != queryId) {
executor.execute(queryId);
} else {
executor.execute();
}
if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) {
stmtStr = executeStmt.toSql();
stmtStr = stmtStr + " /*originalSql = " + prepareCommand.getOriginalStmt().originStmt + "*/";
Expand Down Expand Up @@ -195,7 +217,7 @@ private void handleExecute() {
"msg: Not supported such prepared statement");
return;
}
handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext);
handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext, packetBuf, null);
}

// Process COM_QUERY statement,
Expand Down
17 changes: 13 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook;
import org.apache.doris.nereids.trees.expressions.Placeholder;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.InlineTable;
import org.apache.doris.nereids.trees.plans.commands.Command;
Expand Down Expand Up @@ -288,6 +289,7 @@ public class StmtExecutor {
private Data.PQueryStatistics.Builder statisticsForAuditLog;
private boolean isCached;
private String stmtName;
private String prepareStmtName; // for prox
private String mysqlLoadId;
// Handle selects that fe can do without be
private boolean isHandleQueryInFe = false;
Expand Down Expand Up @@ -722,8 +724,12 @@ private void executeByNereids(TUniqueId queryId) throws Exception {
}
long stmtId = Config.prepared_stmt_start_id > 0
? Config.prepared_stmt_start_id : context.getPreparedStmtId();
logicalPlan = new PrepareCommand(String.valueOf(stmtId),
logicalPlan, statementContext.getPlaceholders(), originStmt);
this.prepareStmtName = String.valueOf(stmtId);
// When proxy executing, this.statementContext is created in constructor.
// But context.statementContext is created in LogicalPlanBuilder.
List<Placeholder> placeholders = context == null
? statementContext.getPlaceholders() : context.getStatementContext().getPlaceholders();
logicalPlan = new PrepareCommand(prepareStmtName, logicalPlan, placeholders, originStmt);
}
// when we in transaction mode, we only support insert into command and transaction command
if (context.isTxnModel()) {
Expand All @@ -744,8 +750,7 @@ private void executeByNereids(TUniqueId queryId) throws Exception {
if (logicalPlan instanceof InsertIntoTableCommand) {
profileType = ProfileType.LOAD;
}
if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE
|| context.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE) {
throw new UserException("Forward master command is not supported for prepare statement");
}
if (isProxy) {
Expand Down Expand Up @@ -3758,4 +3763,8 @@ public void sendProxyQueryResult() throws IOException {
context.getMysqlChannel().sendOnePacket(byteBuffer);
}
}

public String getPrepareStmtName() {
return this.prepareStmtName;
}
}
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ struct TMasterOpRequest {
// transaction load
29: optional TTxnLoadInfo txnLoadInfo
30: optional TGroupCommitInfo groupCommitInfo
31: optional binary prepareExecuteBuffer

// selectdb cloud
1000: optional string cloud_cluster
Expand Down
7 changes: 6 additions & 1 deletion regression-test/suites/query_p0/test_forward_qeury.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ suite("test_forward_query", 'docker') {

cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.forward_all_queries' : [forwardAllQueries:true, execute:1]])

def ret = sql """ SELECT * FROM ${tbl} """
def stmt = prepareStatement("""INSERT INTO ${tbl} VALUES(?);""")
stmt.setInt(1, 2)
stmt.executeUpdate()

def ret = sql """ SELECT * FROM ${tbl} order by k1"""
assertEquals(ret[0][0], 1)
assertEquals(ret[1][0], 2)
}
}
Loading