Skip to content

Commit

Permalink
refactor: use max batch size only in communicator
Browse files Browse the repository at this point in the history
get messages callback will now take an int parameter for the batch size
  • Loading branch information
jr0me committed Dec 3, 2024
1 parent 6210dd4 commit 410da58
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 99 deletions.
8 changes: 4 additions & 4 deletions src/agent/communicator/include/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ namespace communicator
/// @brief Processes messages in a stateful manner
/// @param getMessages A function to retrieve a message from the queue
/// @param onSuccess A callback function to execute when a message is processed
boost::asio::awaitable<void>
StatefulMessageProcessingTask(std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
std::function<void(const int, const std::string&)> onSuccess);
boost::asio::awaitable<void> StatefulMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<void(const int, const std::string&)> onSuccess);

/// @brief Processes messages in a stateless manner
/// @param getMessages A function to retrieve a message from the queue
/// @param onSuccess A callback function to execute when a message is processed
boost::asio::awaitable<void> StatelessMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<void(const int, const std::string&)> onSuccess);

/// @brief Retrieves group configuration from the manager
Expand Down
20 changes: 10 additions & 10 deletions src/agent/communicator/include/http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ namespace http_client
/// @param onSuccess Callback for successful request completion
/// @param loopRequestCondition Condition to continue looping requests
/// @return Awaitable task for the HTTP request
boost::asio::awaitable<void>
Co_PerformHttpRequest(std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) override;
boost::asio::awaitable<void> Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) override;

/// @brief Performs a synchronous HTTP request
/// @param params Parameters for the request
Expand Down
20 changes: 10 additions & 10 deletions src/agent/communicator/include/ihttp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ namespace http_client
/// @param onSuccess Action to take on successful request
/// @param loopRequestCondition Condition to continue the request loop
/// @return Awaitable task for the HTTP request
virtual boost::asio::awaitable<void>
Co_PerformHttpRequest(std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) = 0;
virtual boost::asio::awaitable<void> Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) = 0;

/// @brief Perform an HTTP request and receive the response
/// @param params The parameters for the request
Expand Down
4 changes: 2 additions & 2 deletions src/agent/communicator/src/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ namespace communicator
}

boost::asio::awaitable<void> Communicator::StatefulMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
Expand Down Expand Up @@ -162,7 +162,7 @@ namespace communicator
}

boost::asio::awaitable<void> Communicator::StatelessMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
Expand Down
2 changes: 1 addition & 1 deletion src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ namespace http_client
boost::asio::awaitable<void> HttpClient::Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams reqParams,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> messageGetter,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
Expand Down
107 changes: 57 additions & 50 deletions src/agent/communicator/tests/communicator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines)

using namespace testing;
using GetMessagesFuncType = std::function<boost::asio::awaitable<intStringTuple>(const int)>;

namespace
{
Expand Down Expand Up @@ -51,32 +52,34 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success)
{
auto mockHttpClient = std::make_unique<MockHttpClient>();

auto getMessages = []() -> boost::asio::awaitable<std::tuple<int, std::string>>
auto getMessages = [](const int) -> boost::asio::awaitable<intStringTuple>
{
co_return std::tuple<int, std::string> {1, std::string("message-content")};
co_return intStringTuple {1, std::string("message-content")};
};

std::function<void(const int, const std::string&)> onSuccess = [](const int, const std::string& message)
{
EXPECT_EQ(message, "message-content");
};

auto MockCo_PerformHttpRequest =
[](std::shared_ptr<std::string>,
http_client::HttpRequestParams,
GetMessagesFuncType pGetMessages,
std::function<void()>,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
std::function<void(const int, const std::string&)> pOnSuccess,
[[maybe_unused]] std::function<bool()> loopRequestCondition) -> boost::asio::awaitable<void>
{
const auto message = co_await pGetMessages(1);
pOnSuccess(std::get<0>(message), std::get<1>(message));
co_return;
};

EXPECT_CALL(*mockHttpClient, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
.WillOnce(Invoke(
[](std::shared_ptr<std::string>,
http_client::HttpRequestParams,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> pGetMessages,
std::function<void()>,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
std::function<void(const int, const std::string&)> pOnSuccess,
[[maybe_unused]] std::function<bool()> loopRequestCondition) -> boost::asio::awaitable<void>
{
const auto message = co_await pGetMessages();
pOnSuccess(std::get<0>(message), std::get<1>(message));
co_return;
}));
.WillOnce(Invoke(MockCo_PerformHttpRequest));

communicator::Communicator communicator(std::move(mockHttpClient), "uuid", "key", nullptr, FUNC);

Expand Down Expand Up @@ -109,22 +112,24 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
return std::nullopt;
}));

auto MockCo_PerformHttpRequest =
[](std::shared_ptr<std::string> token,
http_client::HttpRequestParams,
[[maybe_unused]] GetMessagesFuncType pGetMessages,
[[maybe_unused]] std::function<void()> onUnauthorized,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
{
EXPECT_TRUE(token->empty());
co_return;
};

// A following call to Co_PerformHttpRequest should not have a token
EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
.WillOnce(Invoke(
[](std::shared_ptr<std::string> token,
http_client::HttpRequestParams,
[[maybe_unused]] std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
[[maybe_unused]] std::function<void()> onUnauthorized,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
{
EXPECT_TRUE(token->empty());
co_return;
}));
.WillOnce(Invoke(MockCo_PerformHttpRequest));

boost::asio::io_context ioContext;

Expand All @@ -134,8 +139,8 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
{
co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate();
co_await communicatorPtr->StatelessMessageProcessingTask(
[]() -> boost::asio::awaitable<std::tuple<int, std::string>>
{ co_return std::tuple<int, std::string>(1, std::string {"message"}); },
[](const int) -> boost::asio::awaitable<intStringTuple>
{ co_return intStringTuple(1, std::string {"message"}); },
[]([[maybe_unused]] const int, const std::string&) {});
}(),
boost::asio::detached);
Expand Down Expand Up @@ -167,22 +172,24 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken)
}));

std::string capturedToken;

auto MockCo_PerformHttpRequest =
[&capturedToken](std::shared_ptr<std::string> token,
http_client::HttpRequestParams,
[[maybe_unused]] GetMessagesFuncType pGetMessages,
[[maybe_unused]] std::function<void()> onUnauthorized,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
{
capturedToken = *token;
co_return;
};

EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
.WillOnce(Invoke(
[&capturedToken](
std::shared_ptr<std::string> token,
http_client::HttpRequestParams,
[[maybe_unused]] std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
[[maybe_unused]] std::function<void()> onUnauthorized,
[[maybe_unused]] std::time_t connectionRetry,
[[maybe_unused]] std::time_t batchInterval,
[[maybe_unused]] int batchSize,
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
{
capturedToken = *token;
co_return;
}));
.WillOnce(Invoke(MockCo_PerformHttpRequest));

boost::asio::io_context ioContext;

Expand All @@ -192,8 +199,8 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken)
{
co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate();
co_await communicatorPtr->StatelessMessageProcessingTask(
[]() -> boost::asio::awaitable<std::tuple<int, std::string>>
{ co_return std::tuple<int, std::string>(1, std::string {"message"}); },
[](const int) -> boost::asio::awaitable<intStringTuple>
{ co_return intStringTuple(1, std::string {"message"}); },
[]([[maybe_unused]] const int, const std::string&) {});
}(),
boost::asio::detached);
Expand Down
10 changes: 5 additions & 5 deletions src/agent/communicator/tests/http_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_Success)
SetupMockSocketReadExpectations(boost::beast::http::status::ok);

auto getMessagesCalled = false;
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
{
getMessagesCalled = true;
co_return std::tuple<int, std::string>(1, "test message");
Expand Down Expand Up @@ -270,7 +270,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_CallbacksNotCalledIfCannotConnect)
SetupMockSocketConnectExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address));

auto getMessagesCalled = false;
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
{
getMessagesCalled = true;
co_return std::tuple<int, std::string>(1, "test message");
Expand Down Expand Up @@ -318,7 +318,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncWriteFails
SetupMockSocketWriteExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address));

auto getMessagesCalled = false;
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
{
getMessagesCalled = true;
co_return std::tuple<int, std::string>(1, "test message");
Expand Down Expand Up @@ -374,7 +374,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncReadFails)
boost::system::errc::make_error_code(boost::system::errc::bad_address));

auto getMessagesCalled = false;
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
{
getMessagesCalled = true;
co_return std::tuple<int, std::string>(1, "test message");
Expand Down Expand Up @@ -429,7 +429,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_UnauthorizedCalledWhenAuthorization
SetupMockSocketReadExpectations(boost::beast::http::status::unauthorized);

auto getMessagesCalled = false;
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
{
getMessagesCalled = true;
co_return std::tuple<int, std::string>(1, "test message");
Expand Down
2 changes: 1 addition & 1 deletion src/agent/communicator/tests/mocks/mock_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class MockHttpClient : public http_client::IHttpClient
Co_PerformHttpRequest,
(std::shared_ptr<std::string> token,
http_client::HttpRequestParams params,
std::function<boost::asio::awaitable<intStringTuple>()> messageGetter,
std::function<boost::asio::awaitable<intStringTuple>(const int)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
Expand Down
3 changes: 0 additions & 3 deletions src/agent/include/agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,4 @@ class Agent

/// @brief Centralized configuration
centralized_configuration::CentralizedConfiguration m_centralizedConfiguration;

/// @brief Maximum messages batch size
int m_maxBatchingSize = config::agent::DEFAULT_BATCH_SIZE;
};
17 changes: 4 additions & 13 deletions src/agent/src/agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,6 @@ Agent::Agent(const std::string& configFilePath, std::unique_ptr<ISignalHandler>
[this](std::function<void()> task) { m_taskManager.EnqueueTask(std::move(task)); })
, m_commandHandler(m_dataPath)
{
m_maxBatchingSize =
m_configurationParser.GetConfig<int>("agent", "max_batching_size").value_or(config::agent::DEFAULT_BATCH_SIZE);

if (m_maxBatchingSize < 1)
{
LogWarn("max_batching_size cannot be lower than 1s. Using default value.");
m_maxBatchingSize = config::agent::DEFAULT_BATCH_SIZE;
}

m_centralizedConfiguration.SetGroupIdFunction([this](const std::vector<std::string>& groups)
{ return m_agentInfo.SetGroups(groups); });

Expand All @@ -66,22 +57,22 @@ void Agent::Run()
[this](const int, const std::string& response) { PushCommandsToQueue(m_messageQueue, response); }));

m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(
[this]()
[this](const int numMessages)
{
return GetMessagesFromQueue(m_messageQueue,
MessageType::STATEFUL,
m_maxBatchingSize,
numMessages,
[this]() { return m_agentInfo.GetMetadataInfo(false); });
},
[this]([[maybe_unused]] const int messageCount, const std::string&)
{ PopMessagesFromQueue(m_messageQueue, MessageType::STATEFUL, messageCount); }));

m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(
[this]()
[this](const int numMessages)
{
return GetMessagesFromQueue(m_messageQueue,
MessageType::STATELESS,
m_maxBatchingSize,
numMessages,
[this]() { return m_agentInfo.GetMetadataInfo(false); });
},
[this]([[maybe_unused]] const int messageCount, const std::string&)
Expand Down

0 comments on commit 410da58

Please sign in to comment.