Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DorisHttpException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
Expand All @@ -36,6 +41,7 @@
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
Expand All @@ -46,9 +52,15 @@
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TMemoryScratchSink;
Expand All @@ -64,6 +76,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -235,6 +248,12 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
+ "." + tableName + "]");
}
}
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
context.setQueryId(queryId);
context.setStartTime();
context.setSqlHash(DigestUtils.md5Hex(sql));

NereidsPlanner nereidsPlanner = new NereidsPlanner(context.getStatementContext());
LogicalPlan rewrittenPlan = (LogicalPlan) nereidsPlanner.planWithLock(parsedPlan,
PhysicalProperties.GATHER, ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
Expand Down Expand Up @@ -282,8 +301,8 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
tQueryPlanInfo.plan_fragment = tPlanFragment;
tQueryPlanInfo.desc_tbl = planner.getDescTable().toThrift();
// set query_id
UUID uuid = UUID.randomUUID();
tQueryPlanInfo.query_id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
tQueryPlanInfo.query_id = queryId;


Map<Long, TTabletVersionInfo> tabletInfo = new HashMap<>();
// acquire resolved tablet distribution
Expand All @@ -309,6 +328,7 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
result.put("partitions", tabletRoutings);
result.put("opaqued_query_plan", opaquedQueryPlan);
result.put("status", 200);
addToAuditLog(context, sql, query);
} finally {
if (needSetParallelResultSinkToFalse) {
sessionVariable.setParallelResultSink(false);
Expand All @@ -317,6 +337,101 @@ private void handleQuery(ConnectContext context, String requestDb, String reques

}

private static void addToAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt) {
// slow query
long elapseMs = System.currentTimeMillis() - ctx.getStartTime();
CatalogIf catalog = ctx.getCurrentCatalog();
String cluster = "";
try {
if (Config.isCloudMode()) {
cluster = ctx.getCloudCluster(false);
}
} catch (ComputeGroupException e) {
LOG.warn("Failed to get cloud cluster", e);
}

AuditEvent.AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
// ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query.
auditEventBuilder.reset();
auditEventBuilder
.setEventType(AuditEvent.EventType.AFTER_QUERY)
.setQueryId(DebugUtil.printId(ctx.queryId()))
.setTimestamp(ctx.getStartTime())
.setClientIp(ctx.getClientIP())
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
.setFeIp(FrontendOptions.getLocalHostAddress())
.setCtl(catalog == null ? InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName())
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
.setState(ctx.getState().toString())
.setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode())
.setErrorMessage((ctx.getState().getErrorMessage() == null ? "" :
ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " ")))
.setQueryTime(elapseMs)
.setCpuTimeMs(0)
.setPeakMemoryBytes(0)
.setScanBytes(0)
.setScanRows(0)
.setReturnRows(ctx.getReturnRows())
.setSpillWriteBytesToLocalStorage(0)
.setSpillReadBytesFromLocalStorage(0)
.setScanBytesFromLocalStorage(0)
.setScanBytesFromRemoteStorage(0)
.setFuzzyVariables("")
.setCommandType("HttpPlan")
.setStmtType("SELECT")
.setStmtId(ctx.getStmtId())
.setSqlHash(ctx.getSqlHash())
.setIsQuery(true)
.setIsNereids(true)
.setisInternal(false)
.setCloudCluster(Strings.isNullOrEmpty(cluster) ? "UNKNOWN" : cluster)
.setWorkloadGroup(ctx.getWorkloadGroupName());

// sql mode
SessionVariable sessionVariable = ctx.getSessionVariable();
if (sessionVariable != null) {
try {
auditEventBuilder.setSqlMode(SqlModeHelper.decode(sessionVariable.getSqlMode()));
} catch (Exception e) {
LOG.warn("decode sql mode {} failed.", sessionVariable.getSqlMode(), e);
}
}

boolean isSyntaxErr = ctx.getState().getStateType() == QueryState.MysqlStateType.ERR
&& ctx.getState().getErrType() == QueryState.ErrType.SYNTAX_PARSE_ERR;
String encryptSql = isSyntaxErr ? "Syntax Error" : origStmt;
if (isSyntaxErr) {
auditEventBuilder.setErrorMessage("Syntax Error");
}
// We put origin query stmt at the end of audit log, for parsing the log more convenient.
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
if ((logicalPlan instanceof NeedAuditEncryption)) {
encryptSql = ((NeedAuditEncryption) logicalPlan).geneEncryptionSQL(origStmt);
}

int maxLen = GlobalVariable.auditPluginMaxSqlLength;
encryptSql = AuditLogHelper.truncateByBytes(encryptSql, maxLen,
" ... /* truncated. audit_plugin_max_sql_length=" + maxLen + " */");
auditEventBuilder.setStmt(encryptSql);

if (!Env.getCurrentEnv().isMaster()) {
StmtExecutor executor = ctx.getExecutor();
if (executor != null && executor.isForwardToMaster()) {
auditEventBuilder.setState(executor.getProxyStatus());
int proxyStatusCode = executor.getProxyStatusCode();
if (proxyStatusCode != 0) {
auditEventBuilder.setErrorCode(proxyStatusCode);
auditEventBuilder.setErrorMessage(executor.getProxyErrMsg());
}
}
}
AuditEvent event = auditEventBuilder.build();
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(event);
if (LOG.isDebugEnabled()) {
LOG.debug("submit audit event: {}", event.queryId);
}
}

/**
* acquire all involved (already pruned) tablet routing
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private static Optional<String> handleInsertStmt(String origStmt, StatementBase
}
}

private static String truncateByBytes(String str, int maxLen, String suffix) {
public static String truncateByBytes(String str, int maxLen, String suffix) {
// use `getBytes().length` to get real byte length
if (maxLen >= str.getBytes().length) {
return str;
Expand Down
Loading