Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
Implement Abort() API call
Browse files Browse the repository at this point in the history
Signed-off-by: Eugene Smirnov <evgenii.smirnov@here.com>
  • Loading branch information
Eugene Smirnov committed Mar 1, 2019
1 parent e0c3be3 commit 2b32552
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 31 deletions.
5 changes: 4 additions & 1 deletion src/libaktualizr/primary/aktualizr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ std::future<result::UpdateCheck> Aktualizr::CheckUpdates() {
}

std::future<result::Download> Aktualizr::Download(const std::vector<Uptane::Target> &updates) {
std::function<result::Download()> task([this, &updates] { return uptane_client_->downloadImages(updates); });
std::function<result::Download(const api::FlowControlToken *)> task(
[this, &updates](const api::FlowControlToken *token) { return uptane_client_->downloadImages(updates, token); });
return api_queue_.enqueue(task);
}

Expand All @@ -129,6 +130,8 @@ result::Pause Aktualizr::Resume() {
return api_queue_.pause(false) ? result::PauseStatus::kSuccess : result::PauseStatus::kAlreadyRunning;
}

void Aktualizr::Abort() { api_queue_.abort(); }

boost::signals2::connection Aktualizr::SetSignalHandler(std::function<void(shared_ptr<event::BaseEvent>)> &handler) {
return sig_->connect(handler);
}
Expand Down
11 changes: 11 additions & 0 deletions src/libaktualizr/primary/aktualizr.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ class Aktualizr {
*/
result::Pause Resume();

/**
* Requests the currently running command to abort and flushes the command
* queue.
* The `Abort()` function will block until the command queue is empty and
* all currently executing commands have stopped. You can also call Abort()
* on a previously paused class instance, this will clean the command queue,
* but the Aktualizr will remain in the paused state. To continue execution
* at some later point one needs to call Resume().
*/
void Abort();

/**
* Synchronously run an uptane cycle: check for updates, download any new
* targets, install them, and send a manifest back to the server.
Expand Down
10 changes: 6 additions & 4 deletions src/libaktualizr/primary/sotauptaneclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,8 @@ std::unique_ptr<Uptane::Target> SotaUptaneClient::findTargetInDelegationTree(con
return findTargetHelper(*toplevel_targets, target, 0, false);
}

result::Download SotaUptaneClient::downloadImages(const std::vector<Uptane::Target> &targets) {
result::Download SotaUptaneClient::downloadImages(const std::vector<Uptane::Target> &targets,
const api::FlowControlToken *token) {
// Uptane step 4 - download all the images and verify them against the metadata (for OSTree - pull without
// deploying)
std::lock_guard<std::mutex> guard(download_mutex);
Expand All @@ -922,7 +923,7 @@ result::Download SotaUptaneClient::downloadImages(const std::vector<Uptane::Targ
}
}
for (auto it = targets.cbegin(); it != targets.cend(); ++it) {
auto res = downloadImage(*it);
auto res = downloadImage(*it, token);
if (res.first) {
downloaded_targets.push_back(res.second);
}
Expand Down Expand Up @@ -954,7 +955,8 @@ void SotaUptaneClient::resumeFetching() {
report_queue->enqueue(std_::make_unique<DeviceResumedReport>(correlation_id));
}

std::pair<bool, Uptane::Target> SotaUptaneClient::downloadImage(Uptane::Target target) {
std::pair<bool, Uptane::Target> SotaUptaneClient::downloadImage(Uptane::Target target,
const api::FlowControlToken *token) {
// TODO: support downloading encrypted targets from director
// TODO: check if the file is already there before downloading

Expand All @@ -968,7 +970,7 @@ std::pair<bool, Uptane::Target> SotaUptaneClient::downloadImage(Uptane::Target t
int tries = 3;
std::chrono::milliseconds wait(500);
while ((tries--) != 0) {
success = uptane_fetcher->fetchVerifyTarget(target);
success = uptane_fetcher->fetchVerifyTarget(target, token);
if (success) {
break;
} else if (tries != 0) {
Expand Down
6 changes: 3 additions & 3 deletions src/libaktualizr/primary/sotauptaneclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class SotaUptaneClient {

void initialize();
void addNewSecondary(const std::shared_ptr<Uptane::SecondaryInterface> &sec);
result::Download downloadImages(const std::vector<Uptane::Target> &targets);
result::Download downloadImages(const std::vector<Uptane::Target> &targets,
const api::FlowControlToken *token = nullptr);
void pauseFetching();
void resumeFetching();
void sendDeviceData();
Expand Down Expand Up @@ -162,8 +163,7 @@ class SotaUptaneClient {

bool putManifestSimple();
bool getNewTargets(std::vector<Uptane::Target> *new_targets, unsigned int *ecus_count = nullptr);
bool downloadTargets(const std::vector<Uptane::Target> &targets);
std::pair<bool, Uptane::Target> downloadImage(Uptane::Target target);
std::pair<bool, Uptane::Target> downloadImage(Uptane::Target target, const api::FlowControlToken *token = nullptr);
void rotateSecondaryRoot(Uptane::RepositoryType repo, Uptane::SecondaryInterface &secondary);
bool updateDirectorMeta();
bool checkImagesMetaOffline();
Expand Down
67 changes: 45 additions & 22 deletions src/libaktualizr/utilities/apiqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,14 @@ bool FlowControlToken::canContinue(bool blocking) const {
return state_ == State::kRunning;
}

void FlowControlToken::reset() {
std::lock_guard<std::mutex> g(m_);
state_ = State::kRunning;
}

CommandQueue::~CommandQueue() {
try {
{
std::lock_guard<std::mutex> l(m_);
shutdown_ = true;
}
cv_.notify_all();
if (thread_.joinable()) {
thread_.join();
}
abort(false);
} catch (std::exception& ex) {
LOG_ERROR << "~CommandQueue() exception: " << ex.what() << std::endl;
} catch (...) {
Expand All @@ -56,20 +54,23 @@ CommandQueue::~CommandQueue() {
}

void CommandQueue::run() {
thread_ = std::thread([this] {
std::unique_lock<std::mutex> lock(m_);
for (;;) {
cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; });
if (shutdown_) {
break;
std::lock_guard<std::mutex> g(thread_m_);
if (!thread_.joinable()) {
thread_ = std::thread([this] {
std::unique_lock<std::mutex> lock(m_);
for (;;) {
cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; });
if (shutdown_) {
break;
}
auto task = std::move(queue_.front());
queue_.pop();
lock.unlock();
task();
lock.lock();
}
auto task = std::move(queue_.front());
queue_.pop();
lock.unlock();
task();
lock.lock();
}
});
});
}
}

bool CommandQueue::pause(bool do_pause) {
Expand All @@ -85,5 +86,27 @@ bool CommandQueue::pause(bool do_pause) {
return has_effect;
}

void CommandQueue::abort() {}
void CommandQueue::abort(bool restart_thread) {
{
std::lock_guard<std::mutex> thread_g(thread_m_);
{
std::lock_guard<std::mutex> g(m_);
token_.setAbort();
shutdown_ = true;
}
cv_.notify_all();
if (thread_.joinable()) {
thread_.join();
}
{
std::lock_guard<std::mutex> g(m_);
std::queue<std::packaged_task<void()>>().swap(queue_);
token_.reset();
shutdown_ = false;
}
}
if (restart_thread) {
run();
}
}
} // namespace api
22 changes: 21 additions & 1 deletion src/libaktualizr/utilities/apiqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class FlowControlToken {
///
bool canContinue(bool blocking = true) const;

////
//// Sets token to the initial state
////
void reset();

private:
enum class State {
kRunning, // transitions: ->Paused, ->Aborted
Expand All @@ -54,7 +59,7 @@ class CommandQueue {
~CommandQueue();
void run();
bool pause(bool do_pause); // returns true iff pause→resume or resume→pause
void abort();
void abort(bool restart_thread = true);

template <class R>
std::future<R> enqueue(const std::function<R()>& f) {
Expand All @@ -68,10 +73,25 @@ class CommandQueue {
return r;
}

template <class R>
std::future<R> enqueue(const std::function<R(const api::FlowControlToken*)>& f) {
std::packaged_task<R()> task(std::bind(f, &token_));
auto r = task.get_future();
{
std::lock_guard<std::mutex> lock(m_);
queue_.push(std::packaged_task<void()>(std::move(task)));
}
cv_.notify_all();
return r;
}

private:
std::atomic_bool shutdown_{false};
std::atomic_bool paused_{false};

std::thread thread_;
std::mutex thread_m_;

std::queue<std::packaged_task<void()>> queue_;
std::mutex m_;
std::condition_variable cv_;
Expand Down

0 comments on commit 2b32552

Please sign in to comment.