Skip to content

Commit fc5d621

Browse files
starocean999Your Name
authored andcommitted
[feature](QueryPlanAction)add sql received from table query plan action into audit log (#58739)
after this pr, the sql received from table query plan action will be shown in fe.audit.log like bellow: ``` 2025-12-05 10:44:54,703 [query] |Timestamp=**2025-12-05 10:44:49.496**|Client=|User=**admin**|Ctl=internal|Db=|CommandType=**HttpPlan**|State=OK|ErrorCode=0|ErrorMessage=|Time(ms)=12|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=0|QueryId=e3f55b6e86f04add-bbf511c6cfe555a9|IsQuery=true|IsNereids=true|FeIp=10.16.10.3|Stmt=**select * from tt.load_csv_datetime_conversion;**|CpuTimeMS=0|ShuffleSendBytes=-1|ShuffleSendRows=-1|SqlHash=14cc20060e3baef061099ed185864d11|PeakMemoryBytes=0|SqlDigest=|WorkloadGroup=|FuzzyVariables=|scanBytesFromLocalStorage=0|scanBytesFromRemoteStorage=0 ```
1 parent 9ac212e commit fc5d621

File tree

2 files changed

+118
-3
lines changed

2 files changed

+118
-3
lines changed

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TableQueryPlanAction.java

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@
2222
import org.apache.doris.catalog.Env;
2323
import org.apache.doris.catalog.Table;
2424
import org.apache.doris.catalog.TableIf;
25+
import org.apache.doris.cloud.qe.ComputeGroupException;
26+
import org.apache.doris.cluster.ClusterNamespace;
2527
import org.apache.doris.common.Config;
2628
import org.apache.doris.common.DorisHttpException;
2729
import org.apache.doris.common.MetaNotFoundException;
30+
import org.apache.doris.common.util.DebugUtil;
2831
import org.apache.doris.common.util.NetUtils;
32+
import org.apache.doris.datasource.CatalogIf;
33+
import org.apache.doris.datasource.InternalCatalog;
2934
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
3035
import org.apache.doris.httpv2.rest.manager.HttpUtils;
3136
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -36,6 +41,7 @@
3641
import org.apache.doris.nereids.properties.PhysicalProperties;
3742
import org.apache.doris.nereids.trees.plans.commands.Command;
3843
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
44+
import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
3945
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
4046
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
4147
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -46,9 +52,15 @@
4652
import org.apache.doris.nereids.util.RelationUtil;
4753
import org.apache.doris.planner.PlanFragment;
4854
import org.apache.doris.planner.ScanNode;
55+
import org.apache.doris.plugin.AuditEvent;
56+
import org.apache.doris.qe.AuditLogHelper;
4957
import org.apache.doris.qe.ConnectContext;
5058
import org.apache.doris.qe.GlobalVariable;
59+
import org.apache.doris.qe.QueryState;
5160
import org.apache.doris.qe.SessionVariable;
61+
import org.apache.doris.qe.SqlModeHelper;
62+
import org.apache.doris.qe.StmtExecutor;
63+
import org.apache.doris.service.FrontendOptions;
5264
import org.apache.doris.thrift.TDataSink;
5365
import org.apache.doris.thrift.TDataSinkType;
5466
import org.apache.doris.thrift.TMemoryScratchSink;
@@ -64,6 +76,7 @@
6476
import io.netty.handler.codec.http.HttpResponseStatus;
6577
import jakarta.servlet.http.HttpServletRequest;
6678
import jakarta.servlet.http.HttpServletResponse;
79+
import org.apache.commons.codec.digest.DigestUtils;
6780
import org.apache.logging.log4j.LogManager;
6881
import org.apache.logging.log4j.Logger;
6982
import org.apache.thrift.TException;
@@ -235,6 +248,12 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
235248
+ "." + tableName + "]");
236249
}
237250
}
251+
UUID uuid = UUID.randomUUID();
252+
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
253+
context.setQueryId(queryId);
254+
context.setStartTime();
255+
context.setSqlHash(DigestUtils.md5Hex(sql));
256+
238257
NereidsPlanner nereidsPlanner = new NereidsPlanner(context.getStatementContext());
239258
LogicalPlan rewrittenPlan = (LogicalPlan) nereidsPlanner.planWithLock(parsedPlan,
240259
PhysicalProperties.GATHER, ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
@@ -282,8 +301,8 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
282301
tQueryPlanInfo.plan_fragment = tPlanFragment;
283302
tQueryPlanInfo.desc_tbl = planner.getDescTable().toThrift();
284303
// set query_id
285-
UUID uuid = UUID.randomUUID();
286-
tQueryPlanInfo.query_id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
304+
tQueryPlanInfo.query_id = queryId;
305+
287306

288307
Map<Long, TTabletVersionInfo> tabletInfo = new HashMap<>();
289308
// acquire resolved tablet distribution
@@ -309,6 +328,7 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
309328
result.put("partitions", tabletRoutings);
310329
result.put("opaqued_query_plan", opaquedQueryPlan);
311330
result.put("status", 200);
331+
addToAuditLog(context, sql, query);
312332
} finally {
313333
if (needSetParallelResultSinkToFalse) {
314334
sessionVariable.setParallelResultSink(false);
@@ -317,6 +337,101 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
317337

318338
}
319339

340+
private static void addToAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt) {
341+
// slow query
342+
long elapseMs = System.currentTimeMillis() - ctx.getStartTime();
343+
CatalogIf catalog = ctx.getCurrentCatalog();
344+
String cluster = "";
345+
try {
346+
if (Config.isCloudMode()) {
347+
cluster = ctx.getCloudCluster(false);
348+
}
349+
} catch (ComputeGroupException e) {
350+
LOG.warn("Failed to get cloud cluster", e);
351+
}
352+
353+
AuditEvent.AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
354+
// ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query.
355+
auditEventBuilder.reset();
356+
auditEventBuilder
357+
.setEventType(AuditEvent.EventType.AFTER_QUERY)
358+
.setQueryId(DebugUtil.printId(ctx.queryId()))
359+
.setTimestamp(ctx.getStartTime())
360+
.setClientIp(ctx.getClientIP())
361+
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
362+
.setFeIp(FrontendOptions.getLocalHostAddress())
363+
.setCtl(catalog == null ? InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName())
364+
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
365+
.setState(ctx.getState().toString())
366+
.setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode())
367+
.setErrorMessage((ctx.getState().getErrorMessage() == null ? "" :
368+
ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " ")))
369+
.setQueryTime(elapseMs)
370+
.setCpuTimeMs(0)
371+
.setPeakMemoryBytes(0)
372+
.setScanBytes(0)
373+
.setScanRows(0)
374+
.setReturnRows(ctx.getReturnRows())
375+
.setSpillWriteBytesToLocalStorage(0)
376+
.setSpillReadBytesFromLocalStorage(0)
377+
.setScanBytesFromLocalStorage(0)
378+
.setScanBytesFromRemoteStorage(0)
379+
.setFuzzyVariables("")
380+
.setCommandType("HttpPlan")
381+
.setStmtType("SELECT")
382+
.setStmtId(ctx.getStmtId())
383+
.setSqlHash(ctx.getSqlHash())
384+
.setIsQuery(true)
385+
.setIsNereids(true)
386+
.setisInternal(false)
387+
.setCloudCluster(Strings.isNullOrEmpty(cluster) ? "UNKNOWN" : cluster)
388+
.setWorkloadGroup(ctx.getWorkloadGroupName());
389+
390+
// sql mode
391+
SessionVariable sessionVariable = ctx.getSessionVariable();
392+
if (sessionVariable != null) {
393+
try {
394+
auditEventBuilder.setSqlMode(SqlModeHelper.decode(sessionVariable.getSqlMode()));
395+
} catch (Exception e) {
396+
LOG.warn("decode sql mode {} failed.", sessionVariable.getSqlMode(), e);
397+
}
398+
}
399+
400+
boolean isSyntaxErr = ctx.getState().getStateType() == QueryState.MysqlStateType.ERR
401+
&& ctx.getState().getErrType() == QueryState.ErrType.SYNTAX_PARSE_ERR;
402+
String encryptSql = isSyntaxErr ? "Syntax Error" : origStmt;
403+
if (isSyntaxErr) {
404+
auditEventBuilder.setErrorMessage("Syntax Error");
405+
}
406+
// We put origin query stmt at the end of audit log, for parsing the log more convenient.
407+
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
408+
if ((logicalPlan instanceof NeedAuditEncryption)) {
409+
encryptSql = ((NeedAuditEncryption) logicalPlan).geneEncryptionSQL(origStmt);
410+
}
411+
412+
int maxLen = GlobalVariable.auditPluginMaxSqlLength;
413+
encryptSql = AuditLogHelper.truncateByBytes(encryptSql, maxLen,
414+
" ... /* truncated. audit_plugin_max_sql_length=" + maxLen + " */");
415+
auditEventBuilder.setStmt(encryptSql);
416+
417+
if (!Env.getCurrentEnv().isMaster()) {
418+
StmtExecutor executor = ctx.getExecutor();
419+
if (executor != null && executor.isForwardToMaster()) {
420+
auditEventBuilder.setState(executor.getProxyStatus());
421+
int proxyStatusCode = executor.getProxyStatusCode();
422+
if (proxyStatusCode != 0) {
423+
auditEventBuilder.setErrorCode(proxyStatusCode);
424+
auditEventBuilder.setErrorMessage(executor.getProxyErrMsg());
425+
}
426+
}
427+
}
428+
AuditEvent event = auditEventBuilder.build();
429+
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(event);
430+
if (LOG.isDebugEnabled()) {
431+
LOG.debug("submit audit event: {}", event.queryId);
432+
}
433+
}
434+
320435
/**
321436
* acquire all involved (already pruned) tablet routing
322437
*

fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private static Optional<String> handleInsertStmt(String origStmt, StatementBase
139139
}
140140
}
141141

142-
private static String truncateByBytes(String str, int maxLen, String suffix) {
142+
public static String truncateByBytes(String str, int maxLen, String suffix) {
143143
// use `getBytes().length` to get real byte length
144144
if (maxLen >= str.getBytes().length) {
145145
return str;

0 commit comments

Comments
 (0)