diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index c2d946b9233..fc6f4d90c39 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -54,8 +54,14 @@ public class AuditUtils { public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009; public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010; public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011; + public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013; public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014; - public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30015; + public static final int AUDIT_ID_AGENT_TRY_SEND = 30020; + public static final int AUDIT_ID_AGENT_TRY_SEND_REAL_TIME = 30021; + public static final int AUDIT_ID_AGENT_SEND_EXCEPTION = 30022; + public static final int AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME = 30023; + public static final int AUDIT_ID_AGENT_RESEND = 30024; + public static final int AUDIT_ID_AGENT_RESEND_REAL_TIME = 30025; private static boolean IS_AUDIT = true; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 1c6790df734..51056a1975b 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -127,14 +127,14 @@ public SenderManager(InstanceProfile profile, String inlongGroupId, String sourc CommonConstants.PROXY_SENDER_MAX_TIMEOUT, CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT); maxSenderRetry = profile.getInt( CommonConstants.PROXY_SENDER_MAX_RETRY, CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY); - retrySleepTime = profile.getLong( + retrySleepTime = agentConf.getLong( CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP); isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE); ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM, CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM); enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT, CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT); - batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); + batchFlushInterval = agentConf.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); authSecretId = agentConf.get(AGENT_MANAGER_AUTH_SECRET_ID); authSecretKey = agentConf.get(AGENT_MANAGER_AUTH_SECRET_KEY); @@ -231,6 +231,12 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { while (!suc) { try { AgentSenderCallback cb = new AgentSenderCallback(message, retry); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, message.getGroupId(), + message.getStreamId(), message.getDataTime(), message.getMsgCnt(), + message.getTotalSize()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME, message.getGroupId(), + message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), + message.getTotalSize()); asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(), message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend); @@ -238,6 +244,12 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { message.getMsgCnt()); suc = true; } catch (Exception exception) { + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION, message.getGroupId(), + message.getStreamId(), message.getDataTime(), message.getMsgCnt(), + message.getTotalSize()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME, message.getGroupId(), + message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), + message.getTotalSize()); suc = false; if (retry > maxSenderRetry) { if (retry % 10 == 0) { @@ -276,6 +288,13 @@ private Runnable flushResendQueue() { try { AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); if (callback != null) { + SenderMessage message = callback.message; + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(), + message.getStreamId(), message.getDataTime(), message.getMsgCnt(), + message.getTotalSize()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(), + message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), + message.getTotalSize()); sendBatchWithRetryCount(callback.message, callback.retry + 1); } } catch (Exception ex) {