Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 48 additions & 46 deletions fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,123 +42,125 @@ public enum EventType {
}

@Retention(RetentionPolicy.RUNTIME)
public static @interface AuditField {
public @interface AuditField {
String value() default "";

String colName() default "";
}

public EventType type;

// all fields which is about to be audit should be annotated by "@AuditField"
// all fields which is about to be audited should be annotated by "@AuditField"
// make them all "public" so that easy to visit.

// uuid and time
@AuditField(value = "QueryId")
@AuditField(value = "QueryId", colName = "query_id")
public String queryId = "";
@AuditField(value = "Timestamp")
@AuditField(value = "Timestamp", colName = "time")
public long timestamp = -1;

// cs info
@AuditField(value = "Client")
@AuditField(value = "Client", colName = "client_ip")
public String clientIp = "";
@AuditField(value = "User")
@AuditField(value = "User", colName = "user")
public String user = "";
@AuditField(value = "FeIp")
@AuditField(value = "FeIp", colName = "frontend_ip")
public String feIp = "";

// default ctl and db
@AuditField(value = "Ctl")
@AuditField(value = "Ctl", colName = "catalog")
public String ctl = "";
@AuditField(value = "Db")
@AuditField(value = "Db", colName = "db")
public String db = "";

// query state
@AuditField(value = "State")
@AuditField(value = "State", colName = "state")
public String state = "";
@AuditField(value = "ErrorCode")
@AuditField(value = "ErrorCode", colName = "error_code")
public int errorCode = 0;
@AuditField(value = "ErrorMessage")
@AuditField(value = "ErrorMessage", colName = "error_message")
public String errorMessage = "";

// execution info
@AuditField(value = "Time(ms)")
@AuditField(value = "Time(ms)", colName = "query_time")
public long queryTime = -1;
@AuditField(value = "CpuTimeMS")
@AuditField(value = "CpuTimeMS", colName = "cpu_time_ms")
public long cpuTimeMs = -1;
@AuditField(value = "PeakMemoryBytes")
@AuditField(value = "PeakMemoryBytes", colName = "peak_memory_bytes")
public long peakMemoryBytes = -1;
@AuditField(value = "ScanBytes")
@AuditField(value = "ScanBytes", colName = "scan_bytes")
public long scanBytes = -1;
@AuditField(value = "ScanRows")
@AuditField(value = "ScanRows", colName = "scan_rows")
public long scanRows = -1;
@AuditField(value = "ReturnRows")
@AuditField(value = "ReturnRows", colName = "return_rows")
public long returnRows = -1;
@AuditField(value = "ShuffleSendBytes")
public long shuffleSendBytes = -1;
@AuditField(value = "ShuffleSendRows")
@AuditField(value = "ShuffleSendRows", colName = "shuffle_send_rows")
public long shuffleSendRows = -1;
@AuditField(value = "SpillWriteBytesToLocalStorage")
@AuditField(value = "ShuffleSendBytes", colName = "shuffle_send_bytes")
public long shuffleSendBytes = -1;
@AuditField(value = "SpillWriteBytesToLocalStorage", colName = "spill_write_bytes_from_local_storage")
public long spillWriteBytesToLocalStorage = -1;
@AuditField(value = "SpillReadBytesFromLocalStorage")
@AuditField(value = "SpillReadBytesFromLocalStorage", colName = "spill_read_bytes_from_local_storage")
public long spillReadBytesFromLocalStorage = -1;
@AuditField(value = "ScanBytesFromLocalStorage")
@AuditField(value = "ScanBytesFromLocalStorage", colName = "scan_bytes_from_local_storage")
public long scanBytesFromLocalStorage = -1;
@AuditField(value = "ScanBytesFromRemoteStorage")
@AuditField(value = "ScanBytesFromRemoteStorage", colName = "scan_bytes_from_remote_storage")
public long scanBytesFromRemoteStorage = -1;

// plan info
@AuditField(value = "ParseTimeMs")
@AuditField(value = "ParseTimeMs", colName = "parse_time_ms")
public int parseTimeMs = -1;
@AuditField(value = "PlanTimesMs")
@AuditField(value = "PlanTimesMs", colName = "plan_times_ms")
public String planTimesMs = "";
@AuditField(value = "GetMetaTimesMs")
@AuditField(value = "GetMetaTimesMs", colName = "get_meta_times_ms")
public String getMetaTimesMs = "";
@AuditField(value = "ScheduleTimesMs")
@AuditField(value = "ScheduleTimesMs", colName = "schedule_times_ms")
public String scheduleTimesMs = "";
@AuditField(value = "HitSqlCache")
@AuditField(value = "HitSqlCache", colName = "hit_sql_cache")
public boolean hitSqlCache = false;
@AuditField(value = "isHandledInFe")
@AuditField(value = "isHandledInFe", colName = "handled_in_fe")
public boolean isHandledInFe = false;

// table, view, m-view
@AuditField(value = "queriedTablesAndViews")
@AuditField(value = "queriedTablesAndViews", colName = "queried_tables_and_views")
public String queriedTablesAndViews = "";
@AuditField(value = "chosenMViews")
@AuditField(value = "chosenMViews", colName = "chosen_m_views")
public String chosenMViews = "";

// variable and configs
@AuditField(value = "ChangedVariables")
@AuditField(value = "ChangedVariables", colName = "changed_variables")
public String changedVariables = "";
@AuditField(value = "FuzzyVariables")
public String fuzzyVariables = "";
@AuditField(value = "SqlMode")
@AuditField(value = "SqlMode", colName = "sql_mode")
public String sqlMode = "";

// type and digest
@AuditField(value = "CommandType")
public String commandType = "";
@AuditField(value = "StmtType")
@AuditField(value = "StmtType", colName = "stmt_type")
public String stmtType = "";
@AuditField(value = "StmtId")
@AuditField(value = "StmtId", colName = "stmt_id")
public long stmtId = -1;
@AuditField(value = "SqlHash")
@AuditField(value = "SqlHash", colName = "sql_hash")
public String sqlHash = "";
@AuditField(value = "SqlDigest")
@AuditField(value = "SqlDigest", colName = "sql_digest")
public String sqlDigest = "";
@AuditField(value = "IsQuery")
@AuditField(value = "IsQuery", colName = "is_query")
public boolean isQuery = false;
@AuditField(value = "IsNereids")
@AuditField(value = "IsNereids", colName = "is_nereids")
public boolean isNereids = false;
@AuditField(value = "IsInternal")
@AuditField(value = "IsInternal", colName = "is_internal")
public boolean isInternal = false;

// resource
@AuditField(value = "ComputeGroupName")
public String cloudClusterName = "";
@AuditField(value = "WorkloadGroup")
@AuditField(value = "WorkloadGroup", colName = "workload_group")
public String workloadGroup = "";
@AuditField(value = "ComputeGroupName", colName = "compute_group")
public String cloudClusterName = "";

// stmt should be last one
@AuditField(value = "Stmt")
@AuditField(value = "Stmt", colName = "stmt")
public String stmt = "";

public long pushToAuditLogQueueTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public class AuditLoader extends Plugin implements AuditPlugin {

public static final String AUDIT_LOG_TABLE = "audit_log";

// the "\\u001F" and "\\u001E" are used to separate columns and lines in audit log data
public static final String AUDIT_TABLE_COL_SEPARATOR = "\\u001F";
public static final String AUDIT_TABLE_LINE_DELIMITER = "\\u001E";
// the "\\x1F" and "\\x1E" are used to specified column and line delimiter in stream load request
// which is corresponding to the "\\u001F" and "\\u001E" in audit log data.
public static final String AUDIT_TABLE_COL_SEPARATOR_STR = "\\x1F";
public static final String AUDIT_TABLE_LINE_DELIMITER_STR = "\\x1E";

private StringBuilder auditLogBuffer = new StringBuilder();
private int auditLogNum = 0;
private long lastLoadTimeAuditLog = 0;
Expand Down Expand Up @@ -141,73 +149,73 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
// should be same order as InternalSchema.AUDIT_SCHEMA

// uuid and time
logBuffer.append(event.queryId).append("\t");
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t");
logBuffer.append(event.queryId).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append(AUDIT_TABLE_COL_SEPARATOR);

// cs info
logBuffer.append(event.clientIp).append("\t");
logBuffer.append(event.user).append("\t");
logBuffer.append(event.feIp).append("\t");
logBuffer.append(event.clientIp).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.user).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.feIp).append(AUDIT_TABLE_COL_SEPARATOR);

// default ctl and db
logBuffer.append(event.ctl).append("\t");
logBuffer.append(event.db).append("\t");
logBuffer.append(event.ctl).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.db).append(AUDIT_TABLE_COL_SEPARATOR);

// query state
logBuffer.append(event.state).append("\t");
logBuffer.append(event.errorCode).append("\t");
logBuffer.append(event.errorMessage).append("\t");
logBuffer.append(event.state).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.errorCode).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.errorMessage).append(AUDIT_TABLE_COL_SEPARATOR);

// execution info
logBuffer.append(event.queryTime).append("\t");
logBuffer.append(event.cpuTimeMs).append("\t");
logBuffer.append(event.peakMemoryBytes).append("\t");
logBuffer.append(event.scanBytes).append("\t");
logBuffer.append(event.scanRows).append("\t");
logBuffer.append(event.returnRows).append("\t");
logBuffer.append(event.shuffleSendRows).append("\t");
logBuffer.append(event.shuffleSendBytes).append("\t");
logBuffer.append(event.spillWriteBytesToLocalStorage).append("\t");
logBuffer.append(event.spillReadBytesFromLocalStorage).append("\t");
logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
logBuffer.append(event.queryTime).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.cpuTimeMs).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.peakMemoryBytes).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.scanBytes).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.scanRows).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.returnRows).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.shuffleSendRows).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.shuffleSendBytes).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.spillWriteBytesToLocalStorage).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.spillReadBytesFromLocalStorage).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.scanBytesFromLocalStorage).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.scanBytesFromRemoteStorage).append(AUDIT_TABLE_COL_SEPARATOR);

// plan info
logBuffer.append(event.parseTimeMs).append("\t");
logBuffer.append(event.planTimesMs).append("\t");
logBuffer.append(event.getMetaTimesMs).append("\t");
logBuffer.append(event.scheduleTimesMs).append("\t");
logBuffer.append(event.hitSqlCache ? 1 : 0).append("\t");
logBuffer.append(event.isHandledInFe ? 1 : 0).append("\t");
logBuffer.append(event.parseTimeMs).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.planTimesMs).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.getMetaTimesMs).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.scheduleTimesMs).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.hitSqlCache ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.isHandledInFe ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR);

// queried tables, views and m-views
logBuffer.append(event.queriedTablesAndViews).append("\t");
logBuffer.append(event.chosenMViews).append("\t");
logBuffer.append(event.queriedTablesAndViews).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.chosenMViews).append(AUDIT_TABLE_COL_SEPARATOR);

// variable and configs
logBuffer.append(event.changedVariables).append("\t");
logBuffer.append(event.sqlMode).append("\t");
logBuffer.append(event.changedVariables).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.sqlMode).append(AUDIT_TABLE_COL_SEPARATOR);


// type and digest
logBuffer.append(event.stmtType).append("\t");
logBuffer.append(event.stmtId).append("\t");
logBuffer.append(event.sqlHash).append("\t");
logBuffer.append(event.sqlDigest).append("\t");
logBuffer.append(event.isQuery ? 1 : 0).append("\t");
logBuffer.append(event.isNereids ? 1 : 0).append("\t");
logBuffer.append(event.isInternal ? 1 : 0).append("\t");
logBuffer.append(event.stmtType).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.stmtId).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.sqlHash).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.sqlDigest).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.isQuery ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.isNereids ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.isInternal ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR);

// resource
logBuffer.append(event.workloadGroup).append("\t");
logBuffer.append(event.cloudClusterName).append("\t");
logBuffer.append(event.workloadGroup).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.cloudClusterName).append(AUDIT_TABLE_COL_SEPARATOR);

// already trim the query in org.apache.doris.qe.AuditLogHelper#logAuditLog
String stmt = event.stmt;
if (LOG.isDebugEnabled()) {
LOG.debug("receive audit event with stmt: {}", stmt);
}
logBuffer.append(stmt).append("\n");
logBuffer.append(stmt).append(AUDIT_TABLE_LINE_DELIMITER);
}

// public for external call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ private String getAuditLogString(AuditEvent event) throws IllegalAccessException
}
}

// replace new line characters with escaped characters to make sure the stmt in one line
if (af.value().equals("Stmt")) {
fieldValue = ((String) fieldValue).replace("\n", "\\n")
.replace("\r", "\\r");
}

sb.append("|").append(af.value()).append("=").append(fieldValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ private HttpURLConnection getConnection(String urlStr, String label, String clus
InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect(
Collectors.joining(",")));
conn.addRequestProperty("redirect-policy", "random-be");
conn.addRequestProperty("column_separator", AuditLoader.AUDIT_TABLE_COL_SEPARATOR_STR);
conn.addRequestProperty("line_delimiter", AuditLoader.AUDIT_TABLE_LINE_DELIMITER_STR);
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ public static String handleStmt(String origStmt, StatementBase parsedStmt) {
int maxLen = GlobalVariable.auditPluginMaxSqlLength;
origStmt = truncateByBytes(origStmt, maxLen, " ... /* truncated. audit_plugin_max_sql_length=" + maxLen
+ " */");
return origStmt.replace("\n", "\\n")
.replace("\t", "\\t")
.replace("\r", "\\r");
return origStmt;
}

private static Optional<String> handleInsertStmt(String origStmt, StatementBase parsedStmt) {
Expand Down Expand Up @@ -148,9 +146,6 @@ private static Optional<String> handleInsertStmt(String origStmt, StatementBase
Math.min(GlobalVariable.auditPluginMaxInsertStmtLength, GlobalVariable.auditPluginMaxSqlLength));
origStmt = truncateByBytes(origStmt, maxLen, " ... /* total " + rowCnt
+ " rows, truncated. audit_plugin_max_insert_stmt_length=" + maxLen + " */");
origStmt = origStmt.replace("\n", "\\n")
.replace("\t", "\\t")
.replace("\r", "\\r");
return Optional.of(origStmt);
} else {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,9 @@ public void testHandleStmtTruncationForNonInsertStmt() {
// 4. Test statement with newlines, tabs, carriage returns
String stmtWithSpecialChars = "SELECT *\nFROM table1\tWHERE id = 1\r";
result = AuditLogHelper.handleStmt(stmtWithSpecialChars, nonInsertStmt);
Assert.assertTrue("Should escape newlines", result.contains("\\n"));
Assert.assertTrue("Should escape tabs", result.contains("\\t"));
Assert.assertTrue("Should escape carriage returns", result.contains("\\r"));
Assert.assertFalse("Should not contain actual newlines", result.contains("\n"));
Assert.assertFalse("Should not contain actual tabs", result.contains("\t"));
Assert.assertFalse("Should not contain actual carriage returns", result.contains("\r"));
Assert.assertTrue("Should contain actual newlines", result.contains("\n"));
Assert.assertTrue("Should contain actual tabs", result.contains("\t"));
Assert.assertTrue("Should contain actual carriage returns", result.contains("\r"));

// 5. Test long statement with Chinese characters truncation
String chineseStmt
Expand Down Expand Up @@ -118,12 +115,6 @@ public void testHandleStmtTruncationForNonInsertStmt() {
String emptyStmt = "";
result = AuditLogHelper.handleStmt(emptyStmt, nonInsertStmt);
Assert.assertEquals("Empty string should remain empty", "", result);

// 9. Test statement with only special characters
String specialCharsStmt = "\n\t\r\n\t\r";
result = AuditLogHelper.handleStmt(specialCharsStmt, nonInsertStmt);
Assert.assertEquals("Should escape all special characters", "\\n\\t\\r\\n\\t\\r", result);

} finally {
// Restore original values
GlobalVariable.auditPluginMaxSqlLength = originalMaxSqlLength;
Expand Down Expand Up @@ -172,12 +163,9 @@ public void testHandleStmtTruncationForInsertStmt() {
result = AuditLogHelper.handleStmt(insertWithSpecialChars, insertStmt);

// Verify special characters are properly escaped
Assert.assertTrue("Should escape newlines in INSERT", result.contains("\\n"));
Assert.assertTrue("Should escape tabs in INSERT", result.contains("\\t"));
Assert.assertTrue("Should escape carriage returns in INSERT", result.contains("\\r"));
Assert.assertFalse("Should not contain actual newlines", result.contains("\n"));
Assert.assertFalse("Should not contain actual tabs", result.contains("\t"));
Assert.assertFalse("Should not contain actual carriage returns", result.contains("\r"));
Assert.assertTrue("Should contain actual newlines", result.contains("\n"));
Assert.assertTrue("Should contain actual tabs", result.contains("\t"));
Assert.assertTrue("Should contain actual carriage returns", result.contains("\r"));

// 4. Test comparison: same length statements, different handling for INSERT vs non-INSERT
// Create a statement with length between 80-200
Expand Down
Loading