Skip to content

Commit 456d2b2

Browse files
committed
[feature](QueryPlanAction)add sql received from table query plan action into audit log
1 parent 24d8cf8 commit 456d2b2

File tree

2 files changed

+119
-3
lines changed

2 files changed

+119
-3
lines changed

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TableQueryPlanAction.java

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@
2222
import org.apache.doris.catalog.Env;
2323
import org.apache.doris.catalog.Table;
2424
import org.apache.doris.catalog.TableIf;
25+
import org.apache.doris.cloud.qe.ComputeGroupException;
26+
import org.apache.doris.cluster.ClusterNamespace;
2527
import org.apache.doris.common.Config;
2628
import org.apache.doris.common.DorisHttpException;
2729
import org.apache.doris.common.MetaNotFoundException;
30+
import org.apache.doris.common.util.DebugUtil;
2831
import org.apache.doris.common.util.NetUtils;
32+
import org.apache.doris.datasource.CatalogIf;
33+
import org.apache.doris.datasource.InternalCatalog;
2934
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
3035
import org.apache.doris.httpv2.rest.manager.HttpUtils;
3136
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -36,6 +41,7 @@
3641
import org.apache.doris.nereids.properties.PhysicalProperties;
3742
import org.apache.doris.nereids.trees.plans.commands.Command;
3843
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
44+
import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
3945
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
4046
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
4147
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -46,9 +52,15 @@
4652
import org.apache.doris.nereids.util.RelationUtil;
4753
import org.apache.doris.planner.PlanFragment;
4854
import org.apache.doris.planner.ScanNode;
55+
import org.apache.doris.plugin.AuditEvent;
56+
import org.apache.doris.qe.AuditLogHelper;
4957
import org.apache.doris.qe.ConnectContext;
5058
import org.apache.doris.qe.GlobalVariable;
59+
import org.apache.doris.qe.QueryState;
5160
import org.apache.doris.qe.SessionVariable;
61+
import org.apache.doris.qe.SqlModeHelper;
62+
import org.apache.doris.qe.StmtExecutor;
63+
import org.apache.doris.service.FrontendOptions;
5264
import org.apache.doris.thrift.TDataSink;
5365
import org.apache.doris.thrift.TDataSinkType;
5466
import org.apache.doris.thrift.TMemoryScratchSink;
@@ -64,6 +76,7 @@
6476
import io.netty.handler.codec.http.HttpResponseStatus;
6577
import jakarta.servlet.http.HttpServletRequest;
6678
import jakarta.servlet.http.HttpServletResponse;
79+
import org.apache.commons.codec.digest.DigestUtils;
6780
import org.apache.logging.log4j.LogManager;
6881
import org.apache.logging.log4j.Logger;
6982
import org.apache.thrift.TException;
@@ -235,6 +248,13 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
235248
+ "." + tableName + "]");
236249
}
237250
}
251+
UUID uuid = UUID.randomUUID();
252+
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
253+
context.setQueryId(queryId);
254+
context.setStartTime();
255+
context.setSqlHash(DigestUtils.md5Hex(sql));
256+
context.getState().setIsQuery(true);
257+
238258
NereidsPlanner nereidsPlanner = new NereidsPlanner(context.getStatementContext());
239259
LogicalPlan rewrittenPlan = (LogicalPlan) nereidsPlanner.planWithLock(parsedPlan,
240260
PhysicalProperties.GATHER, ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
@@ -282,8 +302,8 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
282302
tQueryPlanInfo.plan_fragment = tPlanFragment;
283303
tQueryPlanInfo.desc_tbl = planner.getDescTable().toThrift();
284304
// set query_id
285-
UUID uuid = UUID.randomUUID();
286-
tQueryPlanInfo.query_id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
305+
tQueryPlanInfo.query_id = queryId;
306+
287307

288308
Map<Long, TTabletVersionInfo> tabletInfo = new HashMap<>();
289309
// acquire resolved tablet distribution
@@ -309,6 +329,7 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
309329
result.put("partitions", tabletRoutings);
310330
result.put("opaqued_query_plan", opaquedQueryPlan);
311331
result.put("status", 200);
332+
addToAuditLog(context, sql, query);
312333
} finally {
313334
if (needSetParallelResultSinkToFalse) {
314335
sessionVariable.setParallelResultSink(false);
@@ -317,6 +338,101 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
317338

318339
}
319340

341+
private static void addToAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt) {
342+
// slow query
343+
long elapseMs = System.currentTimeMillis() - ctx.getStartTime();
344+
CatalogIf catalog = ctx.getCurrentCatalog();
345+
String cluster = "";
346+
try {
347+
if (Config.isCloudMode()) {
348+
cluster = ctx.getCloudCluster(false);
349+
}
350+
} catch (ComputeGroupException e) {
351+
LOG.warn("Failed to get cloud cluster", e);
352+
}
353+
354+
AuditEvent.AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
355+
// ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query.
356+
auditEventBuilder.reset();
357+
auditEventBuilder
358+
.setEventType(AuditEvent.EventType.AFTER_QUERY)
359+
.setQueryId(DebugUtil.printId(ctx.queryId()))
360+
.setTimestamp(ctx.getStartTime())
361+
.setClientIp(ctx.getClientIP())
362+
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
363+
.setFeIp(FrontendOptions.getLocalHostAddress())
364+
.setCtl(catalog == null ? InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName())
365+
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
366+
.setState(ctx.getState().toString())
367+
.setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode())
368+
.setErrorMessage((ctx.getState().getErrorMessage() == null ? "" :
369+
ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " ")))
370+
.setQueryTime(elapseMs)
371+
.setCpuTimeMs(0)
372+
.setPeakMemoryBytes(0)
373+
.setScanBytes(0)
374+
.setScanRows(0)
375+
.setReturnRows(ctx.getReturnRows())
376+
.setSpillWriteBytesToLocalStorage(0)
377+
.setSpillReadBytesFromLocalStorage(0)
378+
.setScanBytesFromLocalStorage(0)
379+
.setScanBytesFromRemoteStorage(0)
380+
.setFuzzyVariables("")
381+
.setCommandType("HttpPlan")
382+
.setStmtType("SELECT")
383+
.setStmtId(ctx.getStmtId())
384+
.setSqlHash(ctx.getSqlHash())
385+
.setIsQuery(true)
386+
.setIsNereids(false)
387+
.setisInternal(false)
388+
.setCloudCluster(Strings.isNullOrEmpty(cluster) ? "UNKNOWN" : cluster)
389+
.setWorkloadGroup(ctx.getWorkloadGroupName());
390+
391+
// sql mode
392+
SessionVariable sessionVariable = ctx.getSessionVariable();
393+
if (sessionVariable != null) {
394+
try {
395+
auditEventBuilder.setSqlMode(SqlModeHelper.decode(sessionVariable.getSqlMode()));
396+
} catch (Exception e) {
397+
LOG.warn("decode sql mode {} failed.", sessionVariable.getSqlMode(), e);
398+
}
399+
}
400+
401+
boolean isSyntaxErr = ctx.getState().getStateType() == QueryState.MysqlStateType.ERR
402+
&& ctx.getState().getErrType() == QueryState.ErrType.SYNTAX_PARSE_ERR;
403+
String encryptSql = isSyntaxErr ? "Syntax Error" : origStmt;
404+
if (isSyntaxErr) {
405+
auditEventBuilder.setErrorMessage("Syntax Error");
406+
}
407+
// We put origin query stmt at the end of audit log, for parsing the log more convenient.
408+
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
409+
if ((logicalPlan instanceof NeedAuditEncryption)) {
410+
encryptSql = ((NeedAuditEncryption) logicalPlan).geneEncryptionSQL(origStmt);
411+
}
412+
413+
int maxLen = GlobalVariable.auditPluginMaxSqlLength;
414+
encryptSql = AuditLogHelper.truncateByBytes(encryptSql, maxLen,
415+
" ... /* truncated. audit_plugin_max_sql_length=" + maxLen + " */");
416+
auditEventBuilder.setStmt(encryptSql);
417+
418+
if (!Env.getCurrentEnv().isMaster()) {
419+
StmtExecutor executor = ctx.getExecutor();
420+
if (executor != null && executor.isForwardToMaster()) {
421+
auditEventBuilder.setState(executor.getProxyStatus());
422+
int proxyStatusCode = executor.getProxyStatusCode();
423+
if (proxyStatusCode != 0) {
424+
auditEventBuilder.setErrorCode(proxyStatusCode);
425+
auditEventBuilder.setErrorMessage(executor.getProxyErrMsg());
426+
}
427+
}
428+
}
429+
AuditEvent event = auditEventBuilder.build();
430+
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(event);
431+
if (LOG.isDebugEnabled()) {
432+
LOG.debug("submit audit event: {}", event.queryId);
433+
}
434+
}
435+
320436
/**
321437
* acquire all involved (already pruned) tablet routing
322438
*

fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ private static Optional<String> handleInsertStmt(String origStmt, StatementBase
178178
}
179179
}
180180

181-
private static String truncateByBytes(String str, int maxLen, String suffix) {
181+
public static String truncateByBytes(String str, int maxLen, String suffix) {
182182
// use `getBytes().length` to get real byte length
183183
if (maxLen >= str.getBytes().length) {
184184
return str;

0 commit comments

Comments
 (0)