From 410da5824cb6c357e2b7190f92828864e6f9fe3e Mon Sep 17 00:00:00 2001 From: jr0me Date: Mon, 2 Dec 2024 20:59:58 -0300 Subject: [PATCH] refactor: use max batch size only in communicator get messages callback will now take an int parameter for the batch size --- .../communicator/include/communicator.hpp | 8 +- .../communicator/include/http_client.hpp | 20 ++-- .../communicator/include/ihttp_client.hpp | 20 ++-- src/agent/communicator/src/communicator.cpp | 4 +- src/agent/communicator/src/http_client.cpp | 2 +- .../communicator/tests/communicator_test.cpp | 107 ++++++++++-------- .../communicator/tests/http_client_test.cpp | 10 +- .../tests/mocks/mock_http_client.hpp | 2 +- src/agent/include/agent.hpp | 3 - src/agent/src/agent.cpp | 17 +-- 10 files changed, 94 insertions(+), 99 deletions(-) diff --git a/src/agent/communicator/include/communicator.hpp b/src/agent/communicator/include/communicator.hpp index dccdf7eeef..03c6c6281b 100644 --- a/src/agent/communicator/include/communicator.hpp +++ b/src/agent/communicator/include/communicator.hpp @@ -88,15 +88,15 @@ namespace communicator /// @brief Processes messages in a stateful manner /// @param getMessages A function to retrieve a message from the queue /// @param onSuccess A callback function to execute when a message is processed - boost::asio::awaitable - StatefulMessageProcessingTask(std::function>()> getMessages, - std::function onSuccess); + boost::asio::awaitable StatefulMessageProcessingTask( + std::function>(const int)> getMessages, + std::function onSuccess); /// @brief Processes messages in a stateless manner /// @param getMessages A function to retrieve a message from the queue /// @param onSuccess A callback function to execute when a message is processed boost::asio::awaitable StatelessMessageProcessingTask( - std::function>()> getMessages, + std::function>(const int)> getMessages, std::function onSuccess); /// @brief Retrieves group configuration from the manager diff --git a/src/agent/communicator/include/http_client.hpp b/src/agent/communicator/include/http_client.hpp index 0193a3ab82..cd9857caab 100644 --- a/src/agent/communicator/include/http_client.hpp +++ b/src/agent/communicator/include/http_client.hpp @@ -44,16 +44,16 @@ namespace http_client /// @param onSuccess Callback for successful request completion /// @param loopRequestCondition Condition to continue looping requests /// @return Awaitable task for the HTTP request - boost::asio::awaitable - Co_PerformHttpRequest(std::shared_ptr token, - HttpRequestParams params, - std::function>()> messageGetter, - std::function onUnauthorized, - std::time_t connectionRetry, - std::time_t batchInterval, - int batchSize, - std::function onSuccess = {}, - std::function loopRequestCondition = {}) override; + boost::asio::awaitable Co_PerformHttpRequest( + std::shared_ptr token, + HttpRequestParams params, + std::function>(const int)> messageGetter, + std::function onUnauthorized, + std::time_t connectionRetry, + std::time_t batchInterval, + int batchSize, + std::function onSuccess = {}, + std::function loopRequestCondition = {}) override; /// @brief Performs a synchronous HTTP request /// @param params Parameters for the request diff --git a/src/agent/communicator/include/ihttp_client.hpp b/src/agent/communicator/include/ihttp_client.hpp index fb732c4788..27a6197ef3 100644 --- a/src/agent/communicator/include/ihttp_client.hpp +++ b/src/agent/communicator/include/ihttp_client.hpp @@ -42,16 +42,16 @@ namespace http_client /// @param onSuccess Action to take on successful request /// @param loopRequestCondition Condition to continue the request loop /// @return Awaitable task for the HTTP request - virtual boost::asio::awaitable - Co_PerformHttpRequest(std::shared_ptr token, - HttpRequestParams params, - std::function>()> messageGetter, - std::function onUnauthorized, - std::time_t connectionRetry, - std::time_t batchInterval, - int batchSize, - std::function onSuccess = {}, - std::function loopRequestCondition = {}) = 0; + virtual boost::asio::awaitable Co_PerformHttpRequest( + std::shared_ptr token, + HttpRequestParams params, + std::function>(const int)> messageGetter, + std::function onUnauthorized, + std::time_t connectionRetry, + std::time_t batchInterval, + int batchSize, + std::function onSuccess = {}, + std::function loopRequestCondition = {}) = 0; /// @brief Perform an HTTP request and receive the response /// @param params The parameters for the request diff --git a/src/agent/communicator/src/communicator.cpp b/src/agent/communicator/src/communicator.cpp index ac031b4341..52c15b341d 100644 --- a/src/agent/communicator/src/communicator.cpp +++ b/src/agent/communicator/src/communicator.cpp @@ -133,7 +133,7 @@ namespace communicator } boost::asio::awaitable Communicator::StatefulMessageProcessingTask( - std::function>()> getMessages, + std::function>(const int)> getMessages, std::function onSuccess) { auto onAuthenticationFailed = [this]() @@ -162,7 +162,7 @@ namespace communicator } boost::asio::awaitable Communicator::StatelessMessageProcessingTask( - std::function>()> getMessages, + std::function>(const int)> getMessages, std::function onSuccess) { auto onAuthenticationFailed = [this]() diff --git a/src/agent/communicator/src/http_client.cpp b/src/agent/communicator/src/http_client.cpp index c291c71b4c..bbefdfef1c 100644 --- a/src/agent/communicator/src/http_client.cpp +++ b/src/agent/communicator/src/http_client.cpp @@ -98,7 +98,7 @@ namespace http_client boost::asio::awaitable HttpClient::Co_PerformHttpRequest( std::shared_ptr token, HttpRequestParams reqParams, - std::function>()> messageGetter, + std::function>(const int)> messageGetter, std::function onUnauthorized, std::time_t connectionRetry, std::time_t batchInterval, diff --git a/src/agent/communicator/tests/communicator_test.cpp b/src/agent/communicator/tests/communicator_test.cpp index 5a7e78ab55..62a393fb9a 100644 --- a/src/agent/communicator/tests/communicator_test.cpp +++ b/src/agent/communicator/tests/communicator_test.cpp @@ -20,6 +20,7 @@ // NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines) using namespace testing; +using GetMessagesFuncType = std::function(const int)>; namespace { @@ -51,9 +52,9 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success) { auto mockHttpClient = std::make_unique(); - auto getMessages = []() -> boost::asio::awaitable> + auto getMessages = [](const int) -> boost::asio::awaitable { - co_return std::tuple {1, std::string("message-content")}; + co_return intStringTuple {1, std::string("message-content")}; }; std::function onSuccess = [](const int, const std::string& message) @@ -61,22 +62,24 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success) EXPECT_EQ(message, "message-content"); }; + auto MockCo_PerformHttpRequest = + [](std::shared_ptr, + http_client::HttpRequestParams, + GetMessagesFuncType pGetMessages, + std::function, + [[maybe_unused]] std::time_t connectionRetry, + [[maybe_unused]] std::time_t batchInterval, + [[maybe_unused]] int batchSize, + std::function pOnSuccess, + [[maybe_unused]] std::function loopRequestCondition) -> boost::asio::awaitable + { + const auto message = co_await pGetMessages(1); + pOnSuccess(std::get<0>(message), std::get<1>(message)); + co_return; + }; + EXPECT_CALL(*mockHttpClient, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _)) - .WillOnce(Invoke( - [](std::shared_ptr, - http_client::HttpRequestParams, - std::function>()> pGetMessages, - std::function, - [[maybe_unused]] std::time_t connectionRetry, - [[maybe_unused]] std::time_t batchInterval, - [[maybe_unused]] int batchSize, - std::function pOnSuccess, - [[maybe_unused]] std::function loopRequestCondition) -> boost::asio::awaitable - { - const auto message = co_await pGetMessages(); - pOnSuccess(std::get<0>(message), std::get<1>(message)); - co_return; - })); + .WillOnce(Invoke(MockCo_PerformHttpRequest)); communicator::Communicator communicator(std::move(mockHttpClient), "uuid", "key", nullptr, FUNC); @@ -109,22 +112,24 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio return std::nullopt; })); + auto MockCo_PerformHttpRequest = + [](std::shared_ptr token, + http_client::HttpRequestParams, + [[maybe_unused]] GetMessagesFuncType pGetMessages, + [[maybe_unused]] std::function onUnauthorized, + [[maybe_unused]] std::time_t connectionRetry, + [[maybe_unused]] std::time_t batchInterval, + [[maybe_unused]] int batchSize, + [[maybe_unused]] std::function onSuccess, + [[maybe_unused]] std::function loopCondition) -> boost::asio::awaitable + { + EXPECT_TRUE(token->empty()); + co_return; + }; + // A following call to Co_PerformHttpRequest should not have a token EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _)) - .WillOnce(Invoke( - [](std::shared_ptr token, - http_client::HttpRequestParams, - [[maybe_unused]] std::function>()> getMessages, - [[maybe_unused]] std::function onUnauthorized, - [[maybe_unused]] std::time_t connectionRetry, - [[maybe_unused]] std::time_t batchInterval, - [[maybe_unused]] int batchSize, - [[maybe_unused]] std::function onSuccess, - [[maybe_unused]] std::function loopCondition) -> boost::asio::awaitable - { - EXPECT_TRUE(token->empty()); - co_return; - })); + .WillOnce(Invoke(MockCo_PerformHttpRequest)); boost::asio::io_context ioContext; @@ -134,8 +139,8 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio { co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate(); co_await communicatorPtr->StatelessMessageProcessingTask( - []() -> boost::asio::awaitable> - { co_return std::tuple(1, std::string {"message"}); }, + [](const int) -> boost::asio::awaitable + { co_return intStringTuple(1, std::string {"message"}); }, []([[maybe_unused]] const int, const std::string&) {}); }(), boost::asio::detached); @@ -167,22 +172,24 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken) })); std::string capturedToken; + + auto MockCo_PerformHttpRequest = + [&capturedToken](std::shared_ptr token, + http_client::HttpRequestParams, + [[maybe_unused]] GetMessagesFuncType pGetMessages, + [[maybe_unused]] std::function onUnauthorized, + [[maybe_unused]] std::time_t connectionRetry, + [[maybe_unused]] std::time_t batchInterval, + [[maybe_unused]] int batchSize, + [[maybe_unused]] std::function onSuccess, + [[maybe_unused]] std::function loopCondition) -> boost::asio::awaitable + { + capturedToken = *token; + co_return; + }; + EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _)) - .WillOnce(Invoke( - [&capturedToken]( - std::shared_ptr token, - http_client::HttpRequestParams, - [[maybe_unused]] std::function>()> getMessages, - [[maybe_unused]] std::function onUnauthorized, - [[maybe_unused]] std::time_t connectionRetry, - [[maybe_unused]] std::time_t batchInterval, - [[maybe_unused]] int batchSize, - [[maybe_unused]] std::function onSuccess, - [[maybe_unused]] std::function loopCondition) -> boost::asio::awaitable - { - capturedToken = *token; - co_return; - })); + .WillOnce(Invoke(MockCo_PerformHttpRequest)); boost::asio::io_context ioContext; @@ -192,8 +199,8 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken) { co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate(); co_await communicatorPtr->StatelessMessageProcessingTask( - []() -> boost::asio::awaitable> - { co_return std::tuple(1, std::string {"message"}); }, + [](const int) -> boost::asio::awaitable + { co_return intStringTuple(1, std::string {"message"}); }, []([[maybe_unused]] const int, const std::string&) {}); }(), boost::asio::detached); diff --git a/src/agent/communicator/tests/http_client_test.cpp b/src/agent/communicator/tests/http_client_test.cpp index b575d001a8..dd73fbcde9 100644 --- a/src/agent/communicator/tests/http_client_test.cpp +++ b/src/agent/communicator/tests/http_client_test.cpp @@ -216,7 +216,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_Success) SetupMockSocketReadExpectations(boost::beast::http::status::ok); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); @@ -270,7 +270,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_CallbacksNotCalledIfCannotConnect) SetupMockSocketConnectExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); @@ -318,7 +318,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncWriteFails SetupMockSocketWriteExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); @@ -374,7 +374,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncReadFails) boost::system::errc::make_error_code(boost::system::errc::bad_address)); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); @@ -429,7 +429,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_UnauthorizedCalledWhenAuthorization SetupMockSocketReadExpectations(boost::beast::http::status::unauthorized); auto getMessagesCalled = false; - auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable> + auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable> { getMessagesCalled = true; co_return std::tuple(1, "test message"); diff --git a/src/agent/communicator/tests/mocks/mock_http_client.hpp b/src/agent/communicator/tests/mocks/mock_http_client.hpp index 308ff4eb2c..01d784e720 100644 --- a/src/agent/communicator/tests/mocks/mock_http_client.hpp +++ b/src/agent/communicator/tests/mocks/mock_http_client.hpp @@ -16,7 +16,7 @@ class MockHttpClient : public http_client::IHttpClient Co_PerformHttpRequest, (std::shared_ptr token, http_client::HttpRequestParams params, - std::function()> messageGetter, + std::function(const int)> messageGetter, std::function onUnauthorized, std::time_t connectionRetry, std::time_t batchInterval, diff --git a/src/agent/include/agent.hpp b/src/agent/include/agent.hpp index 5fbd055451..835f9b3eee 100644 --- a/src/agent/include/agent.hpp +++ b/src/agent/include/agent.hpp @@ -69,7 +69,4 @@ class Agent /// @brief Centralized configuration centralized_configuration::CentralizedConfiguration m_centralizedConfiguration; - - /// @brief Maximum messages batch size - int m_maxBatchingSize = config::agent::DEFAULT_BATCH_SIZE; }; diff --git a/src/agent/src/agent.cpp b/src/agent/src/agent.cpp index 0aaf9b9045..153357d777 100644 --- a/src/agent/src/agent.cpp +++ b/src/agent/src/agent.cpp @@ -31,15 +31,6 @@ Agent::Agent(const std::string& configFilePath, std::unique_ptr [this](std::function task) { m_taskManager.EnqueueTask(std::move(task)); }) , m_commandHandler(m_dataPath) { - m_maxBatchingSize = - m_configurationParser.GetConfig("agent", "max_batching_size").value_or(config::agent::DEFAULT_BATCH_SIZE); - - if (m_maxBatchingSize < 1) - { - LogWarn("max_batching_size cannot be lower than 1s. Using default value."); - m_maxBatchingSize = config::agent::DEFAULT_BATCH_SIZE; - } - m_centralizedConfiguration.SetGroupIdFunction([this](const std::vector& groups) { return m_agentInfo.SetGroups(groups); }); @@ -66,22 +57,22 @@ void Agent::Run() [this](const int, const std::string& response) { PushCommandsToQueue(m_messageQueue, response); })); m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask( - [this]() + [this](const int numMessages) { return GetMessagesFromQueue(m_messageQueue, MessageType::STATEFUL, - m_maxBatchingSize, + numMessages, [this]() { return m_agentInfo.GetMetadataInfo(false); }); }, [this]([[maybe_unused]] const int messageCount, const std::string&) { PopMessagesFromQueue(m_messageQueue, MessageType::STATEFUL, messageCount); })); m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask( - [this]() + [this](const int numMessages) { return GetMessagesFromQueue(m_messageQueue, MessageType::STATELESS, - m_maxBatchingSize, + numMessages, [this]() { return m_agentInfo.GetMetadataInfo(false); }); }, [this]([[maybe_unused]] const int messageCount, const std::string&)