diff --git a/etc/config/wazuh-agent.yml b/etc/config/wazuh-agent.yml index 26c05df6c9..d15aa850c4 100644 --- a/etc/config/wazuh-agent.yml +++ b/etc/config/wazuh-agent.yml @@ -2,6 +2,9 @@ agent: thread_count: 4 server_url: https://localhost:27000 retry_interval: 30s +events: + batch_interval: 10s + batch_size: 1000 inventory: enabled: true interval: 1h diff --git a/src/agent/communicator/include/communicator.hpp b/src/agent/communicator/include/communicator.hpp index 62b45c2a68..ba51021155 100644 --- a/src/agent/communicator/include/communicator.hpp +++ b/src/agent/communicator/include/communicator.hpp @@ -57,6 +57,24 @@ namespace communicator m_retryInterval = getConfigValue.template operator()("agent", "retry_interval") .value_or(config::agent::DEFAULT_RETRY_INTERVAL); + + m_batchInterval = getConfigValue.template operator()("events", "batch_interval") + .value_or(config::agent::DEFAULT_BATCH_INTERVAL); + + if (m_batchInterval < 1'000 || m_batchInterval > (1'000 * 60 * 60)) + { + LogWarn("batch_interval must be between 1s and 1h. Using default value."); + m_batchInterval = config::agent::DEFAULT_BATCH_INTERVAL; + } + + m_batchSize = getConfigValue.template operator()("events", "batch_size") + .value_or(config::agent::DEFAULT_BATCH_SIZE); + + if (m_batchSize < 1'000 || m_batchSize > 1'000'000) + { + LogWarn("batch_size must be between 1000 and 1000000. Using default value."); + m_batchSize = config::agent::DEFAULT_BATCH_SIZE; + } } /// @brief Waits for the authentication token to expire and authenticates again @@ -64,21 +82,22 @@ namespace communicator /// @brief Retrieves commands from the manager /// @param onSuccess A callback function to execute when a command is received - boost::asio::awaitable GetCommandsFromManager(std::function onSuccess); + boost::asio::awaitable + GetCommandsFromManager(std::function onSuccess); /// @brief Processes messages in a stateful manner /// @param getMessages A function to retrieve a message from the queue /// @param onSuccess A callback function to execute when a message is processed - boost::asio::awaitable - StatefulMessageProcessingTask(std::function()> getMessages, - std::function onSuccess); + boost::asio::awaitable StatefulMessageProcessingTask( + std::function>(const int)> getMessages, + std::function onSuccess); /// @brief Processes messages in a stateless manner /// @param getMessages A function to retrieve a message from the queue /// @param onSuccess A callback function to execute when a message is processed - boost::asio::awaitable - StatelessMessageProcessingTask(std::function()> getMessages, - std::function onSuccess); + boost::asio::awaitable StatelessMessageProcessingTask( + std::function>(const int)> getMessages, + std::function onSuccess); /// @brief Retrieves group configuration from the manager /// @param groupName The name of the group to retrieve the configuration for @@ -116,6 +135,12 @@ namespace communicator /// @brief Time in milliseconds between authentication attemps in case of failure std::time_t m_retryInterval = config::agent::DEFAULT_RETRY_INTERVAL; + /// @brief Time between batch requests + std::time_t m_batchInterval = config::agent::DEFAULT_BATCH_INTERVAL; + + /// @brief Maximum number of messages to batch + int m_batchSize = config::agent::DEFAULT_BATCH_SIZE; + /// @brief The server URL std::string m_serverUrl; diff --git a/src/agent/communicator/include/http_client.hpp b/src/agent/communicator/include/http_client.hpp index 1d20f3165e..cd9857caab 100644 --- a/src/agent/communicator/include/http_client.hpp +++ b/src/agent/communicator/include/http_client.hpp @@ -39,17 +39,21 @@ namespace http_client /// @param messageGetter Function to get the message body asynchronously /// @param onUnauthorized Callback for unauthorized access /// @param connectionRetry Time in milliseconds to wait before retrying the connection + /// @param batchInterval Time to wait between requests + /// @param batchSize The maximum number of messages to batch /// @param onSuccess Callback for successful request completion /// @param loopRequestCondition Condition to continue looping requests /// @return Awaitable task for the HTTP request - boost::asio::awaitable - Co_PerformHttpRequest(std::shared_ptr token, - HttpRequestParams params, - std::function()> messageGetter, - std::function onUnauthorized, - std::time_t connectionRetry, - std::function onSuccess = {}, - std::function loopRequestCondition = {}) override; + boost::asio::awaitable Co_PerformHttpRequest( + std::shared_ptr token, + HttpRequestParams params, + std::function>(const int)> messageGetter, + std::function onUnauthorized, + std::time_t connectionRetry, + std::time_t batchInterval, + int batchSize, + std::function onSuccess = {}, + std::function loopRequestCondition = {}) override; /// @brief Performs a synchronous HTTP request /// @param params Parameters for the request diff --git a/src/agent/communicator/include/ihttp_client.hpp b/src/agent/communicator/include/ihttp_client.hpp index 820bfccd10..27a6197ef3 100644 --- a/src/agent/communicator/include/ihttp_client.hpp +++ b/src/agent/communicator/include/ihttp_client.hpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace http_client { @@ -36,17 +37,21 @@ namespace http_client /// @param messageGetter Function to retrieve messages /// @param onUnauthorized Action to take on unauthorized access /// @param connectionRetry Time to wait before retrying the connection + /// @param batchInterval Time to wait between requests + /// @param batchSize The maximum number of messages to batch /// @param onSuccess Action to take on successful request /// @param loopRequestCondition Condition to continue the request loop /// @return Awaitable task for the HTTP request - virtual boost::asio::awaitable - Co_PerformHttpRequest(std::shared_ptr token, - HttpRequestParams params, - std::function()> messageGetter, - std::function onUnauthorized, - std::time_t connectionRetry, - std::function onSuccess = {}, - std::function loopRequestCondition = {}) = 0; + virtual boost::asio::awaitable Co_PerformHttpRequest( + std::shared_ptr token, + HttpRequestParams params, + std::function>(const int)> messageGetter, + std::function onUnauthorized, + std::time_t connectionRetry, + std::time_t batchInterval, + int batchSize, + std::function onSuccess = {}, + std::function loopRequestCondition = {}) = 0; /// @brief Perform an HTTP request and receive the response /// @param params The parameters for the request diff --git a/src/agent/communicator/src/communicator.cpp b/src/agent/communicator/src/communicator.cpp index 34bb459d34..52c15b341d 100644 --- a/src/agent/communicator/src/communicator.cpp +++ b/src/agent/communicator/src/communicator.cpp @@ -65,7 +65,8 @@ namespace communicator return std::max(0L, static_cast(m_tokenExpTimeInSeconds - now_seconds)); } - boost::asio::awaitable Communicator::GetCommandsFromManager(std::function onSuccess) + boost::asio::awaitable + Communicator::GetCommandsFromManager(std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -79,8 +80,15 @@ namespace communicator const auto reqParams = http_client::HttpRequestParams( boost::beast::http::verb::get, m_serverUrl, "/api/v1/commands", m_getHeaderInfo ? m_getHeaderInfo() : ""); - co_await m_httpClient->Co_PerformHttpRequest( - m_token, reqParams, {}, onAuthenticationFailed, m_retryInterval, onSuccess, loopCondition); + co_await m_httpClient->Co_PerformHttpRequest(m_token, + reqParams, + {}, + onAuthenticationFailed, + m_retryInterval, + m_batchInterval, + m_batchSize, + onSuccess, + loopCondition); } boost::asio::awaitable Communicator::WaitForTokenExpirationAndAuthenticate() @@ -124,9 +132,9 @@ namespace communicator } } - boost::asio::awaitable - Communicator::StatefulMessageProcessingTask(std::function()> getMessages, - std::function onSuccess) + boost::asio::awaitable Communicator::StatefulMessageProcessingTask( + std::function>(const int)> getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -142,13 +150,20 @@ namespace communicator m_serverUrl, "/api/v1/events/stateful", m_getHeaderInfo ? m_getHeaderInfo() : ""); - co_await m_httpClient->Co_PerformHttpRequest( - m_token, reqParams, getMessages, onAuthenticationFailed, m_retryInterval, onSuccess, loopCondition); + co_await m_httpClient->Co_PerformHttpRequest(m_token, + reqParams, + getMessages, + onAuthenticationFailed, + m_retryInterval, + m_batchInterval, + m_batchSize, + onSuccess, + loopCondition); } - boost::asio::awaitable - Communicator::StatelessMessageProcessingTask(std::function()> getMessages, - std::function onSuccess) + boost::asio::awaitable Communicator::StatelessMessageProcessingTask( + std::function>(const int)> getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -164,8 +179,15 @@ namespace communicator m_serverUrl, "/api/v1/events/stateless", m_getHeaderInfo ? m_getHeaderInfo() : ""); - co_await m_httpClient->Co_PerformHttpRequest( - m_token, reqParams, getMessages, onAuthenticationFailed, m_retryInterval, onSuccess, loopCondition); + co_await m_httpClient->Co_PerformHttpRequest(m_token, + reqParams, + getMessages, + onAuthenticationFailed, + m_retryInterval, + m_batchInterval, + m_batchSize, + onSuccess, + loopCondition); } void Communicator::TryReAuthenticate() diff --git a/src/agent/communicator/src/http_client.cpp b/src/agent/communicator/src/http_client.cpp index cb7be806e7..ed0909c9ad 100644 --- a/src/agent/communicator/src/http_client.cpp +++ b/src/agent/communicator/src/http_client.cpp @@ -95,14 +95,16 @@ namespace http_client return req; } - boost::asio::awaitable - HttpClient::Co_PerformHttpRequest(std::shared_ptr token, - HttpRequestParams reqParams, - std::function()> messageGetter, - std::function onUnauthorized, - std::time_t connectionRetry, - std::function onSuccess, - std::function loopRequestCondition) + boost::asio::awaitable HttpClient::Co_PerformHttpRequest( + std::shared_ptr token, + HttpRequestParams reqParams, + std::function>(const int)> messageGetter, + std::function onUnauthorized, + std::time_t connectionRetry, + std::time_t batchInterval, + int batchSize, + std::function onSuccess, + std::function loopRequestCondition) { using namespace std::chrono_literals; @@ -150,9 +152,30 @@ namespace http_client continue; } + auto messagesCount = 0; + if (messageGetter != nullptr) { - reqParams.Body = co_await messageGetter(); + boost::asio::steady_timer refreshTimer(co_await boost::asio::this_coro::executor); + boost::asio::steady_timer batchTimeoutTimer(co_await boost::asio::this_coro::executor); + batchTimeoutTimer.expires_after(std::chrono::milliseconds(batchInterval)); + + while (loopRequestCondition != nullptr && loopRequestCondition()) + { + const auto messages = co_await messageGetter(batchSize); + messagesCount = std::get<0>(messages); + + if (messagesCount >= batchSize || batchTimeoutTimer.expiry() <= std::chrono::steady_clock::now()) + { + LogTrace("Messages count: {}", messagesCount); + reqParams.Body = std::get<1>(messages); + break; + } + + constexpr int refreshInterval = 100; + refreshTimer.expires_after(std::chrono::milliseconds(refreshInterval)); + co_await refreshTimer.async_wait(boost::asio::use_awaitable); + } } else { @@ -190,7 +213,7 @@ namespace http_client { if (onSuccess != nullptr) { - onSuccess(boost::beast::buffers_to_string(res.body().data())); + onSuccess(messagesCount, boost::beast::buffers_to_string(res.body().data())); } } else if (res.result() == boost::beast::http::status::unauthorized || diff --git a/src/agent/communicator/tests/communicator_test.cpp b/src/agent/communicator/tests/communicator_test.cpp index 5350e999ea..62a393fb9a 100644 --- a/src/agent/communicator/tests/communicator_test.cpp +++ b/src/agent/communicator/tests/communicator_test.cpp @@ -20,6 +20,7 @@ // NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines) using namespace testing; +using GetMessagesFuncType = std::function(const int)>; namespace { @@ -51,30 +52,34 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success) { auto mockHttpClient = std::make_unique(); - auto getMessages = []() -> boost::asio::awaitable + auto getMessages = [](const int) -> boost::asio::awaitable { - co_return std::string("message-content"); + co_return intStringTuple {1, std::string("message-content")}; }; - std::function onSuccess = [](const std::string& message) + std::function onSuccess = [](const int, const std::string& message) { EXPECT_EQ(message, "message-content"); }; - EXPECT_CALL(*mockHttpClient, Co_PerformHttpRequest(_, _, _, _, _, _, _)) - .WillOnce(Invoke( - [](std::shared_ptr, - http_client::HttpRequestParams, - std::function()> pGetMessages, - std::function, - [[maybe_unused]] std::time_t connectionRetry, - std::function pOnSuccess, - [[maybe_unused]] std::function loopRequestCondition) -> boost::asio::awaitable - { - const auto message = co_await pGetMessages(); - pOnSuccess(message); - co_return; - })); + auto MockCo_PerformHttpRequest = + [](std::shared_ptr, + http_client::HttpRequestParams, + GetMessagesFuncType pGetMessages, + std::function, + [[maybe_unused]] std::time_t connectionRetry, + [[maybe_unused]] std::time_t batchInterval, + [[maybe_unused]] int batchSize, + std::function pOnSuccess, + [[maybe_unused]] std::function loopRequestCondition) -> boost::asio::awaitable + { + const auto message = co_await pGetMessages(1); + pOnSuccess(std::get<0>(message), std::get<1>(message)); + co_return; + }; + + EXPECT_CALL(*mockHttpClient, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _)) + .WillOnce(Invoke(MockCo_PerformHttpRequest)); communicator::Communicator communicator(std::move(mockHttpClient), "uuid", "key", nullptr, FUNC); @@ -107,20 +112,24 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio return std::nullopt; })); + auto MockCo_PerformHttpRequest = + [](std::shared_ptr token, + http_client::HttpRequestParams, + [[maybe_unused]] GetMessagesFuncType pGetMessages, + [[maybe_unused]] std::function onUnauthorized, + [[maybe_unused]] std::time_t connectionRetry, + [[maybe_unused]] std::time_t batchInterval, + [[maybe_unused]] int batchSize, + [[maybe_unused]] std::function onSuccess, + [[maybe_unused]] std::function loopCondition) -> boost::asio::awaitable + { + EXPECT_TRUE(token->empty()); + co_return; + }; + // A following call to Co_PerformHttpRequest should not have a token - EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _)) - .WillOnce(Invoke( - [](std::shared_ptr token, - http_client::HttpRequestParams, - [[maybe_unused]] std::function()> getMessages, - [[maybe_unused]] std::function onUnauthorized, - [[maybe_unused]] std::time_t connectionRetry, - [[maybe_unused]] std::function onSuccess, - [[maybe_unused]] std::function loopCondition) -> boost::asio::awaitable - { - EXPECT_TRUE(token->empty()); - co_return; - })); + EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _)) + .WillOnce(Invoke(MockCo_PerformHttpRequest)); boost::asio::io_context ioContext; @@ -129,9 +138,10 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio [communicatorPtr]() mutable -> boost::asio::awaitable { co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate(); - co_await communicatorPtr->StatelessMessageProcessingTask([]() -> boost::asio::awaitable - { co_return "message"; }, - []([[maybe_unused]] const std::string& msg) {}); + co_await communicatorPtr->StatelessMessageProcessingTask( + [](const int) -> boost::asio::awaitable + { co_return intStringTuple(1, std::string {"message"}); }, + []([[maybe_unused]] const int, const std::string&) {}); }(), boost::asio::detached); @@ -162,19 +172,24 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken) })); std::string capturedToken; - EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _)) - .WillOnce(Invoke( - [&capturedToken](std::shared_ptr token, - http_client::HttpRequestParams, - [[maybe_unused]] std::function()> getMessages, - [[maybe_unused]] std::function onUnauthorized, - [[maybe_unused]] std::time_t connectionRetry, - [[maybe_unused]] std::function onSuccess, - [[maybe_unused]] std::function loopCondition) -> boost::asio::awaitable - { - capturedToken = *token; - co_return; - })); + + auto MockCo_PerformHttpRequest = + [&capturedToken](std::shared_ptr token, + http_client::HttpRequestParams, + [[maybe_unused]] GetMessagesFuncType pGetMessages, + [[maybe_unused]] std::function onUnauthorized, + [[maybe_unused]] std::time_t connectionRetry, + [[maybe_unused]] std::time_t batchInterval, + [[maybe_unused]] int batchSize, + [[maybe_unused]] std::function onSuccess, + [[maybe_unused]] std::function loopCondition) -> boost::asio::awaitable + { + capturedToken = *token; + co_return; + }; + + EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _)) + .WillOnce(Invoke(MockCo_PerformHttpRequest)); boost::asio::io_context ioContext; @@ -183,9 +198,10 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken) [communicatorPtr]() mutable -> boost::asio::awaitable { co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate(); - co_await communicatorPtr->StatelessMessageProcessingTask([]() -> boost::asio::awaitable - { co_return "message"; }, - []([[maybe_unused]] const std::string& msg) {}); + co_await communicatorPtr->StatelessMessageProcessingTask( + [](const int) -> boost::asio::awaitable + { co_return intStringTuple(1, std::string {"message"}); }, + []([[maybe_unused]] const int, const std::string&) {}); }(), boost::asio::detached); diff --git a/src/agent/communicator/tests/http_client_test.cpp b/src/agent/communicator/tests/http_client_test.cpp index 7737330ebe..dd73fbcde9 100644 --- a/src/agent/communicator/tests/http_client_test.cpp +++ b/src/agent/communicator/tests/http_client_test.cpp @@ -14,6 +14,7 @@ #include #include #include +#include // NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines,cppcoreguidelines-avoid-reference-coroutine-parameters) @@ -215,15 +216,14 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_Success) SetupMockSocketReadExpectations(boost::beast::http::status::ok); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; - co_return std::string("test message"); + co_return std::tuple(1, "test message"); }; auto onSuccessCalled = false; - std::function onSuccess = - [&onSuccessCalled]([[maybe_unused]] const std::string& responseBody) + std::function onSuccess = [&onSuccessCalled](const int, const std::string&) { onSuccessCalled = true; }; @@ -234,6 +234,12 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_Success) unauthorizedCalled = true; }; + auto loopCondition = true; + std::function loopRequestCondition = [&loopCondition]() + { + return std::exchange(loopCondition, false); + }; + const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::get, "https://localhost:8080", "/", "Wazuh 5.0.0"); @@ -242,8 +248,10 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_Success) getMessages, onUnauthorized, 5, // NOLINT + 1, // NOLINT + 1, // NOLINT onSuccess, - nullptr); + loopRequestCondition); boost::asio::io_context ioContext; boost::asio::co_spawn(ioContext, std::move(task), boost::asio::detached); @@ -262,15 +270,14 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_CallbacksNotCalledIfCannotConnect) SetupMockSocketConnectExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; - co_return std::string("test message"); + co_return std::tuple(1, "test message"); }; auto onSuccessCalled = false; - std::function onSuccess = - [&onSuccessCalled]([[maybe_unused]] const std::string& responseBody) + std::function onSuccess = [&onSuccessCalled](const int, const std::string&) { onSuccessCalled = true; }; @@ -288,6 +295,8 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_CallbacksNotCalledIfCannotConnect) getMessages, onUnauthorized, 5, // NOLINT + 1, // NOLINT + 1, // NOLINT onSuccess, nullptr); @@ -309,15 +318,14 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncWriteFails SetupMockSocketWriteExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; - co_return std::string("test message"); + co_return std::tuple(1, "test message"); }; auto onSuccessCalled = false; - std::function onSuccess = - [&onSuccessCalled]([[maybe_unused]] const std::string& responseBody) + std::function onSuccess = [&onSuccessCalled](const int, const std::string&) { onSuccessCalled = true; }; @@ -328,6 +336,12 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncWriteFails unauthorizedCalled = true; }; + auto loopCondition = true; + std::function loopRequestCondition = [&loopCondition]() + { + return std::exchange(loopCondition, false); + }; + const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::get, "https://localhost:8080", "/", "Wazuh 5.0.0"); auto task = client->Co_PerformHttpRequest(std::make_shared("token"), @@ -335,8 +349,10 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncWriteFails getMessages, onUnauthorized, 5, // NOLINT + 1, // NOLINT + 1, // NOLINT onSuccess, - nullptr); + loopRequestCondition); boost::asio::io_context ioContext; boost::asio::co_spawn(ioContext, std::move(task), boost::asio::detached); @@ -358,15 +374,14 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncReadFails) boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; - co_return std::string("test message"); + co_return std::tuple(1, "test message"); }; auto onSuccessCalled = false; - std::function onSuccess = - [&onSuccessCalled]([[maybe_unused]] const std::string& responseBody) + std::function onSuccess = [&onSuccessCalled](const int, const std::string&) { onSuccessCalled = true; }; @@ -377,6 +392,12 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncReadFails) unauthorizedCalled = true; }; + auto loopCondition = true; + std::function loopRequestCondition = [&loopCondition]() + { + return std::exchange(loopCondition, false); + }; + const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::get, "https://localhost:8080", "/", "Wazuh 5.0.0"); auto task = client->Co_PerformHttpRequest(std::make_shared("token"), @@ -384,8 +405,10 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncReadFails) getMessages, onUnauthorized, 5, // NOLINT + 1, // NOLINT + 1, // NOLINT onSuccess, - nullptr); + loopRequestCondition); boost::asio::io_context ioContext; boost::asio::co_spawn(ioContext, std::move(task), boost::asio::detached); @@ -406,15 +429,14 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_UnauthorizedCalledWhenAuthorization SetupMockSocketReadExpectations(boost::beast::http::status::unauthorized); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; - co_return std::string("test message"); + co_return std::tuple(1, "test message"); }; auto onSuccessCalled = false; - std::function onSuccess = - [&onSuccessCalled]([[maybe_unused]] const std::string& responseBody) + std::function onSuccess = [&onSuccessCalled](const int, const std::string&) { onSuccessCalled = true; }; @@ -425,6 +447,12 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_UnauthorizedCalledWhenAuthorization unauthorizedCalled = true; }; + auto loopCondition = true; + std::function loopRequestCondition = [&loopCondition]() + { + return std::exchange(loopCondition, false); + }; + const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::get, "https://localhost:8080", "/", "Wazuh 5.0.0"); auto task = client->Co_PerformHttpRequest(std::make_shared("token"), @@ -432,8 +460,10 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_UnauthorizedCalledWhenAuthorization getMessages, onUnauthorized, 5, // NOLINT + 1, // NOLINT + 1, // NOLINT onSuccess, - nullptr); + loopRequestCondition); boost::asio::io_context ioContext; boost::asio::co_spawn(ioContext, std::move(task), boost::asio::detached); diff --git a/src/agent/communicator/tests/mocks/mock_http_client.hpp b/src/agent/communicator/tests/mocks/mock_http_client.hpp index c20cd2a93c..01d784e720 100644 --- a/src/agent/communicator/tests/mocks/mock_http_client.hpp +++ b/src/agent/communicator/tests/mocks/mock_http_client.hpp @@ -2,6 +2,8 @@ #include +using intStringTuple = std::tuple; + class MockHttpClient : public http_client::IHttpClient { public: @@ -14,10 +16,12 @@ class MockHttpClient : public http_client::IHttpClient Co_PerformHttpRequest, (std::shared_ptr token, http_client::HttpRequestParams params, - std::function()> messageGetter, + std::function(const int)> messageGetter, std::function onUnauthorized, std::time_t connectionRetry, - std::function onSuccess, + std::time_t batchInterval, + int batchSize, + std::function onSuccess, std::function loopRequestCondition), (override)); diff --git a/src/agent/src/agent.cpp b/src/agent/src/agent.cpp index a526ba1072..153357d777 100644 --- a/src/agent/src/agent.cpp +++ b/src/agent/src/agent.cpp @@ -54,25 +54,29 @@ void Agent::Run() m_taskManager.EnqueueTask(m_communicator.WaitForTokenExpirationAndAuthenticate()); m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager( - [this](const std::string& response) { PushCommandsToQueue(m_messageQueue, response); })); + [this](const int, const std::string& response) { PushCommandsToQueue(m_messageQueue, response); })); m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask( - [this]() + [this](const int numMessages) { - return GetMessagesFromQueue( - m_messageQueue, MessageType::STATEFUL, [this]() { return m_agentInfo.GetMetadataInfo(false); }); + return GetMessagesFromQueue(m_messageQueue, + MessageType::STATEFUL, + numMessages, + [this]() { return m_agentInfo.GetMetadataInfo(false); }); }, - [this]([[maybe_unused]] const std::string& response) - { PopMessagesFromQueue(m_messageQueue, MessageType::STATEFUL); })); + [this]([[maybe_unused]] const int messageCount, const std::string&) + { PopMessagesFromQueue(m_messageQueue, MessageType::STATEFUL, messageCount); })); m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask( - [this]() + [this](const int numMessages) { - return GetMessagesFromQueue( - m_messageQueue, MessageType::STATELESS, [this]() { return m_agentInfo.GetMetadataInfo(false); }); + return GetMessagesFromQueue(m_messageQueue, + MessageType::STATELESS, + numMessages, + [this]() { return m_agentInfo.GetMetadataInfo(false); }); }, - [this]([[maybe_unused]] const std::string& response) - { PopMessagesFromQueue(m_messageQueue, MessageType::STATELESS); })); + [this]([[maybe_unused]] const int messageCount, const std::string&) + { PopMessagesFromQueue(m_messageQueue, MessageType::STATELESS, messageCount); })); m_moduleManager.AddModules(); m_taskManager.EnqueueTask([this]() { m_moduleManager.Start(); }); diff --git a/src/agent/src/message_queue_utils.cpp b/src/agent/src/message_queue_utils.cpp index d484b92ea0..235be09123 100644 --- a/src/agent/src/message_queue_utils.cpp +++ b/src/agent/src/message_queue_utils.cpp @@ -3,17 +3,13 @@ #include -namespace +boost::asio::awaitable> +GetMessagesFromQueue(std::shared_ptr multiTypeQueue, + MessageType messageType, + int numMessages, + std::function getMetadataInfo) { - // This should eventually be replaced with a configuration parameter. - constexpr int NUM_EVENTS = 1; -} // namespace - -boost::asio::awaitable GetMessagesFromQueue(std::shared_ptr multiTypeQueue, - MessageType messageType, - std::function getMetadataInfo) -{ - const auto messages = co_await multiTypeQueue->getNextNAwaitable(messageType, NUM_EVENTS, "", ""); + const auto messages = co_await multiTypeQueue->getNextNAwaitable(messageType, numMessages, "", ""); std::string output; @@ -27,12 +23,12 @@ boost::asio::awaitable GetMessagesFromQueue(std::shared_ptr {static_cast(messages.size()), output}; } -void PopMessagesFromQueue(std::shared_ptr multiTypeQueue, MessageType messageType) +void PopMessagesFromQueue(std::shared_ptr multiTypeQueue, MessageType messageType, int numMessages) { - multiTypeQueue->popN(messageType, NUM_EVENTS); + multiTypeQueue->popN(messageType, numMessages); } void PushCommandsToQueue(std::shared_ptr multiTypeQueue, const std::string& commands) diff --git a/src/agent/src/message_queue_utils.hpp b/src/agent/src/message_queue_utils.hpp index 98354563d9..a8b4d55a94 100644 --- a/src/agent/src/message_queue_utils.hpp +++ b/src/agent/src/message_queue_utils.hpp @@ -9,22 +9,27 @@ #include #include #include +#include class IMultiTypeQueue; /// @brief Gets messages from a queue and returns them as a JSON string /// @param multiTypeQueue The queue to get messages from /// @param messageType The type of messages to get from the queue +/// @param numMessages The number of messages to get /// @param getMetadataInfo Function to get the agent metadata /// @return A string containing the messages from the queue -boost::asio::awaitable GetMessagesFromQueue(std::shared_ptr multiTypeQueue, - MessageType messageType, - std::function getMetadataInfo); +boost::asio::awaitable> +GetMessagesFromQueue(std::shared_ptr multiTypeQueue, + MessageType messageType, + int numMessages, + std::function getMetadataInfo); /// @brief Removes a fixed number of messages from the specified queue /// @param multiTypeQueue The queue from which to remove messages /// @param messageType The type of messages to remove -void PopMessagesFromQueue(std::shared_ptr multiTypeQueue, MessageType messageType); +/// @param numMessages The number of messages to remove from the queue +void PopMessagesFromQueue(std::shared_ptr multiTypeQueue, MessageType messageType, int numMessages); /// @brief Pushes a batch of commands to the specified queue /// @param multiTypeQueue The queue to push commands to diff --git a/src/agent/tests/agent_registration_test.cpp b/src/agent/tests/agent_registration_test.cpp index 2babb7d75f..b64e1ec56a 100644 --- a/src/agent/tests/agent_registration_test.cpp +++ b/src/agent/tests/agent_registration_test.cpp @@ -7,6 +7,8 @@ #include #include +#include "../communicator/tests/mocks/mock_http_client.hpp" + #include #include #include @@ -15,48 +17,6 @@ #include #include -class MockHttpClient : public http_client::IHttpClient -{ -public: - MOCK_METHOD(boost::beast::http::request, - CreateHttpRequest, - (const http_client::HttpRequestParams& params), - (override)); - - MOCK_METHOD(boost::asio::awaitable, - Co_PerformHttpRequest, - (std::shared_ptr token, - http_client::HttpRequestParams params, - std::function()> messageGetter, - std::function onUnauthorized, - std::time_t connectionRetry, - std::function onSuccess, - std::function loopRequestCondition), - (override)); - - MOCK_METHOD(boost::beast::http::response, - PerformHttpRequest, - (const http_client::HttpRequestParams& params), - (override)); - - MOCK_METHOD( - std::optional, - AuthenticateWithUuidAndKey, - (const std::string& host, const std::string& userAgent, const std::string& uuid, const std::string& key), - (override)); - - MOCK_METHOD( - std::optional, - AuthenticateWithUserPassword, - (const std::string& host, const std::string& userAgent, const std::string& user, const std::string& password), - (override)); - - MOCK_METHOD(boost::beast::http::response, - PerformHttpRequestDownload, - (const http_client::HttpRequestParams& params, const std::string& dstFilePath), - (override)); -}; - class RegisterTest : public ::testing::Test { protected: diff --git a/src/agent/tests/message_queue_utils_test.cpp b/src/agent/tests/message_queue_utils_test.cpp index 255d5f8b45..329505f384 100644 --- a/src/agent/tests/message_queue_utils_test.cpp +++ b/src/agent/tests/message_queue_utils_test.cpp @@ -40,6 +40,8 @@ class MessageQueueUtilsTest : public ::testing::Test boost::asio::io_context io_context; std::shared_ptr mockQueue; + + const int MAX_MESSAGES = 1; }; TEST_F(MessageQueueUtilsTest, GetMessagesFromQueueTest) @@ -50,19 +52,22 @@ TEST_F(MessageQueueUtilsTest, GetMessagesFromQueueTest) testMessages.emplace_back(MessageType::STATELESS, data, "", "", metadata); // NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines) - EXPECT_CALL(*mockQueue, getNextNAwaitable(MessageType::STATELESS, 1, "", "")) + EXPECT_CALL(*mockQueue, getNextNAwaitable(MessageType::STATELESS, MAX_MESSAGES, "", "")) .WillOnce([&testMessages]() -> boost::asio::awaitable> { co_return testMessages; }); // NOLINTEND(cppcoreguidelines-avoid-capturing-lambda-coroutines) - auto result = boost::asio::co_spawn( - io_context, GetMessagesFromQueue(mockQueue, MessageType::STATELESS, nullptr), boost::asio::use_future); + auto awaitableResult = + boost::asio::co_spawn(io_context, + GetMessagesFromQueue(mockQueue, MessageType::STATELESS, MAX_MESSAGES, nullptr), + boost::asio::use_future); const auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(1); io_context.run_until(timeout); - ASSERT_TRUE(result.wait_for(std::chrono::milliseconds(1)) == std::future_status::ready); + ASSERT_TRUE(awaitableResult.wait_for(std::chrono::milliseconds(1)) == std::future_status::ready); - const auto jsonResult = result.get(); + const auto result = awaitableResult.get(); + const auto jsonResult = std::get<1>(result); std::string expectedString = std::string("\n") + R"({"module":"logcollector","type":"file"})" + std::string("\n") + R"(["{\"event\":{\"original\":\"Testing message!\"}}"])"; @@ -81,23 +86,25 @@ TEST_F(MessageQueueUtilsTest, GetMessagesFromQueueMetadataTest) metadata["agent"] = "test"; // NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines) - EXPECT_CALL(*mockQueue, getNextNAwaitable(MessageType::STATELESS, 1, "", "")) + EXPECT_CALL(*mockQueue, getNextNAwaitable(MessageType::STATELESS, MAX_MESSAGES, "", "")) .WillOnce([&testMessages]() -> boost::asio::awaitable> { co_return testMessages; }); // NOLINTEND(cppcoreguidelines-avoid-capturing-lambda-coroutines) io_context.restart(); - auto result = boost::asio::co_spawn( + auto awaitableResult = boost::asio::co_spawn( io_context, - GetMessagesFromQueue(mockQueue, MessageType::STATELESS, [&metadata]() { return metadata.dump(); }), + GetMessagesFromQueue( + mockQueue, MessageType::STATELESS, MAX_MESSAGES, [&metadata]() { return metadata.dump(); }), boost::asio::use_future); const auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(1); io_context.run_until(timeout); - ASSERT_TRUE(result.wait_for(std::chrono::milliseconds(1)) == std::future_status::ready); + ASSERT_TRUE(awaitableResult.wait_for(std::chrono::milliseconds(1)) == std::future_status::ready); - const auto jsonResult = result.get(); + const auto result = awaitableResult.get(); + const auto jsonResult = std::get<1>(result); std::string expectedString = R"({"agent":"test"})" + std::string("\n") + R"({"module":"logcollector","type":"file"})" + std::string("\n") + @@ -109,7 +116,7 @@ TEST_F(MessageQueueUtilsTest, GetMessagesFromQueueMetadataTest) TEST_F(MessageQueueUtilsTest, PopMessagesFromQueueTest) { EXPECT_CALL(*mockQueue, popN(MessageType::STATEFUL, 1, "")).Times(1); - PopMessagesFromQueue(mockQueue, MessageType::STATEFUL); + PopMessagesFromQueue(mockQueue, MessageType::STATEFUL, 1); } TEST_F(MessageQueueUtilsTest, PushCommandsToQueueTest) diff --git a/src/cmake/config.cmake b/src/cmake/config.cmake index e849a5b651..d332834e1b 100644 --- a/src/cmake/config.cmake +++ b/src/cmake/config.cmake @@ -14,6 +14,10 @@ set(DEFAULT_SERVER_URL "https://localhost:27000" CACHE STRING "Default Agent Ser set(DEFAULT_RETRY_INTERVAL 30000 CACHE STRING "Default Agent retry interval (30s)") +set(DEFAULT_BATCH_INTERVAL 10000 CACHE STRING "Default Agent batch interval (10s)") + +set(DEFAULT_BATCH_SIZE 1000 CACHE STRING "Default Agent batch size limit (1000)") + set(DEFAULT_LOGCOLLECTOR_ENABLED true CACHE BOOL "Default Logcollector enabled") set(BUFFER_SIZE 4096 CACHE STRING "Default Logcollector reading buffer size") diff --git a/src/common/config/include/config.h.in b/src/common/config/include/config.h.in index 5e450165e3..c59c6fd5e1 100644 --- a/src/common/config/include/config.h.in +++ b/src/common/config/include/config.h.in @@ -12,6 +12,8 @@ namespace config { constexpr auto DEFAULT_SERVER_URL = "@DEFAULT_SERVER_URL@"; constexpr auto DEFAULT_RETRY_INTERVAL = @DEFAULT_RETRY_INTERVAL@; + constexpr auto DEFAULT_BATCH_INTERVAL = @DEFAULT_BATCH_INTERVAL@; + constexpr auto DEFAULT_BATCH_SIZE = @DEFAULT_BATCH_SIZE@; } namespace logcollector