Skip to content

Commit

Permalink
Plug services lifecycle (Shutdown and Kill) (#27)
Browse files Browse the repository at this point in the history
* Plug lifecycle channel

* Pipe lifecycle to services.

* Add explaining doc on handle clone

* Fix tests

* Add missing break on overwatch kill

* Added shutdown service test

* Clippy happy

* Use try_from instead of from for checking duplicated service ids on ServicesLifeCycleHandle

* Added docs
  • Loading branch information
danielSanchezQ authored Oct 9, 2023
1 parent 6e6678b commit ac28d01
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 47 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]

resolver = "2"
members = [
"overwatch-rs",
"overwatch-derive",
Expand Down
8 changes: 3 additions & 5 deletions overwatch-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,14 @@ fn generate_start_all_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::To
let call_start = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
quote! {
self.#field_identifier.service_runner().run()?;
self.#field_identifier.service_runner().run()?
}
});

quote! {
#[::tracing::instrument(skip(self), err)]
fn start_all(&mut self) -> Result<(), ::overwatch_rs::overwatch::Error> {
#( #call_start )*

::std::result::Result::Ok(())
fn start_all(&mut self) -> Result<::overwatch_rs::overwatch::ServicesLifeCycleHandle, ::overwatch_rs::overwatch::Error> {
::std::result::Result::Ok([#( #call_start ),*].try_into()?)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion overwatch-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ color-eyre = "0.6"
async-trait = "0.1"
futures = "0.3"
thiserror = "1.0"
tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time"] }
tokio = { version = "1.32", features = ["rt-multi-thread", "sync", "time"] }
tokio-stream = {version ="0.1", features = ["sync"] }
tokio-util = "0.7"
tracing = "0.1"
Expand Down
16 changes: 4 additions & 12 deletions overwatch-rs/src/overwatch/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

// crates
use crate::overwatch::AnySettings;
use crate::services::life_cycle::LifecycleMessage;
use tokio::sync::oneshot;

// internal
Expand Down Expand Up @@ -33,18 +34,9 @@ pub struct RelayCommand {
/// Command for managing [`ServiceCore`](crate::services::ServiceCore) lifecycle
#[allow(unused)]
#[derive(Debug)]
pub struct ServiceLifeCycle<R> {
service_id: ServiceId,
reply_channel: ReplyChannel<R>,
}

/// [`ServiceCore`](crate::services::ServiceCore) lifecycle related commands
#[derive(Debug)]
pub enum ServiceLifeCycleCommand {
Shutdown(ServiceLifeCycle<()>),
Kill(ServiceLifeCycle<()>),
Start(ServiceLifeCycle<()>),
Stop(ServiceLifeCycle<()>),
pub struct ServiceLifeCycleCommand {
pub service_id: ServiceId,
pub msg: LifecycleMessage,
}

/// [`Overwatch`](crate::overwatch::Overwatch) lifecycle related commands
Expand Down
87 changes: 87 additions & 0 deletions overwatch-rs/src/overwatch/life_cycle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// std
use std::borrow::Cow;
use std::collections::HashMap;
use std::default::Default;
use std::error::Error;
// crates
use tokio::sync::broadcast::Sender;
// internal
use crate::services::life_cycle::{FinishedSignal, LifecycleHandle, LifecycleMessage};
use crate::services::ServiceId;
use crate::DynError;

/// Grouper handle for the `LifecycleHandle` of each spawned service.
#[derive(Clone)]
pub struct ServicesLifeCycleHandle {
handlers: HashMap<ServiceId, LifecycleHandle>,
}

impl ServicesLifeCycleHandle {
pub fn empty() -> Self {
Self {
handlers: Default::default(),
}
}

/// Send a `Shutdown` message to the specified service
///
/// # Arguments
///
/// `service` - The `ServiceId` of the target service
/// `sender` - A sender side of a broadcast channel. A return signal when finished handling the
/// message will be sent.
pub fn shutdown(
&self,
service: ServiceId,
sender: Sender<FinishedSignal>,
) -> Result<(), DynError> {
self.handlers
.get(service)
.unwrap()
.send(LifecycleMessage::Shutdown(sender))?;
Ok(())
}

/// Send a `Kill` message to the specified service (`ServiceId`)
///
/// # Arguments
///
/// `service` - The `ServiceId` of the target service
pub fn kill(&self, service: ServiceId) -> Result<(), DynError> {
self.handlers
.get(service)
.unwrap()
.send(LifecycleMessage::Kill)
}

/// Send a `Kill` message to all services registered in this handle
pub fn kill_all(&self) -> Result<(), DynError> {
for service_id in self.services_ids() {
self.kill(service_id)?;
}
Ok(())
}

/// Get all services ids registered in this handle
pub fn services_ids(&self) -> impl Iterator<Item = ServiceId> + '_ {
self.handlers.keys().copied()
}
}

impl<const N: usize> TryFrom<[(ServiceId, LifecycleHandle); N]> for ServicesLifeCycleHandle {
// TODO: On errors refactor extract into a concrete error type with `thiserror`
type Error = Box<dyn Error + Send + Sync>;

fn try_from(value: [(ServiceId, LifecycleHandle); N]) -> Result<Self, Self::Error> {
let mut handlers = HashMap::new();
for (service_id, handle) in value {
if handlers.contains_key(service_id) {
return Err(Box::<dyn Error + Send + Sync>::from(Cow::Owned(format!(
"Duplicated serviceId: {service_id}"
))));
}
handlers.insert(service_id, handle);
}
Ok(Self { handlers })
}
}
63 changes: 47 additions & 16 deletions overwatch-rs/src/overwatch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod commands;
pub mod handle;
pub mod life_cycle;
// std

use std::any::Any;
Expand All @@ -14,14 +15,16 @@ use tokio::runtime::{Handle, Runtime};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::{info, instrument};
use tracing::{error, info, instrument};

// internal

use crate::overwatch::commands::{
OverwatchCommand, OverwatchLifeCycleCommand, RelayCommand, SettingsCommand,
OverwatchCommand, OverwatchLifeCycleCommand, RelayCommand, ServiceLifeCycleCommand,
SettingsCommand,
};
use crate::overwatch::handle::OverwatchHandle;
pub use crate::overwatch::life_cycle::ServicesLifeCycleHandle;
use crate::services::life_cycle::LifecycleMessage;
use crate::services::relay::RelayResult;
use crate::services::{ServiceError, ServiceId};
use crate::utils::runtime::default_multithread_runtime;
Expand Down Expand Up @@ -79,7 +82,7 @@ pub trait Services: Sized {

// TODO: this probably will be removed once the services lifecycle is implemented
/// Start all services attached to the trait implementer
fn start_all(&mut self) -> Result<(), Error>;
fn start_all(&mut self) -> Result<ServicesLifeCycleHandle, Error>;

/// Stop a service attached to the trait implementer
fn stop(&mut self, service_id: ServiceId) -> Result<(), Error>;
Expand Down Expand Up @@ -124,12 +127,20 @@ where
let (commands_sender, commands_receiver) = tokio::sync::mpsc::channel(16);
let handle = OverwatchHandle::new(runtime.handle().clone(), commands_sender);
let services = S::new(settings, handle.clone())?;
let runner = OverwatchRunner {
let mut runner = OverwatchRunner {
services,
handle: handle.clone(),
finish_signal_sender,
};
runtime.spawn(async move { runner.run_(commands_receiver).await });

let lifecycle_handlers = runner.services.start_all()?;

runtime.spawn(async move {
runner
.run_(commands_receiver, lifecycle_handlers.clone())
.await
});

Ok(Overwatch {
runtime,
handle,
Expand All @@ -138,28 +149,48 @@ where
}

#[instrument(name = "overwatch-run", skip_all)]
async fn run_(self, mut receiver: Receiver<OverwatchCommand>) {
async fn run_(
self,
mut receiver: Receiver<OverwatchCommand>,
lifecycle_handlers: ServicesLifeCycleHandle,
) {
let Self {
mut services,
handle: _,
finish_signal_sender,
} = self;
// TODO: this probably need to be manually done, or at least handled by a flag
services.start_all().expect("Services to start running");
while let Some(command) = receiver.recv().await {
info!(command = ?command, "Overwatch command received");
match command {
OverwatchCommand::Relay(relay_command) => {
Self::handle_relay(&mut services, relay_command).await;
}
OverwatchCommand::ServiceLifeCycle(_) => {
unimplemented!("Services life cycle is still not supported!");
}
OverwatchCommand::ServiceLifeCycle(msg) => match msg {
ServiceLifeCycleCommand {
service_id,
msg: LifecycleMessage::Shutdown(channel),
} => {
if let Err(e) = lifecycle_handlers.shutdown(service_id, channel) {
error!(e);
}
}
ServiceLifeCycleCommand {
service_id,
msg: LifecycleMessage::Kill,
} => {
if let Err(e) = lifecycle_handlers.kill(service_id) {
error!(e);
}
}
},
OverwatchCommand::OverwatchLifeCycle(command) => {
if matches!(
command,
OverwatchLifeCycleCommand::Kill | OverwatchLifeCycleCommand::Shutdown
) {
if let Err(e) = lifecycle_handlers.kill_all() {
error!(e);
}
break;
}
}
Expand Down Expand Up @@ -216,7 +247,7 @@ impl Overwatch {
&self.handle
}

/// Get the underllaying tokio runtime handle
/// Get the underlaying tokio runtime handle
pub fn runtime(&self) -> &Handle {
self.runtime.handle()
}
Expand Down Expand Up @@ -247,7 +278,7 @@ impl Overwatch {
#[cfg(test)]
mod test {
use crate::overwatch::handle::OverwatchHandle;
use crate::overwatch::{Error, OverwatchRunner, Services};
use crate::overwatch::{Error, OverwatchRunner, Services, ServicesLifeCycleHandle};
use crate::services::relay::{RelayError, RelayResult};
use crate::services::ServiceId;
use std::time::Duration;
Expand All @@ -269,8 +300,8 @@ mod test {
Err(Error::Unavailable { service_id })
}

fn start_all(&mut self) -> Result<(), Error> {
Ok(())
fn start_all(&mut self) -> Result<ServicesLifeCycleHandle, Error> {
Ok(ServicesLifeCycleHandle::empty())
}

fn stop(&mut self, service_id: ServiceId) -> Result<(), Error> {
Expand Down
23 changes: 12 additions & 11 deletions overwatch-rs/src/services/handle.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// crates
use futures::future::{abortable, AbortHandle};
use tokio::runtime::Handle;
// internal
use crate::overwatch::handle::OverwatchHandle;
use crate::services::life_cycle::LifecycleHandle;
use crate::services::relay::{relay, InboundRelay, OutboundRelay};
use crate::services::settings::{SettingsNotifier, SettingsUpdater};
use crate::services::state::{StateHandle, StateOperator, StateUpdater};
use crate::services::{ServiceCore, ServiceData, ServiceId, ServiceState};

// TODO: Abstract handle over state, to diferentiate when the service is running and when it is not
// TODO: Abstract handle over state, to differentiate when the service is running and when it is not
// that way we can expose a better API depending on what is happenning. Would get rid of the probably
// unnecessary Option and cloning.
/// Service handle
Expand All @@ -33,14 +33,15 @@ pub struct ServiceStateHandle<S: ServiceData> {
pub overwatch_handle: OverwatchHandle,
pub settings_reader: SettingsNotifier<S::Settings>,
pub state_updater: StateUpdater<S::State>,
pub _lifecycle_handler: (),
pub lifecycle_handle: LifecycleHandle,
}

/// Main service executor
/// It is the object that hold the necessary information for the service to run
pub struct ServiceRunner<S: ServiceData> {
service_state: ServiceStateHandle<S>,
state_handle: StateHandle<S::State, S::StateOperator>,
lifecycle_handle: LifecycleHandle,
}

impl<S: ServiceData> ServiceHandle<S> {
Expand Down Expand Up @@ -94,17 +95,20 @@ impl<S: ServiceData> ServiceHandle<S> {
let (state_handle, state_updater) =
StateHandle::<S::State, S::StateOperator>::new(self.initial_state.clone(), operator);

let lifecycle_handle = LifecycleHandle::new();

let service_state = ServiceStateHandle {
inbound_relay,
overwatch_handle: self.overwatch_handle.clone(),
state_updater,
settings_reader,
_lifecycle_handler: (),
lifecycle_handle: lifecycle_handle.clone(),
};

ServiceRunner {
service_state,
state_handle,
lifecycle_handle,
}
}
}
Expand All @@ -124,22 +128,19 @@ where
/// Spawn the service main loop and handle it lifecycle
/// Return a handle to abort execution manually

pub fn run(self) -> Result<AbortHandle, crate::DynError> {
pub fn run(self) -> Result<(ServiceId, LifecycleHandle), crate::DynError> {
let ServiceRunner {
service_state,
state_handle,
..
lifecycle_handle,
} = self;

let runtime = service_state.overwatch_handle.runtime().clone();
let service = S::init(service_state)?;
let (runner, abortable_handle) = abortable(service.run());

runtime.spawn(runner);
runtime.spawn(service.run());
runtime.spawn(state_handle.run());

// TODO: Handle service lifecycle
// TODO: this handle should not scape this scope, it should actually be handled in the lifecycle part mentioned above
Ok(abortable_handle)
Ok((S::SERVICE_ID, lifecycle_handle))
}
}
Loading

0 comments on commit ac28d01

Please sign in to comment.