Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,27 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DorisHttpException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TMemoryScratchSink;
Expand All @@ -53,6 +61,7 @@

import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -216,6 +225,12 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
}
}

UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
context.setQueryId(queryId);
context.setStartTime();
context.setSqlHash(DigestUtils.md5Hex(sql));

// acquired Planner to get PlanNode and fragment templates
Planner planner = stmtExecutor.planner();
// acquire ScanNode to obtain pruned tablet
Expand Down Expand Up @@ -247,8 +262,7 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
tQueryPlanInfo.plan_fragment = tPlanFragment;
tQueryPlanInfo.desc_tbl = query.getAnalyzer().getDescTbl().toThrift();
// set query_id
UUID uuid = UUID.randomUUID();
tQueryPlanInfo.query_id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
tQueryPlanInfo.query_id = queryId;

Map<Long, TTabletVersionInfo> tabletInfo = new HashMap<>();
// acquire resolved tablet distribution
Expand All @@ -273,6 +287,72 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
result.put("partitions", tabletRoutings);
result.put("opaqued_query_plan", opaquedQueryPlan);
result.put("status", 200);
addToAuditLog(context, sql, query);
}

private static void addToAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt) {
// slow query
long elapseMs = System.currentTimeMillis() - ctx.getStartTime();
CatalogIf catalog = ctx.getCurrentCatalog();
AuditEvent.AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
// ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query.
auditEventBuilder.reset();
auditEventBuilder
.setEventType(AuditEvent.EventType.AFTER_QUERY)
.setQueryId(DebugUtil.printId(ctx.queryId()))
.setTimestamp(ctx.getStartTime())
.setClientIp(ctx.getClientIP())
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
.setFeIp(FrontendOptions.getLocalHostAddress())
.setCtl(catalog == null ? InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName())
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
.setState(ctx.getState().toString())
.setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode())
.setErrorMessage((ctx.getState().getErrorMessage() == null ? "" :
ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " ")))
.setQueryTime(elapseMs)
.setCpuTimeMs(0)
.setPeakMemoryBytes(0)
.setScanBytes(0)
.setScanRows(0)
.setReturnRows(ctx.getReturnRows())
.setScanBytesFromLocalStorage(0)
.setScanBytesFromRemoteStorage(0)
.setFuzzyVariables("")
.setCommandType("HttpPlan")
.setStmtId(ctx.getStmtId())
.setSqlHash(ctx.getSqlHash())
.setIsQuery(true)
.setIsNereids(false)
.setWorkloadGroup(ctx.getWorkloadGroupName());

boolean isSyntaxErr = ctx.getState().getStateType() == QueryState.MysqlStateType.ERR
&& ctx.getState().getErrType() == QueryState.ErrType.ANALYSIS_ERR;
String encryptSql = isSyntaxErr ? "Syntax Error" : origStmt;
if (isSyntaxErr) {
auditEventBuilder.setErrorMessage("Syntax Error");
}
int maxLen = GlobalVariable.auditPluginMaxSqlLength;
encryptSql = AuditLogHelper.truncateByBytes(encryptSql, maxLen,
" ... /* truncated. audit_plugin_max_sql_length=" + maxLen + " */");
auditEventBuilder.setStmt(encryptSql);

if (!Env.getCurrentEnv().isMaster()) {
StmtExecutor executor = ctx.getExecutor();
if (executor != null && executor.isForwardToMaster()) {
auditEventBuilder.setState(executor.getProxyStatus());
int proxyStatusCode = executor.getProxyStatusCode();
if (proxyStatusCode != 0) {
auditEventBuilder.setErrorCode(proxyStatusCode);
auditEventBuilder.setErrorMessage(executor.getProxyErrMsg());
}
}
}
AuditEvent event = auditEventBuilder.build();
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(event);
if (LOG.isDebugEnabled()) {
LOG.debug("submit audit event: {}", event.queryId);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private static Optional<String> handleInsertStmt(String origStmt, StatementBase
}
}

private static String truncateByBytes(String str, int maxLen, String suffix) {
public static String truncateByBytes(String str, int maxLen, String suffix) {
// use `getBytes().length` to get real byte length
if (maxLen >= str.getBytes().length) {
return str;
Expand Down
Loading