diff --git a/src/agent/include/agent.hpp b/src/agent/include/agent.hpp index bda3591082..04b228be95 100644 --- a/src/agent/include/agent.hpp +++ b/src/agent/include/agent.hpp @@ -13,7 +13,9 @@ #include +#include #include +#include #include /// @brief Agent class @@ -76,4 +78,13 @@ class Agent /// @brief Centralized configuration centralized_configuration::CentralizedConfiguration m_centralizedConfiguration; + + /// @brief Mutex to coordinate agent reload + std::mutex m_reloadMutex; + + /// @brief Indicates if the agent is running + std::atomic m_running = true; + + /// @brief Agent thread count + size_t m_agentThreadCount; }; diff --git a/src/agent/src/agent.cpp b/src/agent/src/agent.cpp index 63b06ce415..893760834b 100644 --- a/src/agent/src/agent.cpp +++ b/src/agent/src/agent.cpp @@ -58,16 +58,14 @@ Agent::Agent(const std::string& configFilePath, std::unique_ptr m_centralizedConfiguration.ReloadModulesFunction([this]() { ReloadModules(); }); - auto agentThreadCount = + m_agentThreadCount = m_configurationParser->GetConfig("agent", "thread_count").value_or(config::DEFAULT_THREAD_COUNT); - if (agentThreadCount < config::DEFAULT_THREAD_COUNT) + if (m_agentThreadCount < config::DEFAULT_THREAD_COUNT) { LogWarn("thread_count must be greater than {}. Using default value.", config::DEFAULT_THREAD_COUNT); - agentThreadCount = config::DEFAULT_THREAD_COUNT; + m_agentThreadCount = config::DEFAULT_THREAD_COUNT; } - - m_taskManager.Start(agentThreadCount); } Agent::~Agent() @@ -77,15 +75,34 @@ Agent::~Agent() void Agent::ReloadModules() { - LogInfo("Reloading Modules"); - m_configurationParser->ReloadConfiguration(); - m_moduleManager.Stop(); - m_moduleManager.Setup(); - m_moduleManager.Start(); + std::lock_guard lock(m_reloadMutex); + + if (m_running.load()) + { + try + { + LogInfo("Reloading Modules"); + m_configurationParser->ReloadConfiguration(); + m_moduleManager.Stop(); + m_moduleManager.Setup(); + m_moduleManager.Start(); + LogInfo("Modules reloaded"); + } + catch (const std::exception& e) + { + LogError("Error reloading modules: {}", e.what()); + } + } + else + { + LogWarn("Agent cannot reload modules while start up or shutdown is in progress."); + } } void Agent::Run() { + m_taskManager.Start(m_agentThreadCount); + // Check if the server recognizes the agent m_communicator.SendAuthenticationRequest(); @@ -144,8 +161,18 @@ void Agent::Run() }), "CommandsProcessing"); + { + std::unique_lock lock(m_reloadMutex); + m_running.store(true); + } + m_signalHandler->WaitForSignal(); + { + std::unique_lock lock(m_reloadMutex); + m_running.store(false); + } + m_commandHandler.Stop(); m_communicator.Stop(); m_moduleManager.Stop(); diff --git a/src/agent/src/signal_handler.cpp b/src/agent/src/signal_handler.cpp index 41613d6488..6515e78261 100644 --- a/src/agent/src/signal_handler.cpp +++ b/src/agent/src/signal_handler.cpp @@ -17,6 +17,7 @@ void SignalHandler::HandleSignal([[maybe_unused]] int signal) void SignalHandler::WaitForSignal() { + KeepRunning.store(true); std::unique_lock lock(m_cvMutex); m_cv.wait(lock, [] { return !KeepRunning.load(); }); } diff --git a/src/agent/task_manager/include/task_manager.hpp b/src/agent/task_manager/include/task_manager.hpp index 694618663b..8625a5bcee 100644 --- a/src/agent/task_manager/include/task_manager.hpp +++ b/src/agent/task_manager/include/task_manager.hpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -53,4 +54,7 @@ class TaskManager : public ITaskManager> /// @brief Number of enqueued threads size_t m_numEnqueuedThreads = 0; + + /// @brief Mutex to control Start and Stop operations + mutable std::mutex m_mutex; }; diff --git a/src/agent/task_manager/src/task_manager.cpp b/src/agent/task_manager/src/task_manager.cpp index 99f6e760ef..1409da843a 100644 --- a/src/agent/task_manager/src/task_manager.cpp +++ b/src/agent/task_manager/src/task_manager.cpp @@ -15,6 +15,8 @@ TaskManager::~TaskManager() void TaskManager::Start(size_t numThreads) { + std::lock_guard lock(m_mutex); + if (m_work || !m_threads.empty()) { LogError("Task manager already started"); @@ -32,9 +34,11 @@ void TaskManager::Start(size_t numThreads) void TaskManager::Stop() { + std::lock_guard lock(m_mutex); + if (m_work) { - m_work->reset(); + m_work.reset(); } if (!m_ioContext.stopped()) @@ -59,6 +63,8 @@ void TaskManager::Stop() void TaskManager::EnqueueTask(std::function task, const std::string& taskID) { + std::lock_guard lock(m_mutex); + if (++m_numEnqueuedThreads > m_threads.size()) { LogError("Enqueued more threaded tasks than available threads"); @@ -82,6 +88,8 @@ void TaskManager::EnqueueTask(std::function task, const std::string& tas void TaskManager::EnqueueTask(boost::asio::awaitable task, const std::string& taskID) { + std::lock_guard lock(m_mutex); + // NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines) boost::asio::co_spawn( m_ioContext, @@ -105,5 +113,6 @@ void TaskManager::EnqueueTask(boost::asio::awaitable task, const std::stri size_t TaskManager::GetNumEnqueuedThreads() const { + std::lock_guard lock(m_mutex); return m_numEnqueuedThreads; } diff --git a/src/modules/include/moduleManager.hpp b/src/modules/include/moduleManager.hpp index c8e7ce1e8b..fe1127d0a6 100644 --- a/src/modules/include/moduleManager.hpp +++ b/src/modules/include/moduleManager.hpp @@ -1,15 +1,18 @@ #pragma once -#include -#include -#include -#include #include #include #include #include +#include +#include +#include +#include +#include + + class ModuleManager { public: ModuleManager(const std::function& pushMessage, std::shared_ptr configurationParser, std::string uuid) @@ -46,6 +49,15 @@ class ModuleManager { void AddModules(); std::shared_ptr GetModule(const std::string & name); + + /// @brief Start the modules + /// + /// This function begins the procedure to start the modules and blocks until the Start function + /// for each module has been called. However, it does not guarantee that the modules are fully + /// operational upon return; they may still be in the process of initializing. + /// + /// @note Call this function before interacting with the modules to ensure the startup process is initiated. + /// @warning Ensure the modules have fully started before performing any operations that depend on them. void Start(); void Setup(); void Stop(); @@ -56,4 +68,6 @@ class ModuleManager { std::function m_pushMessage; std::shared_ptr m_configurationParser; std::string m_agentUUID; + std::mutex m_mutex; + std::atomic m_started {0}; }; diff --git a/src/modules/src/moduleManager.cpp b/src/modules/src/moduleManager.cpp index 6c044222ca..d08ff7adbb 100644 --- a/src/modules/src/moduleManager.cpp +++ b/src/modules/src/moduleManager.cpp @@ -24,31 +24,62 @@ void ModuleManager::AddModules() { Setup(); } -std::shared_ptr ModuleManager::GetModule(const std::string & name) { - auto it = m_modules.find(name); - if (it != m_modules.end()) { +std::shared_ptr ModuleManager::GetModule(const std::string & name) +{ + if (auto it = m_modules.find(name); it != m_modules.end()) + { return it->second; } return nullptr; } -void ModuleManager::Start() { +void ModuleManager::Start() +{ + std::unique_lock lock(m_mutex); + m_taskManager.Start(m_modules.size()); + std::condition_variable cv; + for (const auto &[_, module] : m_modules) { - m_taskManager.EnqueueTask([module]() { module->Start(); }, module->Name()); + m_taskManager.EnqueueTask( + [module, this, &cv] + { + ++m_started; + cv.notify_one(); + module->Start(); + } + , module->Name() + ); } + + cv.wait( + lock, + [this] + { + return m_started.load() == static_cast(m_modules.size()); + } + ); } -void ModuleManager::Setup() { - for (const auto &[_, module] : m_modules) { +void ModuleManager::Setup() +{ + std::lock_guard lock(m_mutex); + + for (const auto &[_, module] : m_modules) + { module->Setup(m_configurationParser); } } -void ModuleManager::Stop() { - for (const auto &[_, module] : m_modules) { +void ModuleManager::Stop() +{ + std::lock_guard lock(m_mutex); + + for (const auto &[_, module] : m_modules) + { module->Stop(); + m_started--; } m_taskManager.Stop(); }