From 092893384621424d4d2aa42f66cd1bd4e4b5bc9e Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Sat, 21 Jun 2025 00:39:53 +0800 Subject: [PATCH 1/2] [opt](audit) use one line in audit log and origin statement in audit table (#52032) Previously, when auditing, we use replace all `\n`, `\t` in origin sql string with `\\n`, `\\t`, so that the sql string can be written in one line. But this lead to some problem: 1. User can not direct use the sql in audit log to execute. 2. Some replacement is wrong, eg, replace the `\n` in a quota string. This PR changes the logic: 1. For audit log, only replace `\n` with `\\n` to keep SQL in one line. 2. For audit table, keep the origin string. 3. Use special column and line separator for audit log load data, to avoid conflict with char in SQL --- .../org/apache/doris/plugin/AuditEvent.java | 112 ++++++++++-------- .../doris/plugin/audit/AuditLoader.java | 66 ++++++----- .../doris/plugin/audit/AuditLogBuilder.java | 6 + .../doris/plugin/audit/AuditStreamLoader.java | 2 + .../org/apache/doris/qe/AuditLogHelper.java | 7 +- .../plugin/audit/AuditLogBuilderTest.java | 24 +--- 6 files changed, 115 insertions(+), 102 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index 20c05d982f806e..783c32f063b6ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -42,77 +42,91 @@ public enum EventType { } @Retention(RetentionPolicy.RUNTIME) - public static @interface AuditField { + public @interface AuditField { String value() default ""; + + String colName() default ""; } public EventType type; - // all fields which is about to be audit should be annotated by "@AuditField" + // all fields which is about to be audited should be annotated by "@AuditField" // make them all "public" so that easy to visit. - @AuditField(value = "Timestamp") + + // uuid and time + @AuditField(value = "QueryId", colName = "query_id") + public String queryId = ""; + @AuditField(value = "Timestamp", colName = "time") public long timestamp = -1; - @AuditField(value = "Client") + + // cs info + @AuditField(value = "Client", colName = "client_ip") public String clientIp = ""; - @AuditField(value = "User") + @AuditField(value = "User", colName = "user") public String user = ""; - @AuditField(value = "Ctl") + @AuditField(value = "FeIp", colName = "frontend_ip") + public String feIp = ""; + + // default ctl and db + @AuditField(value = "Ctl", colName = "catalog") public String ctl = ""; - @AuditField(value = "Db") + @AuditField(value = "Db", colName = "db") public String db = ""; - @AuditField(value = "CommandType") - public String commandType = ""; - @AuditField(value = "State") + + // query state + @AuditField(value = "State", colName = "state") public String state = ""; - @AuditField(value = "ErrorCode") + @AuditField(value = "ErrorCode", colName = "error_code") public int errorCode = 0; - @AuditField(value = "ErrorMessage") + @AuditField(value = "ErrorMessage", colName = "error_message") public String errorMessage = ""; - @AuditField(value = "Time(ms)") + + // execution info + @AuditField(value = "Time(ms)", colName = "query_time") public long queryTime = -1; - @AuditField(value = "ScanBytes") + @AuditField(value = "CpuTimeMS", colName = "cpu_time_ms") + public long cpuTimeMs = -1; + @AuditField(value = "PeakMemoryBytes", colName = "peak_memory_bytes") + public long peakMemoryBytes = -1; + @AuditField(value = "ScanBytes", colName = "scan_bytes") public long scanBytes = -1; - @AuditField(value = "ScanRows") + @AuditField(value = "ScanRows", colName = "scan_rows") public long scanRows = -1; - @AuditField(value = "ReturnRows") + @AuditField(value = "ReturnRows", colName = "return_rows") public long returnRows = -1; - @AuditField(value = "StmtId") - public long stmtId = -1; - @AuditField(value = "QueryId") - public String queryId = ""; - @AuditField(value = "IsQuery") - public boolean isQuery = false; - @AuditField(value = "IsNereids") - public boolean isNereids = false; - @AuditField(value = "FeIp") - public String feIp = ""; - @AuditField(value = "StmtType") - public String stmtType = ""; - @AuditField(value = "Stmt") - public String stmt = ""; - @AuditField(value = "CpuTimeMS") - public long cpuTimeMs = -1; - @AuditField(value = "ShuffleSendBytes") - public long shuffleSendBytes = -1; - @AuditField(value = "ShuffleSendRows") + @AuditField(value = "ShuffleSendRows", colName = "shuffle_send_rows") public long shuffleSendRows = -1; - @AuditField(value = "SqlHash") + @AuditField(value = "ShuffleSendBytes", colName = "shuffle_send_bytes") + public long shuffleSendBytes = -1; + + @AuditField(value = "FuzzyVariables") + public String fuzzyVariables = ""; + + // type and digest + @AuditField(value = "CommandType") + public String commandType = ""; + @AuditField(value = "StmtType", colName = "stmt_type") + public String stmtType = ""; + @AuditField(value = "StmtId", colName = "stmt_id") + public long stmtId = -1; + @AuditField(value = "SqlHash", colName = "sql_hash") public String sqlHash = ""; - @AuditField(value = "PeakMemoryBytes") - public long peakMemoryBytes = -1; - @AuditField(value = "SqlDigest") + @AuditField(value = "SqlDigest", colName = "sql_digest") public String sqlDigest = ""; - @AuditField(value = "ComputeGroupName") - public String cloudClusterName = ""; - @AuditField(value = "WorkloadGroup") + @AuditField(value = "IsQuery", colName = "is_query") + public boolean isQuery = false; + @AuditField(value = "IsNereids", colName = "is_nereids") + public boolean isNereids = false; + + // resource + @AuditField(value = "WorkloadGroup", colName = "workload_group") public String workloadGroup = ""; - // note: newly added fields should be always before fuzzyVariables - @AuditField(value = "FuzzyVariables") - public String fuzzyVariables = ""; - @AuditField(value = "ScanBytesFromLocalStorage") - public long scanBytesFromLocalStorage = -1; - @AuditField(value = "ScanBytesFromRemoteStorage") - public long scanBytesFromRemoteStorage = -1; + @AuditField(value = "ComputeGroupName", colName = "compute_group") + public String cloudClusterName = ""; + + // stmt should be last one + @AuditField(value = "Stmt", colName = "stmt") + public String stmt = ""; public long pushToAuditLogQueueTime; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java index 722ab48669b0d3..c1047bec1b13ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java @@ -46,6 +46,14 @@ public class AuditLoader extends Plugin implements AuditPlugin { public static final String AUDIT_LOG_TABLE = "audit_log"; + // the "\\u001F" and "\\u001E" are used to separate columns and lines in audit log data + public static final String AUDIT_TABLE_COL_SEPARATOR = "\\u001F"; + public static final String AUDIT_TABLE_LINE_DELIMITER = "\\u001E"; + // the "\\x1F" and "\\x1E" are used to specified column and line delimiter in stream load request + // which is corresponding to the "\\u001F" and "\\u001E" in audit log data. + public static final String AUDIT_TABLE_COL_SEPARATOR_STR = "\\x1F"; + public static final String AUDIT_TABLE_LINE_DELIMITER_STR = "\\x1E"; + private StringBuilder auditLogBuffer = new StringBuilder(); private int auditLogNum = 0; private long lastLoadTimeAuditLog = 0; @@ -139,40 +147,40 @@ private void assembleAudit(AuditEvent event) { private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { // should be same order as InternalSchema.AUDIT_SCHEMA - logBuffer.append(event.queryId).append("\t"); - logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t"); - logBuffer.append(event.clientIp).append("\t"); - logBuffer.append(event.user).append("\t"); - logBuffer.append(event.ctl).append("\t"); - logBuffer.append(event.db).append("\t"); - logBuffer.append(event.state).append("\t"); - logBuffer.append(event.errorCode).append("\t"); - logBuffer.append(event.errorMessage).append("\t"); - logBuffer.append(event.queryTime).append("\t"); - logBuffer.append(event.scanBytes).append("\t"); - logBuffer.append(event.scanRows).append("\t"); - logBuffer.append(event.returnRows).append("\t"); - logBuffer.append(event.shuffleSendRows).append("\t"); - logBuffer.append(event.shuffleSendBytes).append("\t"); - logBuffer.append(event.scanBytesFromLocalStorage).append("\t"); - logBuffer.append(event.scanBytesFromRemoteStorage).append("\t"); - logBuffer.append(event.stmtId).append("\t"); - logBuffer.append(event.stmtType).append("\t"); - logBuffer.append(event.isQuery ? 1 : 0).append("\t"); - logBuffer.append(event.isNereids ? 1 : 0).append("\t"); - logBuffer.append(event.feIp).append("\t"); - logBuffer.append(event.cpuTimeMs).append("\t"); - logBuffer.append(event.sqlHash).append("\t"); - logBuffer.append(event.sqlDigest).append("\t"); - logBuffer.append(event.peakMemoryBytes).append("\t"); - logBuffer.append(event.workloadGroup).append("\t"); - logBuffer.append(event.cloudClusterName).append("\t"); + logBuffer.append(event.queryId).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.clientIp).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.user).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.ctl).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.db).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.state).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.errorCode).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.errorMessage).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.queryTime).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.scanBytes).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.scanRows).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.returnRows).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.shuffleSendRows).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.shuffleSendBytes).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.scanBytesFromLocalStorage).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.scanBytesFromRemoteStorage).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.stmtId).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.stmtType).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.isQuery ? 1 : 0).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.isNereids ? 1 : 0).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.feIp).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.cpuTimeMs).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.sqlHash).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.sqlDigest).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.peakMemoryBytes).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.workloadGroup).append("AUDIT_TABLE_COL_SEPARATOR"); + logBuffer.append(event.cloudClusterName).append("AUDIT_TABLE_COL_SEPARATOR"); // already trim the query in org.apache.doris.qe.AuditLogHelper#logAuditLog String stmt = event.stmt; if (LOG.isDebugEnabled()) { LOG.debug("receive audit event with stmt: {}", stmt); } - logBuffer.append(stmt).append("\n"); + logBuffer.append(stmt).append(AUDIT_TABLE_LINE_DELIMITER); } // public for external call. diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java index 4208d5def2ebac..94d7973f29431a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java @@ -121,6 +121,12 @@ private String getAuditLogString(AuditEvent event) throws IllegalAccessException } } + // replace new line characters with escaped characters to make sure the stmt in one line + if (af.value().equals("Stmt")) { + fieldValue = ((String) fieldValue).replace("\n", "\\n") + .replace("\r", "\\r"); + } + sb.append("|").append(af.value()).append("=").append(fieldValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java index 0b70e9591d509d..d2576937d9894d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java @@ -68,6 +68,8 @@ private HttpURLConnection getConnection(String urlStr, String label, String clus InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect( Collectors.joining(","))); conn.addRequestProperty("redirect-policy", "random-be"); + conn.addRequestProperty("column_separator", AuditLoader.AUDIT_TABLE_COL_SEPARATOR_STR); + conn.addRequestProperty("line_delimiter", AuditLoader.AUDIT_TABLE_LINE_DELIMITER_STR); conn.setDoOutput(true); conn.setDoInput(true); return conn; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index f1470d444b91d4..ecfd08aaa710b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -101,9 +101,7 @@ public static String handleStmt(String origStmt, StatementBase parsedStmt) { int maxLen = GlobalVariable.auditPluginMaxSqlLength; origStmt = truncateByBytes(origStmt, maxLen, " ... /* truncated. audit_plugin_max_sql_length=" + maxLen + " */"); - return origStmt.replace("\n", "\\n") - .replace("\t", "\\t") - .replace("\r", "\\r"); + return origStmt; } private static Optional handleInsertStmt(String origStmt, StatementBase parsedStmt) { @@ -134,9 +132,6 @@ private static Optional handleInsertStmt(String origStmt, StatementBase Math.min(GlobalVariable.auditPluginMaxInsertStmtLength, GlobalVariable.auditPluginMaxSqlLength)); origStmt = truncateByBytes(origStmt, maxLen, " ... /* total " + rowCnt + " rows, truncated. audit_plugin_max_insert_stmt_length=" + maxLen + " */"); - origStmt = origStmt.replace("\n", "\\n") - .replace("\t", "\\t") - .replace("\r", "\\r"); return Optional.of(origStmt); } else { return Optional.empty(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java index 8c678447c3a1eb..f3e71c248df06d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java @@ -80,12 +80,9 @@ public void testHandleStmtTruncationForNonInsertStmt() { // 4. Test statement with newlines, tabs, carriage returns String stmtWithSpecialChars = "SELECT *\nFROM table1\tWHERE id = 1\r"; result = AuditLogHelper.handleStmt(stmtWithSpecialChars, nonInsertStmt); - Assert.assertTrue("Should escape newlines", result.contains("\\n")); - Assert.assertTrue("Should escape tabs", result.contains("\\t")); - Assert.assertTrue("Should escape carriage returns", result.contains("\\r")); - Assert.assertFalse("Should not contain actual newlines", result.contains("\n")); - Assert.assertFalse("Should not contain actual tabs", result.contains("\t")); - Assert.assertFalse("Should not contain actual carriage returns", result.contains("\r")); + Assert.assertTrue("Should contain actual newlines", result.contains("\n")); + Assert.assertTrue("Should contain actual tabs", result.contains("\t")); + Assert.assertTrue("Should contain actual carriage returns", result.contains("\r")); // 5. Test long statement with Chinese characters truncation String chineseStmt @@ -118,12 +115,6 @@ public void testHandleStmtTruncationForNonInsertStmt() { String emptyStmt = ""; result = AuditLogHelper.handleStmt(emptyStmt, nonInsertStmt); Assert.assertEquals("Empty string should remain empty", "", result); - - // 9. Test statement with only special characters - String specialCharsStmt = "\n\t\r\n\t\r"; - result = AuditLogHelper.handleStmt(specialCharsStmt, nonInsertStmt); - Assert.assertEquals("Should escape all special characters", "\\n\\t\\r\\n\\t\\r", result); - } finally { // Restore original values GlobalVariable.auditPluginMaxSqlLength = originalMaxSqlLength; @@ -172,12 +163,9 @@ public void testHandleStmtTruncationForInsertStmt() { result = AuditLogHelper.handleStmt(insertWithSpecialChars, insertStmt); // Verify special characters are properly escaped - Assert.assertTrue("Should escape newlines in INSERT", result.contains("\\n")); - Assert.assertTrue("Should escape tabs in INSERT", result.contains("\\t")); - Assert.assertTrue("Should escape carriage returns in INSERT", result.contains("\\r")); - Assert.assertFalse("Should not contain actual newlines", result.contains("\n")); - Assert.assertFalse("Should not contain actual tabs", result.contains("\t")); - Assert.assertFalse("Should not contain actual carriage returns", result.contains("\r")); + Assert.assertTrue("Should contain actual newlines", result.contains("\n")); + Assert.assertTrue("Should contain actual tabs", result.contains("\t")); + Assert.assertTrue("Should contain actual carriage returns", result.contains("\r")); // 4. Test comparison: same length statements, different handling for INSERT vs non-INSERT // Create a statement with length between 80-200 From e9f6af8255c503915a336d68703316ef8a9a883d Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 24 Jun 2025 15:54:34 +0800 Subject: [PATCH 2/2] fix --- .../src/main/java/org/apache/doris/plugin/AuditEvent.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index 783c32f063b6ff..7a2f48d4e6f7f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -98,6 +98,10 @@ public enum EventType { public long shuffleSendRows = -1; @AuditField(value = "ShuffleSendBytes", colName = "shuffle_send_bytes") public long shuffleSendBytes = -1; + @AuditField(value = "ScanBytesFromLocalStorage", colName = "scan_bytes_from_local_storage") + public long scanBytesFromLocalStorage = -1; + @AuditField(value = "ScanBytesFromRemoteStorage", colName = "scan_bytes_from_remote_storage") + public long scanBytesFromRemoteStorage = -1; @AuditField(value = "FuzzyVariables") public String fuzzyVariables = "";