diff --git a/Cargo.lock b/Cargo.lock index 57a14275..fed754fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2527,6 +2527,7 @@ dependencies = [ "envsubst", "fail", "failure", + "failure_derive", "filetime", "futures", "glob", diff --git a/Cargo.toml b/Cargo.toml index eabeb8a7..74fb04e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ env_logger = "0.8" envsubst = "0.2" fail = "0.4" failure = "0.1" +failure_derive = "0.1.8" filetime = "0.2" futures = "0.3" glob = "0.3" diff --git a/src/dbus/mod.rs b/src/dbus/mod.rs index ec85a0fe..54778d60 100644 --- a/src/dbus/mod.rs +++ b/src/dbus/mod.rs @@ -2,6 +2,8 @@ mod experimental; use experimental::Experimental; +mod updates; +use updates::Updates; use crate::update_agent::UpdateAgent; use actix::prelude::*; @@ -12,6 +14,9 @@ use log::trace; use zbus::fdo; use zvariant::ObjectPath; +const ZINCATI_BUS_NAME: &str = "org.coreos.zincati"; +const ZINCATI_OBJECT_PATH: &str = "/org/coreos/zincati"; + pub struct DBusService { agent_addr: Addr, } @@ -31,7 +36,7 @@ impl DBusService { let connection = zbus::Connection::new_system()?; fdo::DBusProxy::new(&connection)?.request_name( - "org.coreos.zincati", + ZINCATI_BUS_NAME, fdo::RequestNameFlags::ReplaceExisting.into(), )?; @@ -40,9 +45,16 @@ impl DBusService { agent_addr: self.agent_addr.clone(), }; object_server.at( - &ObjectPath::try_from("/org/coreos/zincati")?, + &ObjectPath::try_from(ZINCATI_OBJECT_PATH)?, experimental_interface, )?; + let updates_interface = Updates { + agent_addr: self.agent_addr.clone(), + }; + object_server.at( + &ObjectPath::try_from(ZINCATI_OBJECT_PATH)?, + updates_interface, + )?; loop { if let Err(err) = object_server.try_handle_next() { diff --git a/src/dbus/updates.rs b/src/dbus/updates.rs new file mode 100644 index 00000000..a923fc77 --- /dev/null +++ b/src/dbus/updates.rs @@ -0,0 +1,82 @@ +//! Updates interface for ushering the update agent to various states. + +use crate::update_agent::{RefreshTick, RefreshTickCommand, UpdateAgent, UpdateAgentState}; +use actix::Addr; +use failure::Error; +use fdo::Error::Failed; +use futures::prelude::*; +use tokio::runtime::Runtime; +use zbus::{dbus_interface, fdo}; + +/// Updates interface for checking for and finalizing updates. +pub(crate) struct Updates { + pub(crate) agent_addr: Addr, +} + +impl Updates { + /// Send msg to the update agent actor and wait for the returned future to resolve. + fn send_msg_to_agent( + &self, + msg: RefreshTick, + ) -> Result, fdo::Error> { + let refresh_time_fut = self.agent_addr.send(msg).map_err(|e| { + let err_msg = format!("failed to get last refresh time from agent actor: {}", e); + log::error!("LastRefreshTime D-Bus method call: {}", err_msg); + Failed(err_msg) + }); + + Runtime::new() + .map_err(|e| { + let err_msg = format!("failed to create runtime to execute future: {}", e); + log::error!("{}", err_msg); + Failed(err_msg) + }) + .and_then(|mut runtime| runtime.block_on(refresh_time_fut)) + } +} + +#[dbus_interface(name = "org.coreos.zincati.Updates")] +impl Updates { + /// Check for update immediately. + fn check_update(&self) -> fdo::Result> { + let msg = RefreshTick { + command: RefreshTickCommand::CheckUpdate, + }; + + self.send_msg_to_agent(msg).and_then(|res| match res { + Ok(state) => match state { + UpdateAgentState::NoNewUpdate => Ok(vec![]), + UpdateAgentState::UpdateAvailable((release, _)) => Ok(vec![release.version]), + _ => { + let err_msg = "update agent reached unexpected state after update check"; + log::error!("CheckUpdate D-Bus method call: {}", err_msg); + Err(Failed(String::from(err_msg))) + } + }, + Err(e) => Err(Failed(format!("{}", e))), + }) + } + + /// Finalize update immediately. + fn finalize_update(&self, force: bool) -> fdo::Result> { + let msg = RefreshTick { + command: RefreshTickCommand::FinalizeUpdate { force }, + }; + + self.send_msg_to_agent(msg).and_then(|res| match res { + Ok(state) => match state { + UpdateAgentState::UpdateStaged(_) => { + Err(Failed(String::from("update finalization attempt failed"))) + } + UpdateAgentState::UpdateFinalized(release) => Ok(vec![release.version]), + _ => { + let err_msg = + "update agent reached unexpected state after finalization attempt"; + log::error!("FinalizeUpdate D-Bus method call: {}", err_msg); + Err(Failed(String::from(err_msg))) + } + }, + Err(e) => Err(Failed(format!("{}", e))), + }) + } +} diff --git a/src/main.rs b/src/main.rs index bb641302..552755e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,8 @@ extern crate fail; #[macro_use] extern crate prometheus; +#[macro_use] +extern crate failure_derive; // Cincinnati client. mod cincinnati; diff --git a/src/update_agent/actor.rs b/src/update_agent/actor.rs index 488436bb..273f3f49 100644 --- a/src/update_agent/actor.rs +++ b/src/update_agent/actor.rs @@ -29,7 +29,7 @@ impl Actor for UpdateAgent { } // Kick-start the state machine. - Self::tick_now(ctx); + self.tick_now(ctx); } } @@ -48,16 +48,64 @@ impl Handler for UpdateAgent { } } -pub(crate) struct RefreshTick {} +/// Error thrown when the command that attempted to initiate a refresh tick +/// is not permitted to do so in the current state of the update agent. +#[derive(Debug, Fail)] +struct TickPermissionError {} + +impl std::fmt::Display for TickPermissionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "command not permitted in current update agent state") + } +} + +pub enum RefreshTickCommand { + /// Command to check for updates. + CheckUpdate, + /// Command to finalize an update. + FinalizeUpdate { force: bool }, + /// Tick initiated by update agent itself. + SelfTick, +} + +/// A message to trigger an update agent tick. +pub struct RefreshTick { + /// Command that initiated the tick. + pub(crate) command: RefreshTickCommand, +} + +impl RefreshTick { + /// Return whether the command in the command field is allowed to initiate + /// a refresh tick. + fn check_state(&self, cur_state: &UpdateAgentState) -> bool { + match self.command { + RefreshTickCommand::CheckUpdate => { + matches!(cur_state, UpdateAgentState::NoNewUpdate) + } + RefreshTickCommand::FinalizeUpdate { force: _ } => { + matches!(cur_state, UpdateAgentState::UpdateAvailable { .. }) + } + // SelfTicks are always allowed. + RefreshTickCommand::SelfTick => true, + } + } +} impl Message for RefreshTick { - type Result = Result<(), Error>; + type Result = Result; } impl Handler for UpdateAgent { - type Result = ResponseActFuture>; + type Result = ResponseActFuture>; + + /// Return the state of the update agent's state machine after msg is handled. + fn handle(&mut self, msg: RefreshTick, ctx: &mut Self::Context) -> Self::Result { + // Make sure that the command that sent the RefreshTick is permitted to be + // called in update agent's current state. + if !msg.check_state(&self.state) { + return Box::pin(actix::fut::err(Error::from(TickPermissionError {}))); + } - fn handle(&mut self, _msg: RefreshTick, ctx: &mut Self::Context) -> Self::Result { let tick_timestamp = chrono::Utc::now(); LAST_REFRESH.set(tick_timestamp.timestamp()); @@ -75,7 +123,7 @@ impl Handler for UpdateAgent { } UpdateAgentState::UpdateStaged((release, _)) => { let update = release.clone(); - self.tick_finalize_update(update) + self.tick_finalize_update(update, &msg) } UpdateAgentState::UpdateFinalized(release) => { let update = release.clone(); @@ -91,11 +139,11 @@ impl Handler for UpdateAgent { "scheduling next agent refresh in {} seconds", pause.as_secs() ); - Self::tick_later(ctx, pause); + actor.tick_later(ctx, pause); } else { let update_timestamp = chrono::Utc::now(); actor.state_changed = update_timestamp; - Self::tick_now(ctx); + actor.tick_now(ctx); } actix::fut::ready(()) }); @@ -103,19 +151,38 @@ impl Handler for UpdateAgent { // Process state machine refresh ticks sequentially. ctx.wait(update_machine); - Box::pin(actix::fut::ok(())) + Box::pin(actix::fut::ok(self.state.clone())) } } impl UpdateAgent { + /// Cancel the scheduled refresh tick whose handle is stored in the update agent's + /// `tick_later_handle` field, if any. + fn cancel_scheduled_ticks(&mut self, ctx: &mut Context) { + if let Some(handle) = self.tick_later_handle { + ctx.cancel_future(handle); + self.tick_later_handle = None; + } + } + /// Schedule an immediate refresh of the state machine. - pub fn tick_now(ctx: &mut Context) { - ctx.notify(RefreshTick {}) + pub fn tick_now(&mut self, ctx: &mut Context) { + // Cancel scheduled ticks, if any. + self.cancel_scheduled_ticks(ctx); + ctx.notify(RefreshTick { + command: RefreshTickCommand::SelfTick, + }) } /// Schedule a delayed refresh of the state machine. - pub fn tick_later(ctx: &mut Context, after: std::time::Duration) -> actix::SpawnHandle { - ctx.notify_later(RefreshTick {}, after) + pub fn tick_later(&mut self, ctx: &mut Context, after: std::time::Duration) { + let handle = ctx.notify_later( + RefreshTick { + command: RefreshTickCommand::SelfTick, + }, + after, + ); + self.tick_later_handle = Some(handle); } /// Pausing interval between state-machine refresh cycles. @@ -322,12 +389,23 @@ impl UpdateAgent { fn tick_finalize_update( &mut self, release: Release, + msg: &RefreshTick, ) -> ResponseActFuture> { trace!("trying to finalize an update"); - let strategy_can_finalize = self.strategy.can_finalize(); - let state_change = actix::fut::wrap_future::<_, Self>(strategy_can_finalize) - .then(|strategy_can_finalize, actor, _ctx| { + let mut strategy_can_finalize = false; + let mut usersessions_can_finalize = false; + if let RefreshTickCommand::FinalizeUpdate { force } = msg.command { + strategy_can_finalize = force; + // If msg's associated command is FinalizeUpdate, ignore logged in sessions + // and allow finalizations even when active user sessions are present. + usersessions_can_finalize = true; + } + + let strategy_can_finalize_fut = self.strategy.can_finalize(); + let state_change = actix::fut::wrap_future::<_, Self>(strategy_can_finalize_fut) + .then(move |can_finalize, actor, _ctx| { + strategy_can_finalize = strategy_can_finalize || can_finalize; if !strategy_can_finalize { update_unit_status(&format!( "update staged: {}; reboot pending due to update strategy", @@ -338,7 +416,8 @@ impl UpdateAgent { actor.state.update_staged(release); Box::pin(actix::fut::err(())) } else { - let usersessions_can_finalize = actor.state.usersessions_can_finalize(); + usersessions_can_finalize = + usersessions_can_finalize || actor.state.usersessions_can_finalize(); if !usersessions_can_finalize { update_unit_status(&format!( "update staged: {}; reboot delayed due to active user sessions", diff --git a/src/update_agent/mod.rs b/src/update_agent/mod.rs index 1aca4183..c2fb8491 100644 --- a/src/update_agent/mod.rs +++ b/src/update_agent/mod.rs @@ -1,14 +1,14 @@ //! Update agent. mod actor; -pub use actor::LastRefresh; +pub use actor::{LastRefresh, RefreshTick, RefreshTickCommand}; use crate::cincinnati::Cincinnati; use crate::config::Settings; use crate::identity::Identity; use crate::rpm_ostree::{Release, RpmOstreeClient}; use crate::strategy::UpdateStrategy; -use actix::Addr; +use actix::{Addr, SpawnHandle}; use chrono::prelude::*; use failure::{bail, Fallible, ResultExt}; use prometheus::{IntCounter, IntGauge}; @@ -89,7 +89,7 @@ where /// State machine for the agent. #[derive(Clone, Debug, PartialEq, Eq)] -enum UpdateAgentState { +pub enum UpdateAgentState { /// Initial state upon actor start. StartState, /// Agent initialized. @@ -349,6 +349,8 @@ pub(crate) struct UpdateAgent { state: UpdateAgentState, /// Timestamp of last state transition. state_changed: DateTime, + /// Handle to future created by `tick_later`. + tick_later_handle: Option, } impl UpdateAgent { @@ -368,6 +370,7 @@ impl UpdateAgent { state: UpdateAgentState::default(), strategy: cfg.strategy, state_changed: chrono::Utc::now(), + tick_later_handle: None, }; Ok(agent)