Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
Separate API command queue implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Eugene Smirnov <evgenii.smirnov@here.com>
  • Loading branch information
Eugene Smirnov committed Mar 1, 2019
1 parent 5330551 commit e0c3be3
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 132 deletions.
1 change: 0 additions & 1 deletion src/hmi_stub/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ int main(int argc, char *argv[]) {
while (std::getline(std::cin, buffer)) {
boost::algorithm::to_lower(buffer);
if (buffer == "shutdown") {
aktualizr.Shutdown();
break;
} else if (buffer == "senddevicedata") {
aktualizr.SendDeviceData();
Expand Down
90 changes: 22 additions & 68 deletions src/libaktualizr/primary/aktualizr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,18 @@ void Aktualizr::systemSetup() {

void Aktualizr::Initialize() {
uptane_client_->initialize();
api_thread_ = std::thread([&]() {
while (!shutdown_) {
auto task = api_queue_.dequeue();
if (shutdown_) {
return;
}
task();
}
});
api_queue_.run();
}

void Aktualizr::UptaneCycle() {
bool Aktualizr::UptaneCycle() {
result::UpdateCheck update_result = CheckUpdates().get();
if (update_result.updates.size() == 0) {
return;
return true;
}

result::Download download_result = Download(update_result.updates).get();
if (download_result.status != result::DownloadStatus::kSuccess || download_result.updates.size() == 0) {
return;
return true;
}

uptane_client_->uptaneInstall(download_result.updates);
Expand All @@ -72,21 +64,22 @@ void Aktualizr::UptaneCycle() {
// If there are some pending updates then effectively either reboot (ostree) or aktualizr restart (fake pack mngr)
// is required to apply the update(s)
LOG_INFO << "About to exit aktualizr so the pending updates can be applied after reboot";
Shutdown();
return false;
}

if (!uptane_client_->hasPendingUpdates()) {
// If updates were applied and no any reboot/finalization is required then send/put manifest
// as soon as possible, don't wait for config_.uptane.polling_sec
uptane_client_->putManifest();
}

return true;
}

std::future<void> Aktualizr::RunForever() {
std::future<void> future = std::async(std::launch::async, [&]() {
SendDeviceData().get();
while (!shutdown_) {
UptaneCycle();
while (UptaneCycle()) {
std::this_thread::sleep_for(std::chrono::seconds(config_.uptane.polling_sec));
}
uptane_client_->completeInstall();
Expand All @@ -98,81 +91,42 @@ void Aktualizr::AddSecondary(const std::shared_ptr<Uptane::SecondaryInterface> &
uptane_client_->addNewSecondary(secondary);
}

void Aktualizr::Shutdown() {
if (!shutdown_) {
shutdown_ = true;
api_queue_.shutDown();
}
}

std::future<result::CampaignCheck> Aktualizr::CampaignCheck() {
auto promise = std::make_shared<std::promise<result::CampaignCheck>>();
std::function<void()> task([&, promise]() { promise->set_value(uptane_client_->campaignCheck()); });
api_queue_.enqueue(task);
return promise->get_future();
std::function<result::CampaignCheck()> task([this] { return uptane_client_->campaignCheck(); });
return api_queue_.enqueue(task);
}

std::future<void> Aktualizr::CampaignAccept(const std::string &campaign_id) {
auto promise = std::make_shared<std::promise<void>>();
std::function<void()> task([&, promise, campaign_id]() {
uptane_client_->campaignAccept(campaign_id);
promise->set_value();
});
api_queue_.enqueue(task);
return promise->get_future();
std::function<void()> task([this, &campaign_id] { uptane_client_->campaignAccept(campaign_id); });
return api_queue_.enqueue(task);
}

std::future<void> Aktualizr::SendDeviceData() {
auto promise = std::make_shared<std::promise<void>>();
std::function<void()> task([&, promise]() {
uptane_client_->sendDeviceData();
promise->set_value();
});
api_queue_.enqueue(task);
return promise->get_future();
std::function<void()> task([this] { uptane_client_->sendDeviceData(); });
return api_queue_.enqueue(task);
}

std::future<result::UpdateCheck> Aktualizr::CheckUpdates() {
auto promise = std::make_shared<std::promise<result::UpdateCheck>>();
std::function<void()> task([&, promise]() { promise->set_value(uptane_client_->fetchMeta()); });
api_queue_.enqueue(task);
return promise->get_future();
std::function<result::UpdateCheck()> task([this] { return uptane_client_->fetchMeta(); });
return api_queue_.enqueue(task);
}

std::future<result::Download> Aktualizr::Download(const std::vector<Uptane::Target> &updates) {
auto promise = std::make_shared<std::promise<result::Download>>();
std::function<void()> task(
[this, promise, updates]() { promise->set_value(uptane_client_->downloadImages(updates)); });
api_queue_.enqueue(task);
return promise->get_future();
std::function<result::Download()> task([this, &updates] { return uptane_client_->downloadImages(updates); });
return api_queue_.enqueue(task);
}

std::future<result::Install> Aktualizr::Install(const std::vector<Uptane::Target> &updates) {
auto promise = std::make_shared<std::promise<result::Install>>();
std::function<void()> task(
[this, promise, updates]() { promise->set_value(uptane_client_->uptaneInstall(updates)); });
api_queue_.enqueue(task);
return promise->get_future();
std::function<result::Install()> task([this, &updates] { return uptane_client_->uptaneInstall(updates); });
return api_queue_.enqueue(task);
}

result::Pause Aktualizr::Pause() {
if (api_queue_.pause(true)) {
uptane_client_->pauseFetching();

return {result::PauseStatus::kSuccess};
}

return {result::PauseStatus::kAlreadyPaused};
return api_queue_.pause(true) ? result::PauseStatus::kSuccess : result::PauseStatus::kAlreadyPaused;
}

result::Pause Aktualizr::Resume() {
if (api_queue_.pause(false)) {
uptane_client_->resumeFetching();

return {result::PauseStatus::kSuccess};
}

return {result::PauseStatus::kAlreadyRunning};
return api_queue_.pause(false) ? result::PauseStatus::kSuccess : result::PauseStatus::kAlreadyRunning;
}

boost::signals2::connection Aktualizr::SetSignalHandler(std::function<void(shared_ptr<event::BaseEvent>)> &handler) {
Expand Down
62 changes: 3 additions & 59 deletions src/libaktualizr/primary/aktualizr.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,7 @@
#include "sotauptaneclient.h"
#include "storage/invstorage.h"
#include "uptane/secondaryinterface.h"

class ApiQueue {
public:
void enqueue(const std::function<void()>& t) {
std::lock_guard<std::mutex> lock(m_);
q_.push(t);
c_.notify_one();
}

std::function<void()> dequeue() {
std::unique_lock<std::mutex> wait_lock(m_);
while (q_.empty() || paused_) {
c_.wait(wait_lock);
if (shutdown_) {
return std::function<void()>();
}
}
std::function<void()> val = q_.front();
q_.pop();
return val;
}

void shutDown() {
shutdown_ = true;
enqueue(std::function<void()>());
}

// returns true iff pause→resume or resume→pause
bool pause(bool do_pause) {
std::lock_guard<std::mutex> lock(m_);
bool has_effect = paused_ != do_pause;
paused_ = do_pause;
c_.notify_one();

return has_effect;
}

private:
std::queue<std::function<void()>> q_;
mutable std::mutex m_;
std::condition_variable c_;
std::atomic_bool shutdown_{false};
std::atomic_bool paused_{false};
};
#include "utilities/apiqueue.h"

/**
* This class provides the main APIs necessary for launching and controlling
Expand All @@ -66,12 +23,6 @@ class Aktualizr {
/** Aktualizr requires a configuration object. Examples can be found in the
* config directory. */
explicit Aktualizr(Config& config);
~Aktualizr() {
Shutdown();
if (api_thread_.joinable()) {
api_thread_.join();
}
}
Aktualizr(const Aktualizr&) = delete;
Aktualizr& operator=(const Aktualizr&) = delete;

Expand All @@ -88,11 +39,6 @@ class Aktualizr {
*/
std::future<void> RunForever();

/**
* Asynchronously shut aktualizr down.
*/
void Shutdown();

/**
* Check for campaigns.
* Campaigns are a concept outside of Uptane, and allow for user approval of
Expand Down Expand Up @@ -170,7 +116,7 @@ class Aktualizr {
* Synchronously run an uptane cycle: check for updates, download any new
* targets, install them, and send a manifest back to the server.
*/
void UptaneCycle();
bool UptaneCycle();

/**
* Add new secondary to aktualizr. Must be called before Initialize.
Expand Down Expand Up @@ -215,9 +161,7 @@ class Aktualizr {
std::shared_ptr<INvStorage> storage_;
std::shared_ptr<SotaUptaneClient> uptane_client_;
std::shared_ptr<event::Channel> sig_;
std::atomic<bool> shutdown_ = {false};
ApiQueue api_queue_;
std::thread api_thread_;
api::CommandQueue api_queue_;
};

#endif // AKTUALIZR_H_
2 changes: 0 additions & 2 deletions src/libaktualizr/primary/aktualizr_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,6 @@ TEST(Aktualizr, APICheck) {
aktualizr.CheckUpdates();
}
std::this_thread::sleep_for(std::chrono::seconds(12));
aktualizr.Shutdown();
// Wait for the Aktualizr dtor to run in order that processing has finished
}

Expand All @@ -1133,7 +1132,6 @@ TEST(Aktualizr, APICheck) {
aktualizr.CheckUpdates();
}
std::this_thread::sleep_for(std::chrono::milliseconds(800));
aktualizr.Shutdown();
// Wait for the Aktualizr dtor to run in order that processing has finished
}
EXPECT_LT(counter2.total_events(), 100);
Expand Down
51 changes: 50 additions & 1 deletion src/libaktualizr/utilities/apiqueue.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "apiqueue.h"
#include "logging/logging.h"

namespace api {

Expand Down Expand Up @@ -37,4 +38,52 @@ bool FlowControlToken::canContinue(bool blocking) const {
return state_ == State::kRunning;
}

} // namespace Api
CommandQueue::~CommandQueue() {
try {
{
std::lock_guard<std::mutex> l(m_);
shutdown_ = true;
}
cv_.notify_all();
if (thread_.joinable()) {
thread_.join();
}
} catch (std::exception& ex) {
LOG_ERROR << "~CommandQueue() exception: " << ex.what() << std::endl;
} catch (...) {
LOG_ERROR << "~CommandQueue() unknown exception" << std::endl;
}
}

void CommandQueue::run() {
thread_ = std::thread([this] {
std::unique_lock<std::mutex> lock(m_);
for (;;) {
cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; });
if (shutdown_) {
break;
}
auto task = std::move(queue_.front());
queue_.pop();
lock.unlock();
task();
lock.lock();
}
});
}

bool CommandQueue::pause(bool do_pause) {
bool has_effect;
{
std::lock_guard<std::mutex> lock(m_);
has_effect = paused_ != do_pause;
paused_ = do_pause;
token_.setPause(do_pause);
}
cv_.notify_all();

return has_effect;
}

void CommandQueue::abort() {}
} // namespace api
36 changes: 35 additions & 1 deletion src/libaktualizr/utilities/apiqueue.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
#ifndef AKTUALIZR_APIQUEUE_H
#define AKTUALIZR_APIQUEUE_H

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

namespace api {

Expand Down Expand Up @@ -44,5 +49,34 @@ class FlowControlToken {
mutable std::condition_variable cv_;
};

} // namespace Api
class CommandQueue {
public:
~CommandQueue();
void run();
bool pause(bool do_pause); // returns true iff pause→resume or resume→pause
void abort();

template <class R>
std::future<R> enqueue(const std::function<R()>& f) {
std::packaged_task<R()> task(f);
auto r = task.get_future();
{
std::lock_guard<std::mutex> lock(m_);
queue_.push(std::packaged_task<void()>(std::move(task)));
}
cv_.notify_all();
return r;
}

private:
std::atomic_bool shutdown_{false};
std::atomic_bool paused_{false};
std::thread thread_;
std::queue<std::packaged_task<void()>> queue_;
std::mutex m_;
std::condition_variable cv_;
FlowControlToken token_;
};

} // namespace api
#endif // AKTUALIZR_APIQUEUE_H

0 comments on commit e0c3be3

Please sign in to comment.