diff --git a/catalyst-gateway/bin/src/service/api/health/ready_get.rs b/catalyst-gateway/bin/src/service/api/health/ready_get.rs index 6fbb16d284a..bbaa646e4a9 100644 --- a/catalyst-gateway/bin/src/service/api/health/ready_get.rs +++ b/catalyst-gateway/bin/src/service/api/health/ready_get.rs @@ -9,6 +9,8 @@ use poem_extensions::{ }; use crate::{ + cli::Error, + event_db::error::Error as DBError, service::common::responses::{ resp_2xx::NoContent, resp_5xx::{server_error, ServerError, ServiceUnavailable}, @@ -48,23 +50,22 @@ pub(crate) type AllResponses = response! { /// but unlikely) /// * 503 Service Unavailable - Service is not ready, do not send other requests. pub(crate) async fn endpoint(state: Data<&Arc>) -> AllResponses { - match state.event_db.schema_version_check().await { + match state.schema_version_check().await { Ok(_) => { tracing::debug!("DB schema version status ok"); - if let Ok(mut g) = state.schema_version_status.lock() { - *g = SchemaVersionStatus::Ok; - } + state.set_schema_version_status(SchemaVersionStatus::Ok); T204(NoContent) }, - Err(crate::event_db::error::Error::TimedOut) => T503(ServiceUnavailable), - Err(err) => { - tracing::error!("DB schema version status mismatch"); - if let Ok(mut g) = state.schema_version_status.lock() { - *g = SchemaVersionStatus::Mismatch; - T503(ServiceUnavailable) - } else { - T500(server_error!("{}", err.to_string())) - } + Err(Error::EventDb(DBError::MismatchedSchema { was, expected })) => { + tracing::error!( + expected = expected, + current = was, + "DB schema version status mismatch" + ); + state.set_schema_version_status(SchemaVersionStatus::Mismatch); + T503(ServiceUnavailable) }, + Err(Error::EventDb(DBError::TimedOut)) => T503(ServiceUnavailable), + Err(err) => T500(server_error!("{}", err.to_string())), } } diff --git a/catalyst-gateway/bin/src/service/api/registration/mod.rs b/catalyst-gateway/bin/src/service/api/registration/mod.rs index 6de2897705a..28be780da32 100644 --- a/catalyst-gateway/bin/src/service/api/registration/mod.rs +++ b/catalyst-gateway/bin/src/service/api/registration/mod.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use poem::web::Data; use poem_extensions::{ response, - UniResponse::{T200, T404, T500}, + UniResponse::{T200, T404, T500, T503}, }; use poem_openapi::{ param::{Path, Query}, @@ -13,17 +13,20 @@ use poem_openapi::{ }; use crate::{ - service::common::{ - objects::{ - event_id::EventId, voter_registration::VoterRegistration, - voting_public_key::VotingPublicKey, - }, - responses::{ - resp_2xx::OK, - resp_4xx::NotFound, - resp_5xx::{server_error, ServerError}, + service::{ + common::{ + objects::{ + event_id::EventId, voter_registration::VoterRegistration, + voting_public_key::VotingPublicKey, + }, + responses::{ + resp_2xx::OK, + resp_4xx::NotFound, + resp_5xx::{server_error, ServerError, ServiceUnavailable}, + }, + tags::ApiTags, }, - tags::ApiTags, + utilities::middleware::schema_validation::schema_version_validation, }, state::State, }; @@ -36,7 +39,8 @@ impl RegistrationApi { #[oai( path = "/voter/:voting_key", method = "get", - operation_id = "getVoterInfo" + operation_id = "getVoterInfo", + transform = "schema_version_validation" )] /// Voter's info /// @@ -65,24 +69,28 @@ impl RegistrationApi { 200: OK>, 404: NotFound, 500: ServerError, + 503: ServiceUnavailable, } { - let voter = pool - .event_db - .get_voter( - &event_id.0.map(Into::into), - voting_key.0 .0, - *with_delegators, - ) - .await; - match voter { - Ok(voter) => { - match voter.try_into() { - Ok(voter) => T200(OK(Json(voter))), - Err(err) => T500(server_error!("{}", err.to_string())), - } - }, - Err(crate::event_db::error::Error::NotFound(_)) => T404(NotFound), - Err(err) => T500(server_error!("{}", err.to_string())), + if let Ok(event_db) = pool.event_db() { + let voter = event_db + .get_voter( + &event_id.0.map(Into::into), + voting_key.0 .0, + *with_delegators, + ) + .await; + match voter { + Ok(voter) => { + match voter.try_into() { + Ok(voter) => T200(OK(Json(voter))), + Err(err) => T500(server_error!("{}", err.to_string())), + } + }, + Err(crate::event_db::error::Error::NotFound(_)) => T404(NotFound), + Err(err) => T500(server_error!("{}", err.to_string())), + } + } else { + T503(ServiceUnavailable) } } } diff --git a/catalyst-gateway/bin/src/service/api/v0/mod.rs b/catalyst-gateway/bin/src/service/api/v0/mod.rs index 5e097c79c9c..90cfdb268e4 100644 --- a/catalyst-gateway/bin/src/service/api/v0/mod.rs +++ b/catalyst-gateway/bin/src/service/api/v0/mod.rs @@ -5,7 +5,12 @@ use std::sync::Arc; use poem::web::Data; use poem_openapi::{payload::Binary, OpenApi}; -use crate::{service::common::tags::ApiTags, state::State}; +use crate::{ + service::{ + common::tags::ApiTags, utilities::middleware::schema_validation::schema_version_validation, + }, + state::State, +}; mod message_post; mod plans_get; @@ -25,7 +30,8 @@ impl V0Api { #[oai( path = "/vote/active/plans", method = "get", - operation_id = "GetActivePlans" + operation_id = "GetActivePlans", + transform = "schema_version_validation" )] async fn plans_get(&self, state: Data<&Arc>) -> plans_get::AllResponses { plans_get::endpoint(state).await diff --git a/catalyst-gateway/bin/src/service/api/v1/mod.rs b/catalyst-gateway/bin/src/service/api/v1/mod.rs index 0b373aaa30f..ab5d7782a59 100644 --- a/catalyst-gateway/bin/src/service/api/v1/mod.rs +++ b/catalyst-gateway/bin/src/service/api/v1/mod.rs @@ -6,9 +6,12 @@ use poem::web::{Data, Path}; use poem_openapi::{param::Query, payload::Json, OpenApi}; use crate::{ - service::common::{ - objects::{account_votes::AccountId, fragments_batch::FragmentsBatch}, - tags::ApiTags, + service::{ + common::{ + objects::{account_votes::AccountId, fragments_batch::FragmentsBatch}, + tags::ApiTags, + }, + utilities::middleware::schema_validation::schema_version_validation, }, state::State, }; @@ -25,7 +28,8 @@ impl V1Api { #[oai( path = "/votes/plan/account-votes/:account_id", method = "get", - operation_id = "AccountVotes" + operation_id = "AccountVotes", + transform = "schema_version_validation" )] /// Get from all active vote plans, the index of the voted proposals /// by th given account ID. @@ -43,6 +47,7 @@ impl V1Api { method = "post", operation_id = "fragments", tag = "ApiTags::Fragments", + transform = "schema_version_validation", deprecated = true )] async fn fragments_post( @@ -61,6 +66,7 @@ impl V1Api { method = "get", operation_id = "fragmentsStatuses", tag = "ApiTags::Fragments", + transform = "schema_version_validation", deprecated = true )] async fn fragments_statuses( diff --git a/catalyst-gateway/bin/src/service/common/responses/resp_5xx.rs b/catalyst-gateway/bin/src/service/common/responses/resp_5xx.rs index a647d23f461..62aafacee95 100644 --- a/catalyst-gateway/bin/src/service/common/responses/resp_5xx.rs +++ b/catalyst-gateway/bin/src/service/common/responses/resp_5xx.rs @@ -93,3 +93,9 @@ impl ResponseError for ServerError { /// /// #### NO DATA BODY IS RETURNED FOR THIS RESPONSE pub(crate) struct ServiceUnavailable; + +impl ResponseError for ServiceUnavailable { + fn status(&self) -> StatusCode { + StatusCode::SERVICE_UNAVAILABLE + } +} diff --git a/catalyst-gateway/bin/src/service/mod.rs b/catalyst-gateway/bin/src/service/mod.rs index 7c432365f73..6e49be2fde6 100644 --- a/catalyst-gateway/bin/src/service/mod.rs +++ b/catalyst-gateway/bin/src/service/mod.rs @@ -23,6 +23,9 @@ pub(crate) enum Error { /// An IO error has occurred #[error(transparent)] Io(#[from] std::io::Error), + /// A mismatch in the expected EventDB schema version + #[error("expected schema version mismatch")] + SchemaVersionMismatch, } /// # Run Catalyst Gateway Service. diff --git a/catalyst-gateway/bin/src/service/utilities/middleware/mod.rs b/catalyst-gateway/bin/src/service/utilities/middleware/mod.rs index 01b7f96939d..15f9b59f769 100644 --- a/catalyst-gateway/bin/src/service/utilities/middleware/mod.rs +++ b/catalyst-gateway/bin/src/service/utilities/middleware/mod.rs @@ -1,3 +1,4 @@ //! Custom POEM Middleware for this service. +pub(crate) mod schema_validation; pub(crate) mod tracing_mw; diff --git a/catalyst-gateway/bin/src/service/utilities/middleware/schema_validation.rs b/catalyst-gateway/bin/src/service/utilities/middleware/schema_validation.rs new file mode 100644 index 00000000000..87fd858fa7e --- /dev/null +++ b/catalyst-gateway/bin/src/service/utilities/middleware/schema_validation.rs @@ -0,0 +1,62 @@ +//! Middleware to verify the status of the last DB schema version validation. +//! +//! If a mismatch is detected, the middleware returns an error with `ServiceUnavailable` +//! status code (503). Otherwise, the middleware calls and returns the wrapped endpoint's +//! response. +//! +//! This middleware checks the `State.schema_version_status` value, if it is Ok, +//! the wrapped endpoint is called and its response is returned. + +use std::sync::Arc; + +use poem::{web::Data, Endpoint, EndpointExt, Middleware, Request, Result}; + +use crate::{ + service::common::responses::resp_5xx::ServiceUnavailable, + state::{SchemaVersionStatus, State}, +}; + +/// A middleware that raises an error with `ServiceUnavailable` and 503 status code +/// if a DB schema version mismatch is found the existing `State`. +pub(crate) struct SchemaVersionValidation; + +impl Middleware for SchemaVersionValidation { + type Output = SchemaVersionValidationImpl; + + fn transform(&self, ep: E) -> Self::Output { + SchemaVersionValidationImpl { ep } + } +} + +/// The new endpoint type generated by the `SchemaVersionValidation`. +pub(crate) struct SchemaVersionValidationImpl { + /// Endpoint wrapped by the middleware. + ep: E, +} + +#[poem::async_trait] +impl Endpoint for SchemaVersionValidationImpl { + type Output = E::Output; + + async fn call(&self, req: Request) -> Result { + if let Some(state) = req.data::>>() { + // Check if the inner schema version status is set to `Mismatch`, + // if so, return the `ServiceUnavailable` error, which implements + // `ResponseError`, with status code `503`. + // Otherwise, return the endpoint as usual. + if state.is_schema_version_status(&SchemaVersionStatus::Mismatch) { + return Err(ServiceUnavailable.into()); + } + } + // Calls the endpoint with the request, and returns the response. + self.ep.call(req).await + } +} + +/// A function that wraps an endpoint with the `SchemaVersionValidation`. +/// +/// This function is convenient to use with `poem-openapi` [operation parameters](https://docs.rs/poem-openapi/latest/poem_openapi/attr.OpenApi.html#operation-parameters) via the +/// `transform` attribute. +pub(crate) fn schema_version_validation(ep: impl Endpoint) -> impl Endpoint { + ep.with(SchemaVersionValidation) +} diff --git a/catalyst-gateway/bin/src/state/mod.rs b/catalyst-gateway/bin/src/state/mod.rs index d5c32cdb414..6e2dfb51090 100644 --- a/catalyst-gateway/bin/src/state/mod.rs +++ b/catalyst-gateway/bin/src/state/mod.rs @@ -1,12 +1,14 @@ //! Shared state used by all endpoints. -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use crate::{ cli::Error, event_db::{establish_connection, queries::EventDbQueries}, + service::Error as ServiceError, }; /// The status of the expected DB schema version. +#[derive(Debug, PartialEq, Eq)] pub(crate) enum SchemaVersionStatus { /// The current DB schema version matches what is expected. Ok, @@ -23,10 +25,10 @@ pub(crate) struct State { /// This is Private, it needs to be accessed with a function. // event_db_handle: Arc>>, // Private need to get it with a function. - pub(crate) event_db: Arc, /* This needs to be obsoleted, we want the DB - * to be able to be down. */ + event_db: Arc, /* This needs to be obsoleted, we want the DB + * to be able to be down. */ /// Status of the last DB schema version check. - pub(crate) schema_version_status: Mutex, + schema_version_status: Mutex, } impl State { @@ -49,8 +51,50 @@ impl State { Ok(state) } - // pub(crate) async fn event_db(&self) -> Option> { - // - // - // } + /// Get the reference to the database connection pool for `EventDB`. + pub(crate) fn event_db(&self) -> Result, Error> { + let guard = self.schema_version_status_lock(); + match *guard { + SchemaVersionStatus::Ok => Ok(self.event_db.clone()), + SchemaVersionStatus::Mismatch => Err(ServiceError::SchemaVersionMismatch.into()), + } + } + + /// Check the DB schema version matches the one expected by the service. + pub(crate) async fn schema_version_check(&self) -> Result { + Ok(self.event_db.schema_version_check().await?) + } + + /// Compare the `State`'s inner value with a given `&SchemaVersionStatus`, returns + /// `bool`. + pub(crate) fn is_schema_version_status(&self, svs: &SchemaVersionStatus) -> bool { + let guard = self.schema_version_status_lock(); + &*guard == svs + } + + /// Set the state's `SchemaVersionStatus`. + pub(crate) fn set_schema_version_status(&self, svs: SchemaVersionStatus) { + let mut guard = self.schema_version_status_lock(); + tracing::debug!( + status = format!("{:?}", svs), + "db schema version status was set" + ); + *guard = svs; + } + + /// Get the `MutexGuard` from inner the variable. + /// + /// Handle poisoned mutex by recovering the guard, and tracing the error. + fn schema_version_status_lock(&self) -> MutexGuard { + match self.schema_version_status.lock() { + Ok(guard) => guard, + Err(poisoned) => { + tracing::error!( + error = format!("{:?}", poisoned), + "recovering DB schema version status fom poisoned mutex" + ); + poisoned.into_inner() + }, + } + } }