Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

editoast: add 'core' connectivity to the healthcheck #9393

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions editoast/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3525,6 +3525,25 @@ components:
enum:
- STANDARD
- MARECO
EditoastAppHealthErrorCore:
type: object
required:
- type
- status
- message
properties:
context:
type: object
message:
type: string
status:
type: integer
enum:
- 400
type:
type: string
enum:
- editoast:app_health:Core
EditoastAppHealthErrorDatabase:
type: object
required:
Expand Down Expand Up @@ -4145,6 +4164,7 @@ components:
- editoast:electrical_profiles:NotFound
EditoastError:
oneOf:
- $ref: '#/components/schemas/EditoastAppHealthErrorCore'
- $ref: '#/components/schemas/EditoastAppHealthErrorDatabase'
- $ref: '#/components/schemas/EditoastAppHealthErrorTimeout'
- $ref: '#/components/schemas/EditoastAppHealthErrorValkey'
Expand Down
25 changes: 16 additions & 9 deletions editoast/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub enum Commands {
#[command(subcommand, about, long_about = "Roles related commands")]
Roles(RolesCommand),
#[command(about, long_about = "Healthcheck")]
Healthcheck,
Healthcheck(CoreArgs),
}

#[derive(Args, Debug, Derivative, Clone)]
Expand All @@ -100,6 +100,19 @@ pub struct MapLayersConfig {
pub max_tiles: u64,
}

#[derive(Args, Debug)]
#[command(about, long_about = "Launch the server")]
pub struct CoreArgs {
#[clap(long, env = "OSRD_MQ_URL", default_value_t = String::from("amqp://osrd:password@127.0.0.1:5672/%2f"))]
pub mq_url: String,
#[clap(long, env = "EDITOAST_CORE_TIMEOUT", default_value_t = 180)]
pub core_timeout: u64,
#[clap(long, env = "EDITOAST_CORE_SINGLE_WORKER", default_value_t = false)]
pub core_single_worker: bool,
#[clap(long, env = "CORE_CLIENT_CHANNELS_SIZE", default_value_t = 8)]
pub core_client_channels_size: usize,
}

#[derive(Args, Debug)]
#[command(about, long_about = "Launch the server")]
pub struct RunserverArgs {
Expand All @@ -109,12 +122,8 @@ pub struct RunserverArgs {
pub port: u16,
#[arg(long, env = "EDITOAST_ADDRESS", default_value_t = String::from("0.0.0.0"))]
pub address: String,
#[clap(long, env = "OSRD_MQ_URL", default_value_t = String::from("amqp://osrd:password@127.0.0.1:5672/%2f"))]
pub mq_url: String,
#[clap(long, env = "EDITOAST_CORE_TIMEOUT", default_value_t = 180)]
pub core_timeout: u64,
#[clap(long, env = "EDITOAST_CORE_SINGLE_WORKER", default_value_t = false)]
pub core_single_worker: bool,
#[command(flatten)]
pub core: CoreArgs,
#[clap(long, env = "ROOT_PATH", default_value_t = String::new())]
pub root_path: String,
#[clap(long)]
Expand All @@ -131,8 +140,6 @@ pub struct RunserverArgs {
/// The timeout to use when performing the healthcheck, in milliseconds
#[clap(long, env = "EDITOAST_HEALTH_CHECK_TIMEOUT_MS", default_value_t = 500)]
pub health_check_timeout_ms: u64,
#[clap(long, env = "CORE_CLIENT_CHANNELS_SIZE", default_value_t = 8)]
pub core_client_channels_size: usize,
}

#[derive(Args, Debug)]
Expand Down
12 changes: 11 additions & 1 deletion editoast/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ impl CoreClient {
error
}

pub async fn ping(&self) -> Result<bool, CoreError> {
match self {
CoreClient::MessageQueue(mq_client) => {
mq_client.ping().await.map_err(|_| CoreError::BrokenPipe)
}
#[cfg(test)]
CoreClient::Mocked(_) => Ok(true),
}
}

#[tracing::instrument(
target = "editoast::coreclient",
name = "core:fetch",
Expand Down Expand Up @@ -248,7 +258,7 @@ impl CoreResponse for () {
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error, EditoastError)]
#[editoast_error(base_id = "coreclient")]
enum CoreError {
pub enum CoreError {
#[error("Cannot parse Core response: {msg}")]
#[editoast_error(status = 500)]
CoreResponseFormatError { msg: String },
Expand Down
16 changes: 13 additions & 3 deletions editoast/src/core/mq_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ impl RabbitMQClient {
})
}

pub async fn ping(&self) -> Result<bool, MqClientError> {
let channel_worker = self
.pool
.get()
.await
.map_err(|_| MqClientError::PoolChannelFail)?;
let channel = channel_worker.get_channel();
Ok(channel.status().connected())
}

async fn connection_ok(connection: &Arc<RwLock<Option<Connection>>>) -> bool {
let guard = connection.as_ref().read().await;
let conn = guard.as_ref();
Expand All @@ -245,10 +255,10 @@ impl RabbitMQClient {
Some(conn) => conn.status().state(),
};
match status {
lapin::ConnectionState::Initial => true,
lapin::ConnectionState::Connecting => true,
lapin::ConnectionState::Initial => false,
lapin::ConnectionState::Connecting => false,
lapin::ConnectionState::Connected => true,
lapin::ConnectionState::Closing => true,
lapin::ConnectionState::Closing => false,
lapin::ConnectionState::Closed => false,
lapin::ConnectionState::Error => false,
}
Expand Down
24 changes: 18 additions & 6 deletions editoast/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use client::roles::RolesCommand;
use client::search_commands::*;
use client::stdcm_search_env_commands::handle_stdcm_search_env_command;
use client::timetables_commands::*;
use client::CoreArgs;
use client::{Client, Color, Commands, RunserverArgs, ValkeyConfig};
use client::{MapLayersConfig, PostgresConfig};
use dashmap::DashMap;
Expand Down Expand Up @@ -248,16 +249,27 @@ async fn run() -> Result<(), Box<dyn Error + Send + Sync>> {
.map_err(Into::into)
}
},
Commands::Healthcheck => healthcheck_cmd(db_pool.into(), valkey_config).await,
Commands::Healthcheck(core_config) => {
healthcheck_cmd(db_pool.into(), valkey_config, core_config).await
}
}
}

async fn healthcheck_cmd(
db_pool: Arc<DbConnectionPoolV2>,
valkey_config: ValkeyConfig,
core_config: CoreArgs,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let valkey = ValkeyClient::new(valkey_config).unwrap();
check_health(db_pool, valkey.into())
let core_client = CoreClient::new_mq(mq_client::Options {
uri: core_config.mq_url,
worker_pool_identifier: String::from("core"),
timeout: core_config.core_timeout,
single_worker: core_config.core_single_worker,
num_channels: core_config.core_client_channels_size,
})
.await?;
check_health(db_pool, valkey.into(), core_client.into())
.await
.map_err(|e| CliError::new(1, format!("❌ healthcheck failed: {0}", e)))?;
println!("✅ Healthcheck passed");
Expand Down Expand Up @@ -312,11 +324,11 @@ impl AppState {

// Build Core client
let core_client = CoreClient::new_mq(mq_client::Options {
uri: args.mq_url.clone(),
uri: args.core.mq_url.clone(),
worker_pool_identifier: "core".into(),
timeout: args.core_timeout,
single_worker: args.core_single_worker,
num_channels: args.core_client_channels_size,
timeout: args.core.core_timeout,
single_worker: args.core.core_single_worker,
num_channels: args.core.core_client_channels_size,
})
.await?
.into();
Expand Down
18 changes: 14 additions & 4 deletions editoast/src/views/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ use utoipa::ToSchema;
use crate::client::get_app_version;
use crate::core::version::CoreVersionRequest;
use crate::core::AsCoreRequest;
use crate::core::CoreClient;
use crate::core::CoreError;
use crate::core::{self};
use crate::error::Result;
use crate::error::{self};
Expand Down Expand Up @@ -235,6 +237,8 @@ pub enum AppHealthError {
Database(#[from] editoast_models::db_connection_pool::PingError),
#[error(transparent)]
Valkey(#[from] redis::RedisError),
#[error(transparent)]
Core(#[from] CoreError),
}

#[utoipa::path(
Expand All @@ -248,23 +252,29 @@ async fn health(
db_pool_v2: db_pool,
valkey,
health_check_timeout,
core_client,
..
}): State<AppState>,
) -> Result<&'static str> {
timeout(health_check_timeout, check_health(db_pool, valkey))
.await
.map_err(|_| AppHealthError::Timeout)??;
timeout(
health_check_timeout,
check_health(db_pool, valkey, core_client),
)
.await
.map_err(|_| AppHealthError::Timeout)??;
Ok("ok")
}

pub async fn check_health(
db_pool: Arc<DbConnectionPoolV2>,
valkey_client: Arc<ValkeyClient>,
core_client: Arc<CoreClient>,
) -> Result<()> {
let mut db_connection = db_pool.clone().get().await?;
tokio::try_join!(
ping_database(&mut db_connection).map_err(AppHealthError::Database),
valkey_client.ping_valkey().map_err(|e| e.into())
valkey_client.ping_valkey().map_err(AppHealthError::Valkey),
core_client.ping().map_err(AppHealthError::Core),
)?;
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion front/public/locales/en/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@
"app_health": {
"Timeout": "Service has not responded in time",
"Database": "Database is in error",
"Valkey": "Valkey is in error"
"Valkey": "Valkey is in error",
"Core": "Core is in error"
}
}
}
3 changes: 2 additions & 1 deletion front/public/locales/fr/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@
"app_health": {
"Timeout": "Le serveur n'a pas répondu à temps",
"Database": "Erreur de base de données",
"Valkey": "Erreur de Valkey"
"Valkey": "Erreur de Valkey",
"Core": "Erreur de core"
}
}
}
Loading