diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index 8570b12c8b1bd9..b802edad14d79a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -42,119 +42,125 @@ public enum EventType { } @Retention(RetentionPolicy.RUNTIME) - public static @interface AuditField { + public @interface AuditField { String value() default ""; + + String colName() default ""; } public EventType type; - // all fields which is about to be audit should be annotated by "@AuditField" + // all fields which is about to be audited should be annotated by "@AuditField" // make them all "public" so that easy to visit. // uuid and time - @AuditField(value = "QueryId") + @AuditField(value = "QueryId", colName = "query_id") public String queryId = ""; - @AuditField(value = "Timestamp") + @AuditField(value = "Timestamp", colName = "time") public long timestamp = -1; // cs info - @AuditField(value = "Client") + @AuditField(value = "Client", colName = "client_ip") public String clientIp = ""; - @AuditField(value = "User") + @AuditField(value = "User", colName = "user") public String user = ""; - @AuditField(value = "FeIp") + @AuditField(value = "FeIp", colName = "frontend_ip") public String feIp = ""; // default ctl and db - @AuditField(value = "Ctl") + @AuditField(value = "Ctl", colName = "catalog") public String ctl = ""; - @AuditField(value = "Db") + @AuditField(value = "Db", colName = "db") public String db = ""; // query state - @AuditField(value = "State") + @AuditField(value = "State", colName = "state") public String state = ""; - @AuditField(value = "ErrorCode") + @AuditField(value = "ErrorCode", colName = "error_code") public int errorCode = 0; - @AuditField(value = "ErrorMessage") + @AuditField(value = "ErrorMessage", colName = "error_message") public String errorMessage = ""; // execution info - @AuditField(value = "Time(ms)") + @AuditField(value = "Time(ms)", colName = "query_time") public long queryTime = -1; - @AuditField(value = "CpuTimeMS") + @AuditField(value = "CpuTimeMS", colName = "cpu_time_ms") public long cpuTimeMs = -1; - @AuditField(value = "PeakMemoryBytes") + @AuditField(value = "PeakMemoryBytes", colName = "peak_memory_bytes") public long peakMemoryBytes = -1; - @AuditField(value = "ScanBytes") + @AuditField(value = "ScanBytes", colName = "scan_bytes") public long scanBytes = -1; - @AuditField(value = "ScanRows") + @AuditField(value = "ScanRows", colName = "scan_rows") public long scanRows = -1; - @AuditField(value = "ReturnRows") + @AuditField(value = "ReturnRows", colName = "return_rows") public long returnRows = -1; - @AuditField(value = "ShuffleSendBytes") - public long shuffleSendBytes = -1; - @AuditField(value = "ShuffleSendRows") + @AuditField(value = "ShuffleSendRows", colName = "shuffle_send_rows") public long shuffleSendRows = -1; - @AuditField(value = "ScanBytesFromLocalStorage") + @AuditField(value = "ShuffleSendBytes", colName = "shuffle_send_bytes") + public long shuffleSendBytes = -1; + @AuditField(value = "SpillWriteBytesToLocalStorage", colName = "spill_write_bytes_from_local_storage") + public long spillWriteBytesToLocalStorage = -1; + @AuditField(value = "SpillReadBytesFromLocalStorage", colName = "spill_read_bytes_from_local_storage") + public long spillReadBytesFromLocalStorage = -1; + @AuditField(value = "ScanBytesFromLocalStorage", colName = "scan_bytes_from_local_storage") public long scanBytesFromLocalStorage = -1; - @AuditField(value = "ScanBytesFromRemoteStorage") + @AuditField(value = "ScanBytesFromRemoteStorage", colName = "scan_bytes_from_remote_storage") public long scanBytesFromRemoteStorage = -1; // plan info - @AuditField(value = "ParseTimeMs") + @AuditField(value = "ParseTimeMs", colName = "parse_time_ms") public int parseTimeMs = -1; - @AuditField(value = "PlanTimesMs") + @AuditField(value = "PlanTimesMs", colName = "plan_times_ms") public String planTimesMs = ""; - @AuditField(value = "GetMetaTimesMs") + @AuditField(value = "GetMetaTimesMs", colName = "get_meta_times_ms") public String getMetaTimesMs = ""; - @AuditField(value = "ScheduleTimesMs") + @AuditField(value = "ScheduleTimesMs", colName = "schedule_times_ms") public String scheduleTimesMs = ""; - @AuditField(value = "HitSqlCache") + @AuditField(value = "HitSqlCache", colName = "hit_sql_cache") public boolean hitSqlCache = false; - @AuditField(value = "isHandledInFe") + @AuditField(value = "isHandledInFe", colName = "handled_in_fe") public boolean isHandledInFe = false; // table, view, m-view - @AuditField(value = "queriedTablesAndViews") + @AuditField(value = "queriedTablesAndViews", colName = "queried_tables_and_views") public String queriedTablesAndViews = ""; - @AuditField(value = "chosenMViews") + @AuditField(value = "chosenMViews", colName = "chosen_m_views") public String chosenMViews = ""; // variable and configs - @AuditField(value = "ChangedVariables") + @AuditField(value = "ChangedVariables", colName = "changed_variables") public String changedVariables = ""; @AuditField(value = "FuzzyVariables") public String fuzzyVariables = ""; - @AuditField(value = "SqlMode") + @AuditField(value = "SqlMode", colName = "sql_mode") public String sqlMode = ""; // type and digest @AuditField(value = "CommandType") public String commandType = ""; - @AuditField(value = "StmtType") + @AuditField(value = "StmtType", colName = "stmt_type") public String stmtType = ""; - @AuditField(value = "StmtId") + @AuditField(value = "StmtId", colName = "stmt_id") public long stmtId = -1; - @AuditField(value = "SqlHash") + @AuditField(value = "SqlHash", colName = "sql_hash") public String sqlHash = ""; - @AuditField(value = "SqlDigest") + @AuditField(value = "SqlDigest", colName = "sql_digest") public String sqlDigest = ""; - @AuditField(value = "IsQuery") + @AuditField(value = "IsQuery", colName = "is_query") public boolean isQuery = false; - @AuditField(value = "IsNereids") + @AuditField(value = "IsNereids", colName = "is_nereids") public boolean isNereids = false; - @AuditField(value = "IsInternal") + @AuditField(value = "IsInternal", colName = "is_internal") public boolean isInternal = false; // resource - @AuditField(value = "ComputeGroupName") - public String cloudClusterName = ""; - @AuditField(value = "WorkloadGroup") + @AuditField(value = "WorkloadGroup", colName = "workload_group") public String workloadGroup = ""; + @AuditField(value = "ComputeGroupName", colName = "compute_group") + public String cloudClusterName = ""; // stmt should be last one - @AuditField(value = "Stmt") + @AuditField(value = "Stmt", colName = "stmt") public String stmt = ""; public long pushToAuditLogQueueTime; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java index 45e5052353be3f..8f393cdc27bc46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java @@ -46,6 +46,14 @@ public class AuditLoader extends Plugin implements AuditPlugin { public static final String AUDIT_LOG_TABLE = "audit_log"; + // the "0x1F" and "0x1E" are used to separate columns and lines in audit log data + public static final char AUDIT_TABLE_COL_SEPARATOR = 0x1F; + public static final char AUDIT_TABLE_LINE_DELIMITER = 0x1E; + // the "\\x1F" and "\\x1E" are used to specified column and line delimiter in stream load request + // which is corresponding to the "\\u001F" and "\\u001E" in audit log data. + public static final String AUDIT_TABLE_COL_SEPARATOR_STR = "\\x1F"; + public static final String AUDIT_TABLE_LINE_DELIMITER_STR = "\\x1E"; + private StringBuilder auditLogBuffer = new StringBuilder(); private int auditLogNum = 0; private long lastLoadTimeAuditLog = 0; @@ -141,71 +149,71 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { // should be same order as InternalSchema.AUDIT_SCHEMA // uuid and time - logBuffer.append(event.queryId).append("\t"); - logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t"); + logBuffer.append(event.queryId).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append(AUDIT_TABLE_COL_SEPARATOR); // cs info - logBuffer.append(event.clientIp).append("\t"); - logBuffer.append(event.user).append("\t"); - logBuffer.append(event.feIp).append("\t"); + logBuffer.append(event.clientIp).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.user).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.feIp).append(AUDIT_TABLE_COL_SEPARATOR); // default ctl and db - logBuffer.append(event.ctl).append("\t"); - logBuffer.append(event.db).append("\t"); + logBuffer.append(event.ctl).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.db).append(AUDIT_TABLE_COL_SEPARATOR); // query state - logBuffer.append(event.state).append("\t"); - logBuffer.append(event.errorCode).append("\t"); - logBuffer.append(event.errorMessage).append("\t"); + logBuffer.append(event.state).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.errorCode).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.errorMessage).append(AUDIT_TABLE_COL_SEPARATOR); // execution info - logBuffer.append(event.queryTime).append("\t"); - logBuffer.append(event.cpuTimeMs).append("\t"); - logBuffer.append(event.peakMemoryBytes).append("\t"); - logBuffer.append(event.scanBytes).append("\t"); - logBuffer.append(event.scanRows).append("\t"); - logBuffer.append(event.returnRows).append("\t"); - logBuffer.append(event.shuffleSendRows).append("\t"); - logBuffer.append(event.shuffleSendBytes).append("\t"); - logBuffer.append(event.scanBytesFromLocalStorage).append("\t"); - logBuffer.append(event.scanBytesFromRemoteStorage).append("\t"); + logBuffer.append(event.queryTime).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.cpuTimeMs).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.peakMemoryBytes).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.scanBytes).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.scanRows).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.returnRows).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.shuffleSendRows).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.shuffleSendBytes).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.scanBytesFromLocalStorage).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.scanBytesFromRemoteStorage).append(AUDIT_TABLE_COL_SEPARATOR); // plan info - logBuffer.append(event.parseTimeMs).append("\t"); - logBuffer.append(event.planTimesMs).append("\t"); - logBuffer.append(event.getMetaTimesMs).append("\t"); - logBuffer.append(event.scheduleTimesMs).append("\t"); - logBuffer.append(event.hitSqlCache ? 1 : 0).append("\t"); - logBuffer.append(event.isHandledInFe ? 1 : 0).append("\t"); + logBuffer.append(event.parseTimeMs).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.planTimesMs).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.getMetaTimesMs).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.scheduleTimesMs).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.hitSqlCache ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.isHandledInFe ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR); // queried tables, views and m-views - logBuffer.append(event.queriedTablesAndViews).append("\t"); - logBuffer.append(event.chosenMViews).append("\t"); + logBuffer.append(event.queriedTablesAndViews).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.chosenMViews).append(AUDIT_TABLE_COL_SEPARATOR); // variable and configs - logBuffer.append(event.changedVariables).append("\t"); - logBuffer.append(event.sqlMode).append("\t"); + logBuffer.append(event.changedVariables).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.sqlMode).append(AUDIT_TABLE_COL_SEPARATOR); // type and digest - logBuffer.append(event.stmtType).append("\t"); - logBuffer.append(event.stmtId).append("\t"); - logBuffer.append(event.sqlHash).append("\t"); - logBuffer.append(event.sqlDigest).append("\t"); - logBuffer.append(event.isQuery ? 1 : 0).append("\t"); - logBuffer.append(event.isNereids ? 1 : 0).append("\t"); - logBuffer.append(event.isInternal ? 1 : 0).append("\t"); + logBuffer.append(event.stmtType).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.stmtId).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.sqlHash).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.sqlDigest).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.isQuery ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.isNereids ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.isInternal ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR); // resource - logBuffer.append(event.workloadGroup).append("\t"); - logBuffer.append(event.cloudClusterName).append("\t"); + logBuffer.append(event.workloadGroup).append(AUDIT_TABLE_COL_SEPARATOR); + logBuffer.append(event.cloudClusterName).append(AUDIT_TABLE_COL_SEPARATOR); // already trim the query in org.apache.doris.qe.AuditLogHelper#logAuditLog String stmt = event.stmt; if (LOG.isDebugEnabled()) { LOG.debug("receive audit event with stmt: {}", stmt); } - logBuffer.append(stmt).append("\n"); + logBuffer.append(stmt).append(AUDIT_TABLE_LINE_DELIMITER); } // public for external call. diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java index a6b9062bbea48d..64410fa5c22469 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java @@ -121,6 +121,12 @@ private String getAuditLogString(AuditEvent event) throws IllegalAccessException } } + // replace new line characters with escaped characters to make sure the stmt in one line + if (af.value().equals("Stmt")) { + fieldValue = ((String) fieldValue).replace("\n", "\\n") + .replace("\r", "\\r"); + } + sb.append("|").append(af.value()).append("=").append(fieldValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java index 0b70e9591d509d..d2576937d9894d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java @@ -68,6 +68,8 @@ private HttpURLConnection getConnection(String urlStr, String label, String clus InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect( Collectors.joining(","))); conn.addRequestProperty("redirect-policy", "random-be"); + conn.addRequestProperty("column_separator", AuditLoader.AUDIT_TABLE_COL_SEPARATOR_STR); + conn.addRequestProperty("line_delimiter", AuditLoader.AUDIT_TABLE_LINE_DELIMITER_STR); conn.setDoOutput(true); conn.setDoInput(true); return conn; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 11ad11bb7eb2e9..fb8b227b8b7d89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -116,9 +116,7 @@ public static String handleStmt(String origStmt, StatementBase parsedStmt) { int maxLen = GlobalVariable.auditPluginMaxSqlLength; origStmt = truncateByBytes(origStmt, maxLen, " ... /* truncated. audit_plugin_max_sql_length=" + maxLen + " */"); - return origStmt.replace("\n", "\\n") - .replace("\t", "\\t") - .replace("\r", "\\r"); + return origStmt; } private static Optional handleInsertStmt(String origStmt, StatementBase parsedStmt) { @@ -149,9 +147,6 @@ private static Optional handleInsertStmt(String origStmt, StatementBase Math.min(GlobalVariable.auditPluginMaxInsertStmtLength, GlobalVariable.auditPluginMaxSqlLength)); origStmt = truncateByBytes(origStmt, maxLen, " ... /* total " + rowCnt + " rows, truncated. audit_plugin_max_insert_stmt_length=" + maxLen + " */"); - origStmt = origStmt.replace("\n", "\\n") - .replace("\t", "\\t") - .replace("\r", "\\r"); return Optional.of(origStmt); } else { return Optional.empty(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java index 8c678447c3a1eb..f3e71c248df06d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java @@ -80,12 +80,9 @@ public void testHandleStmtTruncationForNonInsertStmt() { // 4. Test statement with newlines, tabs, carriage returns String stmtWithSpecialChars = "SELECT *\nFROM table1\tWHERE id = 1\r"; result = AuditLogHelper.handleStmt(stmtWithSpecialChars, nonInsertStmt); - Assert.assertTrue("Should escape newlines", result.contains("\\n")); - Assert.assertTrue("Should escape tabs", result.contains("\\t")); - Assert.assertTrue("Should escape carriage returns", result.contains("\\r")); - Assert.assertFalse("Should not contain actual newlines", result.contains("\n")); - Assert.assertFalse("Should not contain actual tabs", result.contains("\t")); - Assert.assertFalse("Should not contain actual carriage returns", result.contains("\r")); + Assert.assertTrue("Should contain actual newlines", result.contains("\n")); + Assert.assertTrue("Should contain actual tabs", result.contains("\t")); + Assert.assertTrue("Should contain actual carriage returns", result.contains("\r")); // 5. Test long statement with Chinese characters truncation String chineseStmt @@ -118,12 +115,6 @@ public void testHandleStmtTruncationForNonInsertStmt() { String emptyStmt = ""; result = AuditLogHelper.handleStmt(emptyStmt, nonInsertStmt); Assert.assertEquals("Empty string should remain empty", "", result); - - // 9. Test statement with only special characters - String specialCharsStmt = "\n\t\r\n\t\r"; - result = AuditLogHelper.handleStmt(specialCharsStmt, nonInsertStmt); - Assert.assertEquals("Should escape all special characters", "\\n\\t\\r\\n\\t\\r", result); - } finally { // Restore original values GlobalVariable.auditPluginMaxSqlLength = originalMaxSqlLength; @@ -172,12 +163,9 @@ public void testHandleStmtTruncationForInsertStmt() { result = AuditLogHelper.handleStmt(insertWithSpecialChars, insertStmt); // Verify special characters are properly escaped - Assert.assertTrue("Should escape newlines in INSERT", result.contains("\\n")); - Assert.assertTrue("Should escape tabs in INSERT", result.contains("\\t")); - Assert.assertTrue("Should escape carriage returns in INSERT", result.contains("\\r")); - Assert.assertFalse("Should not contain actual newlines", result.contains("\n")); - Assert.assertFalse("Should not contain actual tabs", result.contains("\t")); - Assert.assertFalse("Should not contain actual carriage returns", result.contains("\r")); + Assert.assertTrue("Should contain actual newlines", result.contains("\n")); + Assert.assertTrue("Should contain actual tabs", result.contains("\t")); + Assert.assertTrue("Should contain actual carriage returns", result.contains("\r")); // 4. Test comparison: same length statements, different handling for INSERT vs non-INSERT // Create a statement with length between 80-200