Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add messages batch size and interval options #377

Merged
merged 15 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions etc/config/wazuh-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ agent:
thread_count: 4
server_url: https://localhost:27000
retry_interval: 30s
events:
batch_interval: 10s
batch_size: 1000
inventory:
enabled: true
interval: 1h
Expand Down
39 changes: 32 additions & 7 deletions src/agent/communicator/include/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,47 @@ namespace communicator

m_retryInterval = getConfigValue.template operator()<std::time_t>("agent", "retry_interval")
.value_or(config::agent::DEFAULT_RETRY_INTERVAL);

m_batchInterval = getConfigValue.template operator()<std::time_t>("events", "batch_interval")
.value_or(config::agent::DEFAULT_BATCH_INTERVAL);

if (m_batchInterval < 1'000 || m_batchInterval > (1'000 * 60 * 60))
{
LogWarn("batch_interval must be between 1s and 1h. Using default value.");
m_batchInterval = config::agent::DEFAULT_BATCH_INTERVAL;
}

m_batchSize = getConfigValue.template operator()<int>("events", "batch_size")
.value_or(config::agent::DEFAULT_BATCH_SIZE);

if (m_batchSize < 1'000 || m_batchSize > 1'000'000)
{
LogWarn("batch_size must be between 1000 and 1000000. Using default value.");
m_batchSize = config::agent::DEFAULT_BATCH_SIZE;
}
}

/// @brief Waits for the authentication token to expire and authenticates again
boost::asio::awaitable<void> WaitForTokenExpirationAndAuthenticate();

/// @brief Retrieves commands from the manager
/// @param onSuccess A callback function to execute when a command is received
boost::asio::awaitable<void> GetCommandsFromManager(std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void>
GetCommandsFromManager(std::function<void(const int, const std::string&)> onSuccess);

/// @brief Processes messages in a stateful manner
/// @param getMessages A function to retrieve a message from the queue
/// @param onSuccess A callback function to execute when a message is processed
boost::asio::awaitable<void>
StatefulMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void> StatefulMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<void(const int, const std::string&)> onSuccess);

/// @brief Processes messages in a stateless manner
/// @param getMessages A function to retrieve a message from the queue
/// @param onSuccess A callback function to execute when a message is processed
boost::asio::awaitable<void>
StatelessMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void> StatelessMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<void(const int, const std::string&)> onSuccess);

/// @brief Retrieves group configuration from the manager
/// @param groupName The name of the group to retrieve the configuration for
Expand Down Expand Up @@ -116,6 +135,12 @@ namespace communicator
/// @brief Time in milliseconds between authentication attemps in case of failure
std::time_t m_retryInterval = config::agent::DEFAULT_RETRY_INTERVAL;

/// @brief Time between batch requests
std::time_t m_batchInterval = config::agent::DEFAULT_BATCH_INTERVAL;

/// @brief Maximum number of messages to batch
int m_batchSize = config::agent::DEFAULT_BATCH_SIZE;

/// @brief The server URL
std::string m_serverUrl;

Expand Down
20 changes: 12 additions & 8 deletions src/agent/communicator/include/http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ namespace http_client
/// @param messageGetter Function to get the message body asynchronously
/// @param onUnauthorized Callback for unauthorized access
/// @param connectionRetry Time in milliseconds to wait before retrying the connection
/// @param batchInterval Time to wait between requests
/// @param batchSize The maximum number of messages to batch
/// @param onSuccess Callback for successful request completion
/// @param loopRequestCondition Condition to continue looping requests
/// @return Awaitable task for the HTTP request
boost::asio::awaitable<void>
Co_PerformHttpRequest(std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::string>()> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::function<void(const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) override;
boost::asio::awaitable<void> Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) override;

/// @brief Performs a synchronous HTTP request
/// @param params Parameters for the request
Expand Down
21 changes: 13 additions & 8 deletions src/agent/communicator/include/ihttp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <memory>
#include <optional>
#include <string>
#include <tuple>

namespace http_client
{
Expand All @@ -36,17 +37,21 @@ namespace http_client
/// @param messageGetter Function to retrieve messages
/// @param onUnauthorized Action to take on unauthorized access
/// @param connectionRetry Time to wait before retrying the connection
/// @param batchInterval Time to wait between requests
/// @param batchSize The maximum number of messages to batch
/// @param onSuccess Action to take on successful request
/// @param loopRequestCondition Condition to continue the request loop
/// @return Awaitable task for the HTTP request
virtual boost::asio::awaitable<void>
Co_PerformHttpRequest(std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::string>()> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::function<void(const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) = 0;
virtual boost::asio::awaitable<void> Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
std::function<void(const int, const std::string&)> onSuccess = {},
std::function<bool()> loopRequestCondition = {}) = 0;

/// @brief Perform an HTTP request and receive the response
/// @param params The parameters for the request
Expand Down
48 changes: 35 additions & 13 deletions src/agent/communicator/src/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ namespace communicator
return std::max(0L, static_cast<long>(m_tokenExpTimeInSeconds - now_seconds));
}

boost::asio::awaitable<void> Communicator::GetCommandsFromManager(std::function<void(const std::string&)> onSuccess)
boost::asio::awaitable<void>
Communicator::GetCommandsFromManager(std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
Expand All @@ -79,8 +80,15 @@ namespace communicator

const auto reqParams = http_client::HttpRequestParams(
boost::beast::http::verb::get, m_serverUrl, "/api/v1/commands", m_getHeaderInfo ? m_getHeaderInfo() : "");
co_await m_httpClient->Co_PerformHttpRequest(
m_token, reqParams, {}, onAuthenticationFailed, m_retryInterval, onSuccess, loopCondition);
co_await m_httpClient->Co_PerformHttpRequest(m_token,
reqParams,
{},
onAuthenticationFailed,
m_retryInterval,
m_batchInterval,
m_batchSize,
onSuccess,
loopCondition);
}

boost::asio::awaitable<void> Communicator::WaitForTokenExpirationAndAuthenticate()
Expand Down Expand Up @@ -124,9 +132,9 @@ namespace communicator
}
}

boost::asio::awaitable<void>
Communicator::StatefulMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess)
boost::asio::awaitable<void> Communicator::StatefulMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
Expand All @@ -142,13 +150,20 @@ namespace communicator
m_serverUrl,
"/api/v1/events/stateful",
m_getHeaderInfo ? m_getHeaderInfo() : "");
co_await m_httpClient->Co_PerformHttpRequest(
m_token, reqParams, getMessages, onAuthenticationFailed, m_retryInterval, onSuccess, loopCondition);
co_await m_httpClient->Co_PerformHttpRequest(m_token,
reqParams,
getMessages,
onAuthenticationFailed,
m_retryInterval,
m_batchInterval,
m_batchSize,
onSuccess,
loopCondition);
}

boost::asio::awaitable<void>
Communicator::StatelessMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess)
boost::asio::awaitable<void> Communicator::StatelessMessageProcessingTask(
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
std::function<void(const int, const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
Expand All @@ -164,8 +179,15 @@ namespace communicator
m_serverUrl,
"/api/v1/events/stateless",
m_getHeaderInfo ? m_getHeaderInfo() : "");
co_await m_httpClient->Co_PerformHttpRequest(
m_token, reqParams, getMessages, onAuthenticationFailed, m_retryInterval, onSuccess, loopCondition);
co_await m_httpClient->Co_PerformHttpRequest(m_token,
reqParams,
getMessages,
onAuthenticationFailed,
m_retryInterval,
m_batchInterval,
m_batchSize,
onSuccess,
loopCondition);
}

void Communicator::TryReAuthenticate()
Expand Down
43 changes: 33 additions & 10 deletions src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,16 @@ namespace http_client
return req;
}

boost::asio::awaitable<void>
HttpClient::Co_PerformHttpRequest(std::shared_ptr<std::string> token,
HttpRequestParams reqParams,
std::function<boost::asio::awaitable<std::string>()> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::function<void(const std::string&)> onSuccess,
std::function<bool()> loopRequestCondition)
boost::asio::awaitable<void> HttpClient::Co_PerformHttpRequest(
std::shared_ptr<std::string> token,
HttpRequestParams reqParams,
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
std::function<void()> onUnauthorized,
std::time_t connectionRetry,
std::time_t batchInterval,
int batchSize,
std::function<void(const int, const std::string&)> onSuccess,
std::function<bool()> loopRequestCondition)
{
using namespace std::chrono_literals;

Expand Down Expand Up @@ -150,9 +152,30 @@ namespace http_client
continue;
}

auto messagesCount = 0;

if (messageGetter != nullptr)
{
reqParams.Body = co_await messageGetter();
boost::asio::steady_timer refreshTimer(co_await boost::asio::this_coro::executor);
boost::asio::steady_timer batchTimeoutTimer(co_await boost::asio::this_coro::executor);
batchTimeoutTimer.expires_after(std::chrono::milliseconds(batchInterval));

while (loopRequestCondition != nullptr && loopRequestCondition())
{
const auto messages = co_await messageGetter(batchSize);
messagesCount = std::get<0>(messages);

if (messagesCount >= batchSize || batchTimeoutTimer.expiry() <= std::chrono::steady_clock::now())
{
LogTrace("Messages count: {}", messagesCount);
reqParams.Body = std::get<1>(messages);
break;
}

constexpr int refreshInterval = 100;
refreshTimer.expires_after(std::chrono::milliseconds(refreshInterval));
co_await refreshTimer.async_wait(boost::asio::use_awaitable);
}
}
else
{
Expand Down Expand Up @@ -190,7 +213,7 @@ namespace http_client
{
if (onSuccess != nullptr)
{
onSuccess(boost::beast::buffers_to_string(res.body().data()));
onSuccess(messagesCount, boost::beast::buffers_to_string(res.body().data()));
}
}
else if (res.result() == boost::beast::http::status::unauthorized ||
Expand Down
Loading
Loading