Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
}
ctx.addPreparedStatementContext(name,
new PreparedStatementContext(this, ctx, ctx.getStatementContext(), name));
if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE) {
if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE && !ctx.isProxy()) {
executor.sendStmtPrepareOK(Integer.parseInt(name), labels);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.xnio.StreamConnection;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -239,6 +242,10 @@ public enum ConnectType {
// it's default thread-safe
private boolean isProxy = false;

@Getter
@Setter
private ByteBuffer prepareExecuteBuffer;

private MysqlHandshakePacket mysqlHandshakePacket;

public void setUserQueryTimeout(int queryTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.plugin.DialectConverterPlugin;
Expand All @@ -87,6 +88,7 @@
import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -693,7 +695,6 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser));

ctx.setThreadLocalInfo();
StmtExecutor executor = null;
try {
// 0 for compatibility.
int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0;
Expand Down Expand Up @@ -722,7 +723,25 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
}

executor.execute(queryId);
if (request.isSetPrepareExecuteBuffer()) {
ctx.setCommand(MysqlCommand.COM_STMT_PREPARE);
executor.execute();
ctx.setCommand(MysqlCommand.COM_STMT_EXECUTE);
String preparedStmtId = executor.getPrepareStmtName();
PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(preparedStmtId);
if (preparedStatementContext == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Something error, just support nereids preparedStmtId:{}", preparedStmtId);
}
throw new RuntimeException("Prepare failed when proxy execute");
}
handleExecute(preparedStatementContext.command, Long.parseLong(preparedStmtId),
preparedStatementContext,
ByteBuffer.wrap(request.getPrepareExecuteBuffer()).order(ByteOrder.LITTLE_ENDIAN), queryId);
} else {
executor.execute(queryId);
}

} catch (IOException e) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", e);
Expand Down Expand Up @@ -772,6 +791,11 @@ public void processOnce() throws IOException, NotImplementedException {
throw new NotImplementedException("Not Impl processOnce");
}

protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx,
ByteBuffer packetBuf, TUniqueId queryId) {
throw new NotSupportedException("Just MysqlConnectProcessor support execute");
}

private Map<String, LiteralExpr> userVariableFromThrift(Map<String, TExprNode> thriftMap) throws TException {
try {
Map<String, LiteralExpr> userVariables = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprNode;
Expand Down Expand Up @@ -183,6 +184,12 @@ private TMasterOpRequest buildStmtForwardParams() {
if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
}
if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
if (null != ctx.getPrepareExecuteBuffer()) {
params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer());
}
}

return params;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.nereids.trees.plans.PlaceholderId;
import org.apache.doris.nereids.trees.plans.commands.ExecuteCommand;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -170,14 +171,32 @@ private void handleExecute(PrepareStmt prepareStmt, long stmtId) {
}
}

private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx) {
private String getHexStr(ByteBuffer packetBuf) {
byte[] bytes = packetBuf.array();
StringBuilder hex = new StringBuilder();
for (int i = packetBuf.position(); i < packetBuf.limit(); ++i) {
hex.append(String.format("%02X ", bytes[i]));
}
return hex.toString();
}

@Override
protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx,
ByteBuffer packetBuf, TUniqueId queryId) {
int paramCount = prepareCommand.placeholderCount();
LOG.debug("execute prepared statement {}, paramCount {}", stmtId, paramCount);
// null bitmap
String stmtStr = "";
try {
StatementContext statementContext = prepCtx.statementContext;
if (paramCount > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("execute param buf: {}, array: {}", packetBuf, getHexStr(packetBuf));
}
if (!ctx.isProxy()) {
ctx.setPrepareExecuteBuffer(packetBuf.duplicate());
}

byte[] nullbitmapData = new byte[(paramCount + 7) / 8];
packetBuf.get(nullbitmapData);
// new_params_bind_flag
Expand Down Expand Up @@ -218,7 +237,12 @@ private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedS
stmt.setOrigStmt(prepareCommand.getOriginalStmt());
executor = new StmtExecutor(ctx, stmt);
ctx.setExecutor(executor);
executor.execute();
if (null != queryId) {
executor.execute(queryId);
} else {
executor.execute();
}

if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) {
stmtStr = executeStmt.toSql();
stmtStr = stmtStr + " /*originalSql = " + prepareCommand.getOriginalStmt().originStmt + "*/";
Expand All @@ -233,6 +257,7 @@ private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedS
if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) {
auditAfterExec(stmtStr, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true);
}
ctx.setPrepareExecuteBuffer(null);
}

// process COM_EXECUTE, parse binary row data
Expand Down Expand Up @@ -266,7 +291,7 @@ private void handleExecute() {
"msg: Not supported such prepared statement");
return;
}
handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext);
handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext, packetBuf, null);
}
}

Expand Down
12 changes: 10 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook;
import org.apache.doris.nereids.trees.expressions.Placeholder;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
Expand Down Expand Up @@ -267,6 +268,7 @@ public class StmtExecutor {
private Data.PQueryStatistics.Builder statisticsForAuditLog;
private boolean isCached;
private String stmtName;
private String prepareStmtName; // for proxy
private StatementBase prepareStmt = null;
private String mysqlLoadId;
// Distinguish from prepare and execute command
Expand Down Expand Up @@ -681,8 +683,10 @@ private void executeByNereids(TUniqueId queryId) throws Exception {
}
long stmtId = Config.prepared_stmt_start_id > 0
? Config.prepared_stmt_start_id : context.getPreparedStmtId();
logicalPlan = new PrepareCommand(String.valueOf(stmtId),
logicalPlan, statementContext.getPlaceholders(), originStmt);
this.prepareStmtName = String.valueOf(stmtId);
List<Placeholder> placeholders = context == null
? statementContext.getPlaceholders() : context.getStatementContext().getPlaceholders();
logicalPlan = new PrepareCommand(prepareStmtName, logicalPlan, placeholders, originStmt);
}
// when we in transaction mode, we only support insert into command and transaction command
if (context.isTxnModel()) {
Expand Down Expand Up @@ -3477,4 +3481,8 @@ public void sendProxyQueryResult() throws IOException {
context.getMysqlChannel().sendOnePacket(byteBuffer);
}
}

public String getPrepareStmtName() {
return this.prepareStmtName;
}
}
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ struct TMasterOpRequest {
// transaction load
29: optional TTxnLoadInfo txnLoadInfo
30: optional TGroupCommitInfo groupCommitInfo
31: optional binary prepareExecuteBuffer
}

struct TColumnDefinition {
Expand Down
Loading