From a74e846f0b9e3727863c9d2cee886c344f464260 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Tue, 5 Mar 2024 15:35:47 +0800 Subject: [PATCH] [INLONG-9772][Agent] Increase auditing for sending exceptions and resending --- .../agent/metrics/audit/AuditUtils.java | 8 ++++++- .../sinks/filecollect/SenderManager.java | 23 +++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index c2d946b9233..fc6f4d90c39 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -54,8 +54,14 @@ public class AuditUtils { public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009; public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010; public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011; + public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013; public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014; - public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30015; + public static final int AUDIT_ID_AGENT_TRY_SEND = 30020; + public static final int AUDIT_ID_AGENT_TRY_SEND_REAL_TIME = 30021; + public static final int AUDIT_ID_AGENT_SEND_EXCEPTION = 30022; + public static final int AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME = 30023; + public static final int AUDIT_ID_AGENT_RESEND = 30024; + public static final int AUDIT_ID_AGENT_RESEND_REAL_TIME = 30025; private static boolean IS_AUDIT = true; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 1c6790df734..51056a1975b 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -127,14 +127,14 @@ public SenderManager(InstanceProfile profile, String inlongGroupId, String sourc CommonConstants.PROXY_SENDER_MAX_TIMEOUT, CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT); maxSenderRetry = profile.getInt( CommonConstants.PROXY_SENDER_MAX_RETRY, CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY); - retrySleepTime = profile.getLong( + retrySleepTime = agentConf.getLong( CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP); isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE); ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM, CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM); enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT, CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT); - batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); + batchFlushInterval = agentConf.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); authSecretId = agentConf.get(AGENT_MANAGER_AUTH_SECRET_ID); authSecretKey = agentConf.get(AGENT_MANAGER_AUTH_SECRET_KEY); @@ -231,6 +231,12 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { while (!suc) { try { AgentSenderCallback cb = new AgentSenderCallback(message, retry); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, message.getGroupId(), + message.getStreamId(), message.getDataTime(), message.getMsgCnt(), + message.getTotalSize()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME, message.getGroupId(), + message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), + message.getTotalSize()); asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(), message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend); @@ -238,6 +244,12 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { message.getMsgCnt()); suc = true; } catch (Exception exception) { + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION, message.getGroupId(), + message.getStreamId(), message.getDataTime(), message.getMsgCnt(), + message.getTotalSize()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME, message.getGroupId(), + message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), + message.getTotalSize()); suc = false; if (retry > maxSenderRetry) { if (retry % 10 == 0) { @@ -276,6 +288,13 @@ private Runnable flushResendQueue() { try { AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); if (callback != null) { + SenderMessage message = callback.message; + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(), + message.getStreamId(), message.getDataTime(), message.getMsgCnt(), + message.getTotalSize()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(), + message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), + message.getTotalSize()); sendBatchWithRetryCount(callback.message, callback.retry + 1); } } catch (Exception ex) {