diff --git a/newrelic-security-agent/src/main/java/com/newrelic/agent/security/intcodeagent/controlcommand/ControlCommandProcessorThreadPool.java b/newrelic-security-agent/src/main/java/com/newrelic/agent/security/intcodeagent/controlcommand/ControlCommandProcessorThreadPool.java index 8f159ad04..7115409d3 100644 --- a/newrelic-security-agent/src/main/java/com/newrelic/agent/security/intcodeagent/controlcommand/ControlCommandProcessorThreadPool.java +++ b/newrelic-security-agent/src/main/java/com/newrelic/agent/security/intcodeagent/controlcommand/ControlCommandProcessorThreadPool.java @@ -98,6 +98,10 @@ public Thread newThread(Runnable r) { }); } + public BlockingQueue getQueue() { + return executor.getQueue(); + } + public static ControlCommandProcessorThreadPool getInstance() { if (instance == null) { diff --git a/newrelic-security-agent/src/main/java/com/newrelic/agent/security/intcodeagent/websocket/WSClient.java b/newrelic-security-agent/src/main/java/com/newrelic/agent/security/intcodeagent/websocket/WSClient.java index d2f07257d..65a209302 100644 --- a/newrelic-security-agent/src/main/java/com/newrelic/agent/security/intcodeagent/websocket/WSClient.java +++ b/newrelic-security-agent/src/main/java/com/newrelic/agent/security/intcodeagent/websocket/WSClient.java @@ -7,6 +7,7 @@ import com.newrelic.agent.security.instrumentator.utils.AgentUtils; import com.newrelic.agent.security.instrumentator.utils.INRSettingsKey; import com.newrelic.agent.security.intcodeagent.controlcommand.ControlCommandProcessor; +import com.newrelic.agent.security.intcodeagent.controlcommand.ControlCommandProcessorThreadPool; import com.newrelic.agent.security.intcodeagent.filelogging.FileLoggerThreadPool; import com.newrelic.api.agent.security.utils.logging.LogLevel; import com.newrelic.agent.security.intcodeagent.logging.IAgentConstants; @@ -182,10 +183,7 @@ public void onOpen(ServerHandshake handshakedata) { logger.logInit(LogLevel.INFO, String.format(IAgentConstants.INIT_WS_CONNECTION, AgentConfig.getInstance().getConfig().getK2ServiceInfo().getValidatorServiceEndpointURL()), WSClient.class.getName()); logger.logInit(LogLevel.INFO, String.format(IAgentConstants.SENDING_APPLICATION_INFO_ON_WS_CONNECT, AgentInfo.getInstance().getApplicationInfo()), WSClient.class.getName()); -// RestRequestThreadPool.getInstance().resetIASTProcessing(); -// GrpcClientRequestReplayHelper.getInstance().resetIASTProcessing(); -// DispatcherPool.getInstance().reset(); -// EventSendPool.getInstance().reset(); + cleanIASTState(); super.send(JsonConverter.toJSON(AgentInfo.getInstance().getApplicationInfo())); WSUtils.getInstance().setReconnecting(false); synchronized (WSUtils.getInstance()) { @@ -196,6 +194,15 @@ public void onOpen(ServerHandshake handshakedata) { logger.logInit(LogLevel.INFO, String.format(IAgentConstants.APPLICATION_INFO_SENT_ON_WS_CONNECT, AgentInfo.getInstance().getApplicationInfo()), WSClient.class.getName()); } + private static void cleanIASTState() { + RestRequestThreadPool.getInstance().resetIASTProcessing(); + GrpcClientRequestReplayHelper.getInstance().resetIASTProcessing(); + RestRequestThreadPool.getInstance().getRejectedIds().clear(); + GrpcClientRequestReplayHelper.getInstance().getRejectedIds().clear(); + DispatcherPool.getInstance().reset(); + EventSendPool.getInstance().reset(); + } + @Override public void onMessage(String message) { // Receive communication from IC side. @@ -218,6 +225,8 @@ public void onClose(int code, String reason, boolean remote) { if (code == CloseFrame.NEVER_CONNECTED) { return; } + ControlCommandProcessorThreadPool.getInstance().getQueue().clear(); + cleanIASTState(); WSUtils.getInstance().setConnected(false); if (code == CloseFrame.POLICY_VALIDATION) { WSReconnectionST.cancelTask(true);