Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -6155,7 +6155,7 @@ public void replayDropGlobalFunction(FunctionSearchDesc functionSearchDesc) {
* we can't set callback which is in fe-core to config items which are in fe-common. so wrap them here. it's not so
* good but is best for us now.
*/
public void setMutableConfigwithCallback(String key, String value) throws ConfigException {
public void setMutableConfigWithCallback(String key, String value) throws ConfigException {
ConfigBase.setMutableConfig(key, value);
if (configtoThreads.get(key) != null) {
try {
Expand All @@ -6175,7 +6175,7 @@ public void setConfig(AdminSetConfigStmt stmt) throws Exception {

for (Map.Entry<String, String> entry : configs.entrySet()) {
try {
setMutableConfigwithCallback(entry.getKey(), entry.getValue());
setMutableConfigWithCallback(entry.getKey(), entry.getValue());
} catch (ConfigException e) {
throw new DdlException(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected Object set_config(HttpServletRequest request, HttpServletResponse resp
try {
if (confValue != null && confValue.length == 1) {
try {
Env.getCurrentEnv().setMutableConfigwithCallback(confKey, confValue[0]);
Env.getCurrentEnv().setMutableConfigWithCallback(confKey, confValue[0]);
} catch (ConfigException e) {
throw new DdlException(e.getMessage());
}
Expand Down
139 changes: 108 additions & 31 deletions fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprNode;
Expand All @@ -39,22 +41,28 @@
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* FEOpExecutor is used to send request to specific FE
*/
public class FEOpExecutor {
private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class);

private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
protected static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;

private final OriginStatement originStmt;
private final ConnectContext ctx;
private TMasterOpResult result;
private TNetworkAddress feAddr;
protected final OriginStatement originStmt;
protected final ConnectContext ctx;
protected TMasterOpResult result;
protected TNetworkAddress feAddr;

// the total time of thrift connectTime, readTime and writeTime
private int thriftTimeoutMs;
protected int thriftTimeoutMs;

private boolean shouldNotRetry;
protected boolean shouldNotRetry;

public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) {
this.feAddr = feAddress;
Expand All @@ -66,7 +74,15 @@ public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, Conne
}

public void execute() throws Exception {
result = forward(feAddr, buildStmtForwardParams());
result = forward(buildStmtForwardParams());
if (ctx.isTxnModel()) {
if (result.isSetTxnLoadInfo()) {
ctx.getTxnEntry().setTxnLoadInfoInObserver(result.getTxnLoadInfo());
} else {
ctx.setTxnEntry(null);
LOG.info("set txn entry to null");
}
}
}

public void cancel() throws Exception {
Expand All @@ -84,22 +100,24 @@ public void cancel() throws Exception {
request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
// just make the protocol happy
request.setSql("");
result = forward(feAddr, request);
result = forward(request);
}

// Send request to specific fe
private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest params) throws Exception {
protected TMasterOpResult forward(TMasterOpRequest params) throws Exception {
ctx.getEnv().checkReadyOrThrow();

FrontendService.Client client;
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs);
client = ClientPool.frontendPool.borrowObject(feAddr, thriftTimeoutMs);
} catch (Exception e) {
// may throw NullPointerException. add err msg
throw new Exception("Failed to get fe client: " + thriftAddress.toString(), e);
throw new Exception("Failed to get master client.", e);
}
final StringBuilder forwardMsg = new StringBuilder("forward to master FE " + feAddr.toString());
if (!params.isSyncJournalOnly()) {
forwardMsg.append(", statement id: ").append(ctx.getStmtId());
}
final StringBuilder forwardMsg = new StringBuilder("forward to FE " + thriftAddress.toString());
forwardMsg.append(", statement id: ").append(ctx.getStmtId());
LOG.info(forwardMsg.toString());

boolean isReturnToPool = false;
Expand All @@ -110,7 +128,7 @@ private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest
} catch (TTransportException e) {
// wrap the raw exception.
forwardMsg.append(" : failed");
Exception exception = new ForwardToFEException(forwardMsg.toString(), e);
Exception exception = new ForwardToMasterException(forwardMsg.toString(), e);

boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
if (!ok) {
Expand All @@ -130,14 +148,14 @@ private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest
}
} finally {
if (isReturnToPool) {
ClientPool.frontendPool.returnObject(thriftAddress, client);
ClientPool.frontendPool.returnObject(feAddr, client);
} else {
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
ClientPool.frontendPool.invalidateObject(feAddr, client);
}
}
}

private TMasterOpRequest buildStmtForwardParams() {
protected TMasterOpRequest buildStmtForwardParams() throws AnalysisException {
TMasterOpRequest params = new TMasterOpRequest();
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
Expand All @@ -151,22 +169,38 @@ private TMasterOpRequest buildStmtForwardParams() {
params.setUserIp(ctx.getRemoteIP());
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
params.setSessionId(ctx.getSessionId());

String cluster = "";
try {
ctx.getCloudCluster(false);
} catch (ComputeGroupException e) {
LOG.warn("failed to get cloud cluster", e);
}
if (!Strings.isNullOrEmpty(cluster)) {
params.setCloudCluster(cluster);
if (Config.isCloudMode()) {
String cluster = "";
try {
cluster = ctx.getCloudCluster(false);
} catch (Exception e) {
LOG.warn("failed to get cloud compute group", e);
}
if (!Strings.isNullOrEmpty(cluster)) {
params.setCloudCluster(cluster);
}
}

// session variables
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
}

// set transaction load info
if (ctx.isTxnModel()) {
params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver());
}

if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
if (null != ctx.getPrepareExecuteBuffer()) {
params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer());
}
}

return params;
}

Expand All @@ -187,6 +221,48 @@ public String getErrMsg() {
return result.getErrMessage();
}


public ByteBuffer getOutputPacket() {
if (result == null) {
return null;
}
return result.packet;
}

public TUniqueId getQueryId() {
if (result != null && result.isSetQueryId()) {
return result.getQueryId();
} else {
return null;
}
}

public String getProxyStatus() {
if (result == null) {
return QueryState.MysqlStateType.UNKNOWN.name();
}
if (!result.isSetStatus()) {
return QueryState.MysqlStateType.UNKNOWN.name();
} else {
return result.getStatus();
}
}

public ShowResultSet getProxyResultSet() {
if (result == null) {
return null;
}
if (result.isSetResultSet()) {
return new ShowResultSet(result.resultSet);
} else {
return null;
}
}

public List<ByteBuffer> getQueryResultBufList() {
return result.isSetQueryResultBufList() ? result.getQueryResultBufList() : Collections.emptyList();
}

private Map<String, TExprNode> getForwardUserVariables(Map<String, LiteralExpr> userVariables) {
Map<String, TExprNode> forwardVariables = Maps.newHashMap();
for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
Expand All @@ -198,21 +274,22 @@ private Map<String, TExprNode> getForwardUserVariables(Map<String, LiteralExpr>
return forwardVariables;
}

public static class ForwardToFEException extends RuntimeException {

protected static class ForwardToMasterException extends RuntimeException {
private static final Map<Integer, String> TYPE_MSG_MAP =
ImmutableMap.<Integer, String>builder()
.put(TTransportException.UNKNOWN, "Unknown exception")
.put(TTransportException.NOT_OPEN, "Connection is not open")
.put(TTransportException.ALREADY_OPEN, "Connection has already opened up")
.put(TTransportException.TIMED_OUT, "Connection timeout")
.put(TTransportException.TIMED_OUT,
"Connection timeout, please check network state or enlarge session variable:"
+ "`query_timeout`/`insert_timeout`")
.put(TTransportException.END_OF_FILE, "EOF")
.put(TTransportException.CORRUPTED_DATA, "Corrupted data")
.build();

private final String msg;

public ForwardToFEException(String msg, TTransportException exception) {
public ForwardToMasterException(String msg, TTransportException exception) {
this.msg = msg + ", cause: " + TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage();
}

Expand Down
Loading