From 8132f994f8efbdf0532fce9148c7a6a4984c2783 Mon Sep 17 00:00:00 2001 From: Vova Lando Date: Fri, 11 Oct 2024 00:32:49 +0300 Subject: [PATCH] feat: server health call implementation --- CHANGELOG.md | 6 ++ Cargo.lock | 2 +- Cargo.toml | 2 +- src/chain/definitions.rs | 3 +- src/chain/mod.rs | 125 +++++++++++++++++++++++++-------------- src/chain/tracker.rs | 34 ++++++++++- src/definitions.rs | 24 ++++---- src/handlers/health.rs | 14 +++-- src/state.rs | 39 +++++++++++- 9 files changed, 181 insertions(+), 68 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0629fe5..f025c17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. +## [0.2.3] - 2024-10-15 + +### 🚀 Features + +- Server health call implementation + ## [0.2.2] - 2024-10-10 ### 🚀 Features diff --git a/Cargo.lock b/Cargo.lock index f1c10d3..afe55ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1427,7 +1427,7 @@ dependencies = [ [[package]] name = "kalatori" -version = "0.2.2" +version = "0.2.3" dependencies = [ "ahash", "axum", diff --git a/Cargo.toml b/Cargo.toml index 05312fb..eadb2cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kalatori" authors = ["Alzymologist Oy "] -version = "0.2.2" +version = "0.2.3" edition = "2021" description = "A gateway daemon for Kalatori." license = "GPL-3.0-or-later" diff --git a/src/chain/definitions.rs b/src/chain/definitions.rs index cc7f8ab..4faa2b6 100644 --- a/src/chain/definitions.rs +++ b/src/chain/definitions.rs @@ -11,7 +11,7 @@ use crate::{ tracker::ChainWatcher, }, definitions::{ - api_v2::{OrderInfo, Timestamp}, + api_v2::{OrderInfo, RpcInfo, Timestamp}, Balance, }, error::{ChainError, NotHex}, @@ -89,6 +89,7 @@ pub enum ChainRequest { WatchAccount(WatchAccount), Reap(WatchAccount), Shutdown(oneshot::Sender<()>), + GetConnectedRpcs(oneshot::Sender>), } #[derive(Debug)] diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 67cfed8..94d08b0 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -1,7 +1,6 @@ //! Everything related to actual interaction with blockchain use std::collections::HashMap; - use substrate_crypto_light::common::AccountId32; use tokio::{ sync::{mpsc, oneshot}, @@ -23,6 +22,7 @@ pub mod rpc; pub mod tracker; pub mod utils; +use crate::definitions::api_v2::{Health, RpcInfo, ServerHealth}; use definitions::{ChainRequest, ChainTrackerRequest, WatchAccount}; use tracker::start_chain_watch; @@ -35,7 +35,7 @@ const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(120000); /// RPC server handle #[derive(Clone, Debug)] pub struct ChainManager { - pub tx: tokio::sync::mpsc::Sender, + pub tx: mpsc::Sender, } impl ChainManager { @@ -54,6 +54,9 @@ impl ChainManager { let mut currency_map = HashMap::new(); + // Create a channel for receiving RPC status updates + let (rpc_update_tx, mut rpc_update_rx) = mpsc::channel(1024); + // start network monitors for c in chain { if c.endpoints.is_empty() { @@ -82,61 +85,86 @@ impl ChainManager { signer.interface(), task_tracker.clone(), cancellation_token.clone(), + rpc_update_tx.clone(), ); } task_tracker .clone() .spawn("Blockchain connections manager", async move { + let mut rpc_statuses: HashMap<(String, String), Health> = HashMap::new(); + // start requests engine - while let Some(request) = rx.recv().await { - match request { - ChainRequest::WatchAccount(request) => { - if let Some(chain) = currency_map.get(&request.currency) { - if let Some(receiver) = watch_chain.get(chain) { - let _unused = receiver - .send(ChainTrackerRequest::WatchAccount(request)) - .await; - } else { - let _unused = request - .res - .send(Err(ChainError::InvalidChain(chain.to_string()))); + loop { + tokio::select! { + Some(request) = rx.recv() => { + match request { + ChainRequest::WatchAccount(request) => { + if let Some(chain) = currency_map.get(&request.currency) { + if let Some(receiver) = watch_chain.get(chain) { + let _unused = receiver + .send(ChainTrackerRequest::WatchAccount(request)) + .await; + } else { + let _unused = request + .res + .send(Err(ChainError::InvalidChain(chain.to_string()))); + } + } else { + let _unused = request + .res + .send(Err(ChainError::InvalidCurrency(request.currency))); + } } - } else { - let _unused = request - .res - .send(Err(ChainError::InvalidCurrency(request.currency))); - } - } - ChainRequest::Reap(request) => { - if let Some(chain) = currency_map.get(&request.currency) { - if let Some(receiver) = watch_chain.get(chain) { - let _unused = - receiver.send(ChainTrackerRequest::Reap(request)).await; - } else { - let _unused = request - .res - .send(Err(ChainError::InvalidChain(chain.to_string()))); + ChainRequest::Reap(request) => { + if let Some(chain) = currency_map.get(&request.currency) { + if let Some(receiver) = watch_chain.get(chain) { + let _unused = + receiver.send(ChainTrackerRequest::Reap(request)).await; + } else { + let _unused = request + .res + .send(Err(ChainError::InvalidChain(chain.to_string()))); + } + } else { + let _unused = request + .res + .send(Err(ChainError::InvalidCurrency(request.currency))); + } } - } else { - let _unused = request - .res - .send(Err(ChainError::InvalidCurrency(request.currency))); - } - } - ChainRequest::Shutdown(res) => { - for (name, chain) in watch_chain.drain() { - let (tx, rx) = oneshot::channel(); - if chain.send(ChainTrackerRequest::Shutdown(tx)).await.is_ok() { - if timeout(SHUTDOWN_TIMEOUT, rx).await.is_err() { - tracing::error!("Chain monitor for {name} took too much time to wind down, probably it was frozen. Discarding it."); - }; + ChainRequest::Shutdown(res) => { + for (name, chain) in watch_chain.drain() { + let (tx, rx) = oneshot::channel(); + if chain.send(ChainTrackerRequest::Shutdown(tx)).await.is_ok() { + if timeout(SHUTDOWN_TIMEOUT, rx).await.is_err() { + tracing::error!("Chain monitor for {name} took too much time to wind down, probably it was frozen. Discarding it."); + }; + } + } + let _ = res.send(()); + break; + } + ChainRequest::GetConnectedRpcs(res_tx) => { + // Collect the RpcInfo from rpc_statuses + let connected_rpcs: Vec = rpc_statuses.iter().map(|((chain_name, rpc_url), status)| { + RpcInfo { + chain_name: chain_name.clone(), + rpc_url: rpc_url.clone(), + status: *status, + } + }).collect(); + let _ = res_tx.send(connected_rpcs); } } - let _ = res.send(()); - break; } + Some(rpc_update) = rpc_update_rx.recv() => { + rpc_statuses.insert( + (rpc_update.chain_name.clone(), rpc_update.rpc_url.clone()), + rpc_update.status, + ); + } + else => break, } } @@ -162,6 +190,15 @@ impl ChainManager { rx.await.map_err(|_| ChainError::MessageDropped)? } + pub async fn get_connected_rpcs(&self) -> Result, Error> { + let (res_tx, res_rx) = oneshot::channel(); + self.tx + .send(ChainRequest::GetConnectedRpcs(res_tx)) + .await + .map_err(|_| Error::Fatal)?; + res_rx.await.map_err(|_| Error::Fatal) + } + pub async fn reap( &self, id: String, diff --git a/src/chain/tracker.rs b/src/chain/tracker.rs index 4c36649..09f8455 100644 --- a/src/chain/tracker.rs +++ b/src/chain/tracker.rs @@ -1,10 +1,9 @@ //! A tracker that follows individual chain -use std::{collections::HashMap, time::SystemTime}; - use frame_metadata::v15::RuntimeMetadataV15; use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; use serde_json::Value; +use std::{collections::HashMap, time::SystemTime}; use substrate_parser::{AsMetadata, ShortSpecs}; use tokio::{ sync::mpsc, @@ -12,6 +11,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; +use crate::definitions::api_v2::{Health, RpcInfo}; use crate::{ chain::{ definitions::{BlockHash, ChainTrackerRequest, Invoice}, @@ -38,6 +38,7 @@ pub fn start_chain_watch( signer: Signer, task_tracker: TaskTracker, cancellation_token: CancellationToken, + rpc_update_tx: mpsc::Sender, ) { task_tracker .clone() @@ -51,9 +52,30 @@ pub fn start_chain_watch( if shutdown || cancellation_token.is_cancelled() { break; } + + let _ = rpc_update_tx.send(RpcInfo { + chain_name: chain.name.clone(), + rpc_url: endpoint.clone(), + status: Health::Degraded, + }).await; + if let Ok(client) = WsClientBuilder::default().build(endpoint).await { + let _ = rpc_update_tx.send(RpcInfo { + chain_name: chain.name.clone(), + rpc_url: endpoint.clone(), + status: Health::Ok, + }).await; + // prepare chain - let watcher = match ChainWatcher::prepare_chain(&client, chain.clone(), &mut watched_accounts, endpoint, chain_tx.clone(), state.interface(), task_tracker.clone()) + let watcher = match ChainWatcher::prepare_chain( + &client, + chain.clone(), + &mut watched_accounts, + endpoint, + chain_tx.clone(), + state.interface(), + task_tracker.clone(), + ) .await { Ok(a) => a, @@ -165,6 +187,12 @@ pub fn start_chain_watch( } } } + } else { + let _ = rpc_update_tx.send(RpcInfo { + chain_name: chain.name.clone(), + rpc_url: endpoint.clone(), + status: Health::Critical, + }).await; } } Ok(format!("Chain {} monitor shut down", chain.name).into()) diff --git a/src/definitions.rs b/src/definitions.rs index 927fce3..9fdb7f8 100644 --- a/src/definitions.rs +++ b/src/definitions.rs @@ -176,6 +176,7 @@ pub mod api_v2 { pub enum WithdrawalStatus { Waiting, Failed, + Forced, Completed, } @@ -185,24 +186,23 @@ pub mod api_v2 { pub supported_currencies: HashMap, } - #[allow(dead_code)] // TODO: Use this for health response? #[derive(Debug, Serialize)] - struct ServerHealth { - server_info: ServerInfo, - connected_rpcs: Vec, - status: Health, + pub struct ServerHealth { + pub server_info: ServerInfo, + pub connected_rpcs: Vec, + pub status: Health, } - #[derive(Debug, Serialize)] - struct RpcInfo { - rpc_url: String, - chain_name: String, - status: Health, + #[derive(Debug, Serialize, Clone)] + pub struct RpcInfo { + pub rpc_url: String, + pub chain_name: String, + pub status: Health, } - #[derive(Debug, Serialize)] + #[derive(Debug, Serialize, Clone, PartialEq, Copy)] #[serde(rename_all = "lowercase")] - enum Health { + pub enum Health { Ok, Degraded, Critical, diff --git a/src/handlers/health.rs b/src/handlers/health.rs index a606887..a3ae998 100644 --- a/src/handlers/health.rs +++ b/src/handlers/health.rs @@ -1,4 +1,4 @@ -use crate::definitions::api_v2::ServerStatus; +use crate::definitions::api_v2::{ServerHealth, ServerStatus}; use crate::state::State; use axum::{extract::State as ExtractState, http::StatusCode, Json}; @@ -18,12 +18,18 @@ pub async fn status( } pub async fn health( - ExtractState(_state): ExtractState, + ExtractState(state): ExtractState, ) -> ( [(axum::http::header::HeaderName, &'static str); 1], - Json, + Json, ) { - todo!(); + match state.server_health().await { + Ok(status) => ( + [(axum::http::header::CACHE_CONTROL, "no-store")], + Json(status), + ), + Err(_) => panic!("db connection is down, state is lost"), + } } pub async fn audit(ExtractState(_state): ExtractState) -> StatusCode { diff --git a/src/state.rs b/src/state.rs index 91b414f..b219bd7 100644 --- a/src/state.rs +++ b/src/state.rs @@ -4,7 +4,7 @@ use crate::{ database::Database, definitions::api_v2::{ CurrencyProperties, OrderCreateResponse, OrderInfo, OrderQuery, OrderResponse, OrderStatus, - ServerInfo, ServerStatus, + ServerHealth, ServerInfo, ServerStatus, }, error::{Error, OrderError}, signer::Signer, @@ -13,6 +13,7 @@ use crate::{ use std::collections::HashMap; +use crate::definitions::api_v2::{Health, RpcInfo}; use substrate_crypto_light::common::{AccountId32, AsBase58}; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; @@ -122,6 +123,15 @@ impl State { }; res.send(server_status).map_err(|_| Error::Fatal)?; } + StateAccessRequest::ServerHealth(res) => { + let connected_rpcs = state.chain_manager.get_connected_rpcs().await?; + let server_health = ServerHealth { + server_info: state.server_info.clone(), + connected_rpcs: connected_rpcs.clone(), + status: Self::overall_health(&connected_rpcs), + }; + res.send(server_health).map_err(|_| Error::Fatal)?; + } StateAccessRequest::OrderPaid(id) => { // Only perform actions if the record is saved in ledger match state.db.mark_paid(id.clone()).await { @@ -164,8 +174,23 @@ impl State { Ok(Self { tx }) } + fn overall_health(connected_rpcs: &Vec) -> Health { + if connected_rpcs.iter().all(|rpc| rpc.status == Health::Ok) { + Health::Ok + } else if connected_rpcs.iter().any(|rpc| rpc.status == Health::Ok) { + Health::Degraded + } else { + Health::Critical + } + } + pub async fn connect_chain(&self, assets: HashMap) { - self.tx.send(StateAccessRequest::ConnectChain(assets)).await; + self.tx + .send(StateAccessRequest::ConnectChain(assets)) + .await + .unwrap_or_else(|e| { + tracing::error!("Failed to send ConnectChain request: {}", e); + }); } pub async fn order_status(&self, order: &str) -> Result { @@ -189,6 +214,15 @@ impl State { rx.await.map_err(|_| Error::Fatal) } + pub async fn server_health(&self) -> Result { + let (res, rx) = oneshot::channel(); + self.tx + .send(StateAccessRequest::ServerHealth(res)) + .await + .map_err(|_| Error::Fatal)?; + rx.await.map_err(|_| Error::Fatal) + } + pub async fn create_order(&self, order_query: OrderQuery) -> Result { let (res, rx) = oneshot::channel(); /* @@ -253,6 +287,7 @@ enum StateAccessRequest { res: oneshot::Sender, }, ServerStatus(oneshot::Sender), + ServerHealth(oneshot::Sender), OrderPaid(String), }