From e0c3be3d4b77aa2c9fbbd00a01fcbd631152b740 Mon Sep 17 00:00:00 2001 From: Eugene Smirnov Date: Wed, 27 Feb 2019 17:49:48 +0100 Subject: [PATCH] Separate API command queue implementation Signed-off-by: Eugene Smirnov --- src/hmi_stub/main.cc | 1 - src/libaktualizr/primary/aktualizr.cc | 90 ++++++---------------- src/libaktualizr/primary/aktualizr.h | 62 +-------------- src/libaktualizr/primary/aktualizr_test.cc | 2 - src/libaktualizr/utilities/apiqueue.cc | 51 +++++++++++- src/libaktualizr/utilities/apiqueue.h | 36 ++++++++- 6 files changed, 110 insertions(+), 132 deletions(-) diff --git a/src/hmi_stub/main.cc b/src/hmi_stub/main.cc index e9912fe404..a3b37f9fad 100644 --- a/src/hmi_stub/main.cc +++ b/src/hmi_stub/main.cc @@ -147,7 +147,6 @@ int main(int argc, char *argv[]) { while (std::getline(std::cin, buffer)) { boost::algorithm::to_lower(buffer); if (buffer == "shutdown") { - aktualizr.Shutdown(); break; } else if (buffer == "senddevicedata") { aktualizr.SendDeviceData(); diff --git a/src/libaktualizr/primary/aktualizr.cc b/src/libaktualizr/primary/aktualizr.cc index cb06d0f0c4..90dc7847c8 100644 --- a/src/libaktualizr/primary/aktualizr.cc +++ b/src/libaktualizr/primary/aktualizr.cc @@ -44,26 +44,18 @@ void Aktualizr::systemSetup() { void Aktualizr::Initialize() { uptane_client_->initialize(); - api_thread_ = std::thread([&]() { - while (!shutdown_) { - auto task = api_queue_.dequeue(); - if (shutdown_) { - return; - } - task(); - } - }); + api_queue_.run(); } -void Aktualizr::UptaneCycle() { +bool Aktualizr::UptaneCycle() { result::UpdateCheck update_result = CheckUpdates().get(); if (update_result.updates.size() == 0) { - return; + return true; } result::Download download_result = Download(update_result.updates).get(); if (download_result.status != result::DownloadStatus::kSuccess || download_result.updates.size() == 0) { - return; + return true; } uptane_client_->uptaneInstall(download_result.updates); @@ -72,7 +64,7 @@ void Aktualizr::UptaneCycle() { // If there are some pending updates then effectively either reboot (ostree) or aktualizr restart (fake pack mngr) // is required to apply the update(s) LOG_INFO << "About to exit aktualizr so the pending updates can be applied after reboot"; - Shutdown(); + return false; } if (!uptane_client_->hasPendingUpdates()) { @@ -80,13 +72,14 @@ void Aktualizr::UptaneCycle() { // as soon as possible, don't wait for config_.uptane.polling_sec uptane_client_->putManifest(); } + + return true; } std::future Aktualizr::RunForever() { std::future future = std::async(std::launch::async, [&]() { SendDeviceData().get(); - while (!shutdown_) { - UptaneCycle(); + while (UptaneCycle()) { std::this_thread::sleep_for(std::chrono::seconds(config_.uptane.polling_sec)); } uptane_client_->completeInstall(); @@ -98,81 +91,42 @@ void Aktualizr::AddSecondary(const std::shared_ptr & uptane_client_->addNewSecondary(secondary); } -void Aktualizr::Shutdown() { - if (!shutdown_) { - shutdown_ = true; - api_queue_.shutDown(); - } -} - std::future Aktualizr::CampaignCheck() { - auto promise = std::make_shared>(); - std::function task([&, promise]() { promise->set_value(uptane_client_->campaignCheck()); }); - api_queue_.enqueue(task); - return promise->get_future(); + std::function task([this] { return uptane_client_->campaignCheck(); }); + return api_queue_.enqueue(task); } std::future Aktualizr::CampaignAccept(const std::string &campaign_id) { - auto promise = std::make_shared>(); - std::function task([&, promise, campaign_id]() { - uptane_client_->campaignAccept(campaign_id); - promise->set_value(); - }); - api_queue_.enqueue(task); - return promise->get_future(); + std::function task([this, &campaign_id] { uptane_client_->campaignAccept(campaign_id); }); + return api_queue_.enqueue(task); } std::future Aktualizr::SendDeviceData() { - auto promise = std::make_shared>(); - std::function task([&, promise]() { - uptane_client_->sendDeviceData(); - promise->set_value(); - }); - api_queue_.enqueue(task); - return promise->get_future(); + std::function task([this] { uptane_client_->sendDeviceData(); }); + return api_queue_.enqueue(task); } std::future Aktualizr::CheckUpdates() { - auto promise = std::make_shared>(); - std::function task([&, promise]() { promise->set_value(uptane_client_->fetchMeta()); }); - api_queue_.enqueue(task); - return promise->get_future(); + std::function task([this] { return uptane_client_->fetchMeta(); }); + return api_queue_.enqueue(task); } std::future Aktualizr::Download(const std::vector &updates) { - auto promise = std::make_shared>(); - std::function task( - [this, promise, updates]() { promise->set_value(uptane_client_->downloadImages(updates)); }); - api_queue_.enqueue(task); - return promise->get_future(); + std::function task([this, &updates] { return uptane_client_->downloadImages(updates); }); + return api_queue_.enqueue(task); } std::future Aktualizr::Install(const std::vector &updates) { - auto promise = std::make_shared>(); - std::function task( - [this, promise, updates]() { promise->set_value(uptane_client_->uptaneInstall(updates)); }); - api_queue_.enqueue(task); - return promise->get_future(); + std::function task([this, &updates] { return uptane_client_->uptaneInstall(updates); }); + return api_queue_.enqueue(task); } result::Pause Aktualizr::Pause() { - if (api_queue_.pause(true)) { - uptane_client_->pauseFetching(); - - return {result::PauseStatus::kSuccess}; - } - - return {result::PauseStatus::kAlreadyPaused}; + return api_queue_.pause(true) ? result::PauseStatus::kSuccess : result::PauseStatus::kAlreadyPaused; } result::Pause Aktualizr::Resume() { - if (api_queue_.pause(false)) { - uptane_client_->resumeFetching(); - - return {result::PauseStatus::kSuccess}; - } - - return {result::PauseStatus::kAlreadyRunning}; + return api_queue_.pause(false) ? result::PauseStatus::kSuccess : result::PauseStatus::kAlreadyRunning; } boost::signals2::connection Aktualizr::SetSignalHandler(std::function)> &handler) { diff --git a/src/libaktualizr/primary/aktualizr.h b/src/libaktualizr/primary/aktualizr.h index cc1a28d8c9..900e65c742 100644 --- a/src/libaktualizr/primary/aktualizr.h +++ b/src/libaktualizr/primary/aktualizr.h @@ -12,50 +12,7 @@ #include "sotauptaneclient.h" #include "storage/invstorage.h" #include "uptane/secondaryinterface.h" - -class ApiQueue { - public: - void enqueue(const std::function& t) { - std::lock_guard lock(m_); - q_.push(t); - c_.notify_one(); - } - - std::function dequeue() { - std::unique_lock wait_lock(m_); - while (q_.empty() || paused_) { - c_.wait(wait_lock); - if (shutdown_) { - return std::function(); - } - } - std::function val = q_.front(); - q_.pop(); - return val; - } - - void shutDown() { - shutdown_ = true; - enqueue(std::function()); - } - - // returns true iff pause→resume or resume→pause - bool pause(bool do_pause) { - std::lock_guard lock(m_); - bool has_effect = paused_ != do_pause; - paused_ = do_pause; - c_.notify_one(); - - return has_effect; - } - - private: - std::queue> q_; - mutable std::mutex m_; - std::condition_variable c_; - std::atomic_bool shutdown_{false}; - std::atomic_bool paused_{false}; -}; +#include "utilities/apiqueue.h" /** * This class provides the main APIs necessary for launching and controlling @@ -66,12 +23,6 @@ class Aktualizr { /** Aktualizr requires a configuration object. Examples can be found in the * config directory. */ explicit Aktualizr(Config& config); - ~Aktualizr() { - Shutdown(); - if (api_thread_.joinable()) { - api_thread_.join(); - } - } Aktualizr(const Aktualizr&) = delete; Aktualizr& operator=(const Aktualizr&) = delete; @@ -88,11 +39,6 @@ class Aktualizr { */ std::future RunForever(); - /** - * Asynchronously shut aktualizr down. - */ - void Shutdown(); - /** * Check for campaigns. * Campaigns are a concept outside of Uptane, and allow for user approval of @@ -170,7 +116,7 @@ class Aktualizr { * Synchronously run an uptane cycle: check for updates, download any new * targets, install them, and send a manifest back to the server. */ - void UptaneCycle(); + bool UptaneCycle(); /** * Add new secondary to aktualizr. Must be called before Initialize. @@ -215,9 +161,7 @@ class Aktualizr { std::shared_ptr storage_; std::shared_ptr uptane_client_; std::shared_ptr sig_; - std::atomic shutdown_ = {false}; - ApiQueue api_queue_; - std::thread api_thread_; + api::CommandQueue api_queue_; }; #endif // AKTUALIZR_H_ diff --git a/src/libaktualizr/primary/aktualizr_test.cc b/src/libaktualizr/primary/aktualizr_test.cc index 9459733faa..0a6dc35444 100644 --- a/src/libaktualizr/primary/aktualizr_test.cc +++ b/src/libaktualizr/primary/aktualizr_test.cc @@ -1117,7 +1117,6 @@ TEST(Aktualizr, APICheck) { aktualizr.CheckUpdates(); } std::this_thread::sleep_for(std::chrono::seconds(12)); - aktualizr.Shutdown(); // Wait for the Aktualizr dtor to run in order that processing has finished } @@ -1133,7 +1132,6 @@ TEST(Aktualizr, APICheck) { aktualizr.CheckUpdates(); } std::this_thread::sleep_for(std::chrono::milliseconds(800)); - aktualizr.Shutdown(); // Wait for the Aktualizr dtor to run in order that processing has finished } EXPECT_LT(counter2.total_events(), 100); diff --git a/src/libaktualizr/utilities/apiqueue.cc b/src/libaktualizr/utilities/apiqueue.cc index 3f62af8547..793c43a6a4 100644 --- a/src/libaktualizr/utilities/apiqueue.cc +++ b/src/libaktualizr/utilities/apiqueue.cc @@ -1,4 +1,5 @@ #include "apiqueue.h" +#include "logging/logging.h" namespace api { @@ -37,4 +38,52 @@ bool FlowControlToken::canContinue(bool blocking) const { return state_ == State::kRunning; } -} // namespace Api +CommandQueue::~CommandQueue() { + try { + { + std::lock_guard l(m_); + shutdown_ = true; + } + cv_.notify_all(); + if (thread_.joinable()) { + thread_.join(); + } + } catch (std::exception& ex) { + LOG_ERROR << "~CommandQueue() exception: " << ex.what() << std::endl; + } catch (...) { + LOG_ERROR << "~CommandQueue() unknown exception" << std::endl; + } +} + +void CommandQueue::run() { + thread_ = std::thread([this] { + std::unique_lock lock(m_); + for (;;) { + cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; }); + if (shutdown_) { + break; + } + auto task = std::move(queue_.front()); + queue_.pop(); + lock.unlock(); + task(); + lock.lock(); + } + }); +} + +bool CommandQueue::pause(bool do_pause) { + bool has_effect; + { + std::lock_guard lock(m_); + has_effect = paused_ != do_pause; + paused_ = do_pause; + token_.setPause(do_pause); + } + cv_.notify_all(); + + return has_effect; +} + +void CommandQueue::abort() {} +} // namespace api diff --git a/src/libaktualizr/utilities/apiqueue.h b/src/libaktualizr/utilities/apiqueue.h index 6810b3e4f2..e1dfdc1788 100644 --- a/src/libaktualizr/utilities/apiqueue.h +++ b/src/libaktualizr/utilities/apiqueue.h @@ -1,9 +1,14 @@ #ifndef AKTUALIZR_APIQUEUE_H #define AKTUALIZR_APIQUEUE_H +#include #include +#include +#include #include #include +#include +#include namespace api { @@ -44,5 +49,34 @@ class FlowControlToken { mutable std::condition_variable cv_; }; -} // namespace Api +class CommandQueue { + public: + ~CommandQueue(); + void run(); + bool pause(bool do_pause); // returns true iff pause→resume or resume→pause + void abort(); + + template + std::future enqueue(const std::function& f) { + std::packaged_task task(f); + auto r = task.get_future(); + { + std::lock_guard lock(m_); + queue_.push(std::packaged_task(std::move(task))); + } + cv_.notify_all(); + return r; + } + + private: + std::atomic_bool shutdown_{false}; + std::atomic_bool paused_{false}; + std::thread thread_; + std::queue> queue_; + std::mutex m_; + std::condition_variable cv_; + FlowControlToken token_; +}; + +} // namespace api #endif // AKTUALIZR_APIQUEUE_H