From b0b92b4eb137a681df6bc971791653e30f981ddc Mon Sep 17 00:00:00 2001 From: camby <104178625@qq.com> Date: Fri, 1 Aug 2025 00:56:32 +0800 Subject: [PATCH] [fix](audit) fix audit loader thread hang and label already exists issue (#54031) ### What problem does this PR solve? audit loader thread hang, and there are no new audit log in table __internal_schema.audit_log ``` "audit loader thread" #59 prio=5 os_prio=0 cpu=19288.63ms elapsed=714273.53s tid=0x00007f7edaf559d0 nid=0x302e1e runnable [0x00007f7e9c3fe000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.SocketDispatcher.read0(java.base@17.0.13/Native Method) at sun.nio.ch.SocketDispatcher.read(java.base@17.0.13/SocketDispatcher.java:47) at sun.nio.ch.NioSocketImpl.tryRead(java.base@17.0.13/NioSocketImpl.java:266) at sun.nio.ch.NioSocketImpl.implRead(java.base@17.0.13/NioSocketImpl.java:317) at sun.nio.ch.NioSocketImpl.read(java.base@17.0.13/NioSocketImpl.java:355) at sun.nio.ch.NioSocketImpl$1.read(java.base@17.0.13/NioSocketImpl.java:808) at java.net.Socket$SocketInputStream.read(java.base@17.0.13/Socket.java:985) at java.io.BufferedInputStream.fill(java.base@17.0.13/BufferedInputStream.java:244) at java.io.BufferedInputStream.read1(java.base@17.0.13/BufferedInputStream.java:284) at java.io.BufferedInputStream.read(java.base@17.0.13/BufferedInputStream.java:343) - locked <0x00000005dbc6ecf8> (a java.io.BufferedInputStream) at sun.net.www.http.HttpClient.parseHTTPHeader(java.base@17.0.13/HttpClient.java:826) at sun.net.www.http.HttpClient.parseHTTP(java.base@17.0.13/HttpClient.java:761) at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(java.base@17.0.13/HttpURLConnection.java:1724) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(java.base@17.0.13/HttpURLConnection.java:1625) at java.net.HttpURLConnection.getResponseCode(java.base@17.0.13/HttpURLConnection.java:529) at org.apache.doris.plugin.audit.AuditStreamLoader.loadBatch(AuditStreamLoader.java:138) at org.apache.doris.plugin.audit.AuditLoader.loadIfNecessary(AuditLoader.java:196) - locked <0x00000005c6182238> (a org.apache.doris.plugin.audit.AuditLoader) at org.apache.doris.plugin.audit.AuditLoader$LoadWorker.run(AuditLoader.java:234) at java.lang.Thread.run(java.base@17.0.13/Thread.java:840) ``` Also modify the label of audit log load, use true FE ip to avoid conflict between FEs --- .../org/apache/doris/plugin/audit/AuditStreamLoader.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java index 6d071dafda732e..4176aace4e3df3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java @@ -17,6 +17,7 @@ package org.apache.doris.plugin.audit; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.InternalSchema; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; @@ -37,6 +38,8 @@ public class AuditStreamLoader { private static final Logger LOG = LogManager.getLogger(AuditStreamLoader.class); private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"; + // timeout for both connection and read. 10 seconds is long enough. + private static final int HTTP_TIMEOUT_MS = 10000; private String hostPort; private String db; private String auditLogTbl; @@ -48,8 +51,8 @@ public AuditStreamLoader() { this.db = FeConstants.INTERNAL_DB_NAME; this.auditLogTbl = AuditLoader.AUDIT_LOG_TABLE; this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, auditLogTbl); - // currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label - this.feIdentity = hostPort.replaceAll("\\.", "_").replaceAll(":", "_"); + // currently, FE identity is FE's IP:port, so we replace the "." and ":" to make it suitable for label + this.feIdentity = Env.getCurrentEnv().getSelfNode().getIdent().replaceAll("\\.", "_").replaceAll(":", "_"); } private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException { @@ -62,6 +65,8 @@ private HttpURLConnection getConnection(String urlStr, String label, String clus conn.addRequestProperty("Expect", "100-continue"); conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8"); conn.addRequestProperty("label", label); + conn.setConnectTimeout(HTTP_TIMEOUT_MS); + conn.setReadTimeout(HTTP_TIMEOUT_MS); conn.setRequestProperty("timeout", String.valueOf(GlobalVariable.auditPluginLoadTimeoutS)); conn.addRequestProperty("max_filter_ratio", "1.0"); conn.addRequestProperty("columns",