Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Horust: initial support for commands uds and status command #252

Merged
merged 3 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions horust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ shellexpand = "~3.1"
anyhow = "~1.0"
thiserror = "~1.0"
bytefmt = "0.1.7"
horust-commands-lib = {path = "../commands"}

[features]
default = ["http-healthcheck"]
Expand Down
106 changes: 106 additions & 0 deletions horust/src/horust/commands_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use crate::horust::bus::BusConnector;
use crate::horust::formats::{ServiceName, ServiceStatus};
use crate::horust::Event;
use anyhow::{anyhow, Result};
use horust_commands_lib::{CommandsHandlerTrait, HorustMsgServiceStatus};
use std::collections::HashMap;
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::thread::JoinHandle;
use std::time::Duration;
use std::{fs, thread};

pub fn spawn(
bus: BusConnector<Event>,
uds_path: PathBuf,
services: Vec<ServiceName>,
) -> JoinHandle<()> {
thread::spawn(move || {
let mut commands_handler = CommandsHandler::new(bus, uds_path, services);
commands_handler.run();
})
}

struct CommandsHandler {
bus: BusConnector<Event>,
services: HashMap<ServiceName, ServiceStatus>,
uds_listener: UnixListener,
uds_path: PathBuf,
}

impl CommandsHandler {
fn new(bus: BusConnector<Event>, uds_path: PathBuf, services: Vec<ServiceName>) -> Self {
let uds_listener = UnixListener::bind(&uds_path).unwrap();
uds_listener.set_nonblocking(true).unwrap();
Self {
bus,
uds_path,
uds_listener,
services: services
.into_iter()
.map(|s| (s, ServiceStatus::Initial))
.collect(),
}
}
fn run(&mut self) {
loop {
let evs = self.bus.try_get_events();
for ev in evs {
match ev {
Event::StatusChanged(name, status) => {
let k = self.services.get_mut(&name).unwrap();
*k = status;
}
Event::ShuttingDownInitiated(_) => {
fs::remove_file(&self.uds_path).unwrap();
return;
}
_ => {}
}
}
self.accept().unwrap();
thread::sleep(Duration::from_millis(300));
}
}
}

impl CommandsHandlerTrait for CommandsHandler {
fn get_unix_listener(&mut self) -> &mut UnixListener {
&mut self.uds_listener
}
fn get_service_status(&self, service_name: &str) -> anyhow::Result<HorustMsgServiceStatus> {
self.services
.get(service_name)
.map(from_service_status)
.ok_or_else(|| anyhow!("Error: service {service_name} not found."))
}
fn update_service_status(
&self,
_service_name: &str,
_new_status: HorustMsgServiceStatus,
) -> Result<()> {
/*
match self.services.get(service_name) {
None => bail!("Service {service_name} not found."),
Some(service_status) if from_service_status(service_status) != new_status => {
//self.bus.send_event(Event::Kill())
}
_ => (),
};*/
todo!();
}
}

fn from_service_status(status: &ServiceStatus) -> HorustMsgServiceStatus {
match status {
ServiceStatus::Starting => HorustMsgServiceStatus::Starting,
ServiceStatus::Started => HorustMsgServiceStatus::Started,
ServiceStatus::Running => HorustMsgServiceStatus::Running,
ServiceStatus::InKilling => HorustMsgServiceStatus::Inkilling,
ServiceStatus::Success => HorustMsgServiceStatus::Success,
ServiceStatus::Finished => HorustMsgServiceStatus::Finished,
ServiceStatus::FinishedFailed => HorustMsgServiceStatus::Finishedfailed,
ServiceStatus::Failed => HorustMsgServiceStatus::Failed,
ServiceStatus::Initial => HorustMsgServiceStatus::Initial,
}
}
3 changes: 0 additions & 3 deletions horust/src/horust/formats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pub enum ShuttingDown {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
PidChanged(ServiceName, Pid),
ServiceStarted(ServiceName),
// This command updates the service status.
StatusUpdate(ServiceName, ServiceStatus),
// This event represents a status change.
Expand All @@ -27,8 +26,6 @@ pub enum Event {
Run(ServiceName),
ShuttingDownInitiated(ShuttingDown),
HealthCheck(ServiceName, HealthinessStatus),
// TODO: to allow changes of service at supervisor:
//ServiceCreated(ServiceHandler)
}

impl Event {
Expand Down
40 changes: 24 additions & 16 deletions horust/src/horust/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::horust::formats::{validate, Service};
pub use self::formats::{get_sample_service, ExitStatus, HorustConfig};

mod bus;
mod commands_handler;
mod error;
mod formats;
mod healthcheck;
Expand All @@ -24,35 +25,35 @@ mod supervisor;
#[derive(Debug)]
pub struct Horust {
services: Vec<Service>,
uds_path: PathBuf,
}

impl Horust {
fn new(services: Vec<Service>) -> Self {
Horust { services }
fn new(services: Vec<Service>, uds_path: PathBuf) -> Self {
Horust { services, uds_path }
}

pub fn get_services(&self) -> &[Service] {
&self.services
}
/// Creates a new Horust instance from a command.
/// The command will be wrapped in a service and run with sane defaults
pub fn from_command(command: String) -> Self {
Self::new(vec![Service::from_command(command)])
pub fn from_command(command: String, uds_path: PathBuf) -> Self {
Self::new(vec![Service::from_command(command)], uds_path)
}

/// Create a new horust instance from multiple paths of services.
pub fn from_services_dirs(paths: &[PathBuf]) -> Result<Self> {
let services = paths
fn load_services_from_folders(paths: &[PathBuf]) -> Result<Vec<Service>> {
paths
.iter()
.map(|path| fetch_services(path))
.flat_map(|result| match result {
Ok(vec) => vec.into_iter().map(Ok).collect(),
Err(err) => vec![Err(err)],
})
.collect::<Result<Vec<_>>>()?;

.collect::<Result<Vec<_>>>()
}
/// Create a new horust instance from multiple paths of services.
pub fn from_services_dirs(paths: &[PathBuf], uds_path: PathBuf) -> Result<Self> {
let services = Self::load_services_from_folders(paths)?;
let services = validate(services)?;
Ok(Horust::new(services))
Ok(Horust::new(services, uds_path))
}

/// Blocking call, will setup the event loop and the threads and run all the available services.
Expand All @@ -78,6 +79,11 @@ impl Horust {
debug!("Services: {:?}", self.services);
// Spawn helper threads:
healthcheck::spawn(dispatcher.join_bus(), self.services.clone());
commands_handler::spawn(
dispatcher.join_bus(),
self.uds_path.clone(),
self.services.iter().map(|s| s.name.clone()).collect(),
);
let handle = supervisor::spawn(dispatcher.join_bus(), self.services.clone());
dispatcher.run();
handle.join().unwrap()
Expand Down Expand Up @@ -152,11 +158,12 @@ mod test {
use std::io;
use std::path::{Path, PathBuf};

use crate::horust::fetch_services;
use crate::horust::formats::Service;
use crate::Horust;
use tempdir::TempDir;

use crate::horust::fetch_services;
use crate::horust::formats::Service;

const FIRST_SERVICE_FILENAME: &str = "my-first-service.toml";
const SECOND_SERVICE_FILENAME: &str = "my-second-service.toml";

Expand Down Expand Up @@ -239,7 +246,8 @@ mod test {
let base = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let services_path = base.join("../example_services/");
let services = list_files(&services_path).unwrap().len();
let horust = Horust::from_services_dirs(&[services_path]).unwrap();
let horust =
Horust::from_services_dirs(&[services_path], "/tmp/horust-test-uds".into()).unwrap();
assert_eq!(horust.services.len(), services);
}
}
37 changes: 26 additions & 11 deletions horust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,35 @@ use std::path::PathBuf;

use anyhow::{Context, Result};
use clap::Parser;
use horust::horust::ExitStatus;
use horust::horust::HorustConfig;
use horust::horust::{ExitStatus, HorustConfig};
use horust::Horust;
use log::{error, info};
use nix::unistd::getpid;

#[derive(clap::Parser, Debug)]
#[clap(author, about)]
/// Horust is a complete supervisor and init system, designed for running in containers.
struct Opts {
#[clap(long, default_value = "/etc/horust/horust.toml")]
#[arg(long, default_value = "/etc/horust/horust.toml")]
/// Horust's path to config.
config_path: PathBuf,

#[clap(flatten)]
horust_config: HorustConfig,

#[clap(long)]
#[arg(long)]
/// Print a sample service file with all the possible options
sample_service: bool,

#[clap(long = "services-path", default_value = "/etc/horust/services")]
#[arg(long = "services-path", default_value = "/etc/horust/services")]
/// Path to service file or a directory containing services to run. You can provide more than one argument to load multiple directories / services.
services_paths: Vec<PathBuf>,

#[clap(required = false, last = true)]
#[arg(required = false, long, default_value = "/var/run/horust")]
/// Path to the folder that contains the Unix Domain Socket, used to communicate with horustctl
uds_folder_path: PathBuf,

#[arg(required = false, last = true)]
/// Specify a command to run instead of load services path. Useful if you just want to use the reaping capability. Prefix your command with --
command: Vec<String>,
}
Expand All @@ -52,21 +56,32 @@ fn main() -> Result<()> {
&opts.config_path.display()
)
})?;
if !opts.uds_folder_path.exists() {
std::fs::create_dir_all(&opts.uds_folder_path)?;
}

let mut horust = if !opts.command.is_empty() {
info!("Running command: {:?}", opts.command);
Horust::from_command(opts.command.join(" "))
} else {
if !opts.uds_folder_path.is_dir() {
panic!(
"'{:?}' is not a directory. Use --uds-folder-path to select a different folder.",
opts.uds_folder_path
);
}
let uds_path = horust_commands_lib::get_path(&opts.uds_folder_path, getpid().into());

let mut horust = if opts.command.is_empty() {
info!(
"Loading services from {}",
display_directories(&opts.services_paths)
);
Horust::from_services_dirs(&opts.services_paths).with_context(|| {
Horust::from_services_dirs(&opts.services_paths, uds_path).with_context(|| {
format!(
"Failed loading services from {}",
display_directories(&opts.services_paths)
)
})?
} else {
info!("Running command: {:?}", opts.command);
Horust::from_command(opts.command.join(" "), uds_path)
};

if let ExitStatus::SomeServiceFailed = horust.run() {
Expand Down
Loading