diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TableQueryPlanAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TableQueryPlanAction.java index 2bb5d48ac06672..c7b2b9e81b436e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TableQueryPlanAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/TableQueryPlanAction.java @@ -22,10 +22,15 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; +import org.apache.doris.cloud.qe.ComputeGroupException; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.DorisHttpException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.NetUtils; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.rest.manager.HttpUtils; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -36,6 +41,7 @@ import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; +import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; @@ -46,9 +52,15 @@ import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.ScanNode; +import org.apache.doris.plugin.AuditEvent; +import org.apache.doris.qe.AuditLogHelper; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.GlobalVariable; +import org.apache.doris.qe.QueryState; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.SqlModeHelper; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TMemoryScratchSink; @@ -64,6 +76,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -235,6 +248,12 @@ private void handleQuery(ConnectContext context, String requestDb, String reques + "." + tableName + "]"); } } + UUID uuid = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + context.setQueryId(queryId); + context.setStartTime(); + context.setSqlHash(DigestUtils.md5Hex(sql)); + NereidsPlanner nereidsPlanner = new NereidsPlanner(context.getStatementContext()); LogicalPlan rewrittenPlan = (LogicalPlan) nereidsPlanner.planWithLock(parsedPlan, PhysicalProperties.GATHER, ExplainCommand.ExplainLevel.REWRITTEN_PLAN); @@ -282,8 +301,8 @@ private void handleQuery(ConnectContext context, String requestDb, String reques tQueryPlanInfo.plan_fragment = tPlanFragment; tQueryPlanInfo.desc_tbl = planner.getDescTable().toThrift(); // set query_id - UUID uuid = UUID.randomUUID(); - tQueryPlanInfo.query_id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + tQueryPlanInfo.query_id = queryId; + Map tabletInfo = new HashMap<>(); // acquire resolved tablet distribution @@ -309,6 +328,7 @@ private void handleQuery(ConnectContext context, String requestDb, String reques result.put("partitions", tabletRoutings); result.put("opaqued_query_plan", opaquedQueryPlan); result.put("status", 200); + addToAuditLog(context, sql, query); } finally { if (needSetParallelResultSinkToFalse) { sessionVariable.setParallelResultSink(false); @@ -317,6 +337,101 @@ private void handleQuery(ConnectContext context, String requestDb, String reques } + private static void addToAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt) { + // slow query + long elapseMs = System.currentTimeMillis() - ctx.getStartTime(); + CatalogIf catalog = ctx.getCurrentCatalog(); + String cluster = ""; + try { + if (Config.isCloudMode()) { + cluster = ctx.getCloudCluster(false); + } + } catch (ComputeGroupException e) { + LOG.warn("Failed to get cloud cluster", e); + } + + AuditEvent.AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder(); + // ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query. + auditEventBuilder.reset(); + auditEventBuilder + .setEventType(AuditEvent.EventType.AFTER_QUERY) + .setQueryId(DebugUtil.printId(ctx.queryId())) + .setTimestamp(ctx.getStartTime()) + .setClientIp(ctx.getClientIP()) + .setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser())) + .setFeIp(FrontendOptions.getLocalHostAddress()) + .setCtl(catalog == null ? InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName()) + .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) + .setState(ctx.getState().toString()) + .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode()) + .setErrorMessage((ctx.getState().getErrorMessage() == null ? "" : + ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " "))) + .setQueryTime(elapseMs) + .setCpuTimeMs(0) + .setPeakMemoryBytes(0) + .setScanBytes(0) + .setScanRows(0) + .setReturnRows(ctx.getReturnRows()) + .setSpillWriteBytesToLocalStorage(0) + .setSpillReadBytesFromLocalStorage(0) + .setScanBytesFromLocalStorage(0) + .setScanBytesFromRemoteStorage(0) + .setFuzzyVariables("") + .setCommandType("HttpPlan") + .setStmtType("SELECT") + .setStmtId(ctx.getStmtId()) + .setSqlHash(ctx.getSqlHash()) + .setIsQuery(true) + .setIsNereids(true) + .setisInternal(false) + .setCloudCluster(Strings.isNullOrEmpty(cluster) ? "UNKNOWN" : cluster) + .setWorkloadGroup(ctx.getWorkloadGroupName()); + + // sql mode + SessionVariable sessionVariable = ctx.getSessionVariable(); + if (sessionVariable != null) { + try { + auditEventBuilder.setSqlMode(SqlModeHelper.decode(sessionVariable.getSqlMode())); + } catch (Exception e) { + LOG.warn("decode sql mode {} failed.", sessionVariable.getSqlMode(), e); + } + } + + boolean isSyntaxErr = ctx.getState().getStateType() == QueryState.MysqlStateType.ERR + && ctx.getState().getErrType() == QueryState.ErrType.SYNTAX_PARSE_ERR; + String encryptSql = isSyntaxErr ? "Syntax Error" : origStmt; + if (isSyntaxErr) { + auditEventBuilder.setErrorMessage("Syntax Error"); + } + // We put origin query stmt at the end of audit log, for parsing the log more convenient. + LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + if ((logicalPlan instanceof NeedAuditEncryption)) { + encryptSql = ((NeedAuditEncryption) logicalPlan).geneEncryptionSQL(origStmt); + } + + int maxLen = GlobalVariable.auditPluginMaxSqlLength; + encryptSql = AuditLogHelper.truncateByBytes(encryptSql, maxLen, + " ... /* truncated. audit_plugin_max_sql_length=" + maxLen + " */"); + auditEventBuilder.setStmt(encryptSql); + + if (!Env.getCurrentEnv().isMaster()) { + StmtExecutor executor = ctx.getExecutor(); + if (executor != null && executor.isForwardToMaster()) { + auditEventBuilder.setState(executor.getProxyStatus()); + int proxyStatusCode = executor.getProxyStatusCode(); + if (proxyStatusCode != 0) { + auditEventBuilder.setErrorCode(proxyStatusCode); + auditEventBuilder.setErrorMessage(executor.getProxyErrMsg()); + } + } + } + AuditEvent event = auditEventBuilder.build(); + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(event); + if (LOG.isDebugEnabled()) { + LOG.debug("submit audit event: {}", event.queryId); + } + } + /** * acquire all involved (already pruned) tablet routing * diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 536194a8c0d540..e8bb7aaa32b61b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -178,7 +178,7 @@ private static Optional handleInsertStmt(String origStmt, StatementBase } } - private static String truncateByBytes(String str, int maxLen, String suffix) { + public static String truncateByBytes(String str, int maxLen, String suffix) { // use `getBytes().length` to get real byte length if (maxLen >= str.getBytes().length) { return str;