diff --git a/src/libaktualizr/primary/aktualizr.cc b/src/libaktualizr/primary/aktualizr.cc index 90dc7847c8..957090f69d 100644 --- a/src/libaktualizr/primary/aktualizr.cc +++ b/src/libaktualizr/primary/aktualizr.cc @@ -112,7 +112,8 @@ std::future Aktualizr::CheckUpdates() { } std::future Aktualizr::Download(const std::vector &updates) { - std::function task([this, &updates] { return uptane_client_->downloadImages(updates); }); + std::function task( + [this, &updates](const api::FlowControlToken *token) { return uptane_client_->downloadImages(updates, token); }); return api_queue_.enqueue(task); } @@ -129,6 +130,8 @@ result::Pause Aktualizr::Resume() { return api_queue_.pause(false) ? result::PauseStatus::kSuccess : result::PauseStatus::kAlreadyRunning; } +void Aktualizr::Abort() { api_queue_.abort(); } + boost::signals2::connection Aktualizr::SetSignalHandler(std::function)> &handler) { return sig_->connect(handler); } diff --git a/src/libaktualizr/primary/aktualizr.h b/src/libaktualizr/primary/aktualizr.h index 900e65c742..1f064e98fe 100644 --- a/src/libaktualizr/primary/aktualizr.h +++ b/src/libaktualizr/primary/aktualizr.h @@ -112,6 +112,17 @@ class Aktualizr { */ result::Pause Resume(); + /** + * Requests the currently running command to abort and flushes the command + * queue. + * The `Abort()` function will block until the command queue is empty and + * all currently executing commands have stopped. You can also call Abort() + * on a previously paused class instance, this will clean the command queue, + * but the Aktualizr will remain in the paused state. To continue execution + * at some later point one needs to call Resume(). + */ + void Abort(); + /** * Synchronously run an uptane cycle: check for updates, download any new * targets, install them, and send a manifest back to the server. diff --git a/src/libaktualizr/primary/sotauptaneclient.cc b/src/libaktualizr/primary/sotauptaneclient.cc index 243df54a0c..1c168b0370 100644 --- a/src/libaktualizr/primary/sotauptaneclient.cc +++ b/src/libaktualizr/primary/sotauptaneclient.cc @@ -905,7 +905,8 @@ std::unique_ptr SotaUptaneClient::findTargetInDelegationTree(con return findTargetHelper(*toplevel_targets, target, 0, false); } -result::Download SotaUptaneClient::downloadImages(const std::vector &targets) { +result::Download SotaUptaneClient::downloadImages(const std::vector &targets, + const api::FlowControlToken *token) { // Uptane step 4 - download all the images and verify them against the metadata (for OSTree - pull without // deploying) std::lock_guard guard(download_mutex); @@ -922,7 +923,7 @@ result::Download SotaUptaneClient::downloadImages(const std::vectorenqueue(std_::make_unique(correlation_id)); } -std::pair SotaUptaneClient::downloadImage(Uptane::Target target) { +std::pair SotaUptaneClient::downloadImage(Uptane::Target target, + const api::FlowControlToken *token) { // TODO: support downloading encrypted targets from director // TODO: check if the file is already there before downloading @@ -968,7 +970,7 @@ std::pair SotaUptaneClient::downloadImage(Uptane::Target t int tries = 3; std::chrono::milliseconds wait(500); while ((tries--) != 0) { - success = uptane_fetcher->fetchVerifyTarget(target); + success = uptane_fetcher->fetchVerifyTarget(target, token); if (success) { break; } else if (tries != 0) { diff --git a/src/libaktualizr/primary/sotauptaneclient.h b/src/libaktualizr/primary/sotauptaneclient.h index 6cee092a5a..f681559f05 100644 --- a/src/libaktualizr/primary/sotauptaneclient.h +++ b/src/libaktualizr/primary/sotauptaneclient.h @@ -95,7 +95,8 @@ class SotaUptaneClient { void initialize(); void addNewSecondary(const std::shared_ptr &sec); - result::Download downloadImages(const std::vector &targets); + result::Download downloadImages(const std::vector &targets, + const api::FlowControlToken *token = nullptr); void pauseFetching(); void resumeFetching(); void sendDeviceData(); @@ -162,8 +163,7 @@ class SotaUptaneClient { bool putManifestSimple(); bool getNewTargets(std::vector *new_targets, unsigned int *ecus_count = nullptr); - bool downloadTargets(const std::vector &targets); - std::pair downloadImage(Uptane::Target target); + std::pair downloadImage(Uptane::Target target, const api::FlowControlToken *token = nullptr); void rotateSecondaryRoot(Uptane::RepositoryType repo, Uptane::SecondaryInterface &secondary); bool updateDirectorMeta(); bool checkImagesMetaOffline(); diff --git a/src/libaktualizr/utilities/apiqueue.cc b/src/libaktualizr/utilities/apiqueue.cc index 793c43a6a4..e913636bde 100644 --- a/src/libaktualizr/utilities/apiqueue.cc +++ b/src/libaktualizr/utilities/apiqueue.cc @@ -38,16 +38,14 @@ bool FlowControlToken::canContinue(bool blocking) const { return state_ == State::kRunning; } +void FlowControlToken::reset() { + std::lock_guard g(m_); + state_ = State::kRunning; +} + CommandQueue::~CommandQueue() { try { - { - std::lock_guard l(m_); - shutdown_ = true; - } - cv_.notify_all(); - if (thread_.joinable()) { - thread_.join(); - } + abort(false); } catch (std::exception& ex) { LOG_ERROR << "~CommandQueue() exception: " << ex.what() << std::endl; } catch (...) { @@ -56,20 +54,23 @@ CommandQueue::~CommandQueue() { } void CommandQueue::run() { - thread_ = std::thread([this] { - std::unique_lock lock(m_); - for (;;) { - cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; }); - if (shutdown_) { - break; + std::lock_guard g(thread_m_); + if (!thread_.joinable()) { + thread_ = std::thread([this] { + std::unique_lock lock(m_); + for (;;) { + cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; }); + if (shutdown_) { + break; + } + auto task = std::move(queue_.front()); + queue_.pop(); + lock.unlock(); + task(); + lock.lock(); } - auto task = std::move(queue_.front()); - queue_.pop(); - lock.unlock(); - task(); - lock.lock(); - } - }); + }); + } } bool CommandQueue::pause(bool do_pause) { @@ -85,5 +86,27 @@ bool CommandQueue::pause(bool do_pause) { return has_effect; } -void CommandQueue::abort() {} +void CommandQueue::abort(bool restart_thread) { + { + std::lock_guard thread_g(thread_m_); + { + std::lock_guard g(m_); + token_.setAbort(); + shutdown_ = true; + } + cv_.notify_all(); + if (thread_.joinable()) { + thread_.join(); + } + { + std::lock_guard g(m_); + std::queue>().swap(queue_); + token_.reset(); + shutdown_ = false; + } + } + if (restart_thread) { + run(); + } +} } // namespace api diff --git a/src/libaktualizr/utilities/apiqueue.h b/src/libaktualizr/utilities/apiqueue.h index e1dfdc1788..30f51cdf87 100644 --- a/src/libaktualizr/utilities/apiqueue.h +++ b/src/libaktualizr/utilities/apiqueue.h @@ -39,6 +39,11 @@ class FlowControlToken { /// bool canContinue(bool blocking = true) const; + //// + //// Sets token to the initial state + //// + void reset(); + private: enum class State { kRunning, // transitions: ->Paused, ->Aborted @@ -54,7 +59,7 @@ class CommandQueue { ~CommandQueue(); void run(); bool pause(bool do_pause); // returns true iff pause→resume or resume→pause - void abort(); + void abort(bool restart_thread = true); template std::future enqueue(const std::function& f) { @@ -68,10 +73,25 @@ class CommandQueue { return r; } + template + std::future enqueue(const std::function& f) { + std::packaged_task task(std::bind(f, &token_)); + auto r = task.get_future(); + { + std::lock_guard lock(m_); + queue_.push(std::packaged_task(std::move(task))); + } + cv_.notify_all(); + return r; + } + private: std::atomic_bool shutdown_{false}; std::atomic_bool paused_{false}; + std::thread thread_; + std::mutex thread_m_; + std::queue> queue_; std::mutex m_; std::condition_variable cv_;