Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(service): schema mismatch disables service #198

Merged
merged 8 commits into from
Jan 3, 2024
27 changes: 14 additions & 13 deletions catalyst-gateway/bin/src/service/api/health/ready_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use poem_extensions::{
};

use crate::{
cli::Error,
event_db::error::Error as DBError,
service::common::responses::{
resp_2xx::NoContent,
resp_5xx::{server_error, ServerError, ServiceUnavailable},
Expand Down Expand Up @@ -48,23 +50,22 @@ pub(crate) type AllResponses = response! {
/// but unlikely)
/// * 503 Service Unavailable - Service is not ready, do not send other requests.
pub(crate) async fn endpoint(state: Data<&Arc<State>>) -> AllResponses {
match state.event_db.schema_version_check().await {
match state.schema_version_check().await {
Ok(_) => {
tracing::debug!("DB schema version status ok");
if let Ok(mut g) = state.schema_version_status.lock() {
*g = SchemaVersionStatus::Ok;
}
state.set_schema_version_status(SchemaVersionStatus::Ok);
T204(NoContent)
},
Err(crate::event_db::error::Error::TimedOut) => T503(ServiceUnavailable),
Err(err) => {
tracing::error!("DB schema version status mismatch");
if let Ok(mut g) = state.schema_version_status.lock() {
*g = SchemaVersionStatus::Mismatch;
T503(ServiceUnavailable)
} else {
T500(server_error!("{}", err.to_string()))
}
Err(Error::EventDb(DBError::MismatchedSchema { was, expected })) => {
tracing::error!(
expected = expected,
was = was,
Mr-Leshiy marked this conversation as resolved.
Show resolved Hide resolved
"DB schema version status mismatch"
);
state.set_schema_version_status(SchemaVersionStatus::Mismatch);
T503(ServiceUnavailable)
},
Err(Error::EventDb(DBError::TimedOut)) => T503(ServiceUnavailable),
Err(err) => T500(server_error!("{}", err.to_string())),
}
}
66 changes: 37 additions & 29 deletions catalyst-gateway/bin/src/service/api/registration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use poem::web::Data;
use poem_extensions::{
response,
UniResponse::{T200, T404, T500},
UniResponse::{T200, T404, T500, T503},
};
use poem_openapi::{
param::{Path, Query},
Expand All @@ -13,17 +13,20 @@ use poem_openapi::{
};

use crate::{
service::common::{
objects::{
event_id::EventId, voter_registration::VoterRegistration,
voting_public_key::VotingPublicKey,
},
responses::{
resp_2xx::OK,
resp_4xx::NotFound,
resp_5xx::{server_error, ServerError},
service::{
common::{
objects::{
event_id::EventId, voter_registration::VoterRegistration,
voting_public_key::VotingPublicKey,
},
responses::{
resp_2xx::OK,
resp_4xx::NotFound,
resp_5xx::{server_error, ServerError, ServiceUnavailable},
},
tags::ApiTags,
},
tags::ApiTags,
utilities::middleware::schema_validation::schema_version_validation,
},
state::State,
};
Expand All @@ -36,7 +39,8 @@ impl RegistrationApi {
#[oai(
path = "/voter/:voting_key",
method = "get",
operation_id = "getVoterInfo"
operation_id = "getVoterInfo",
transform = "schema_version_validation"
)]
/// Voter's info
///
Expand Down Expand Up @@ -65,24 +69,28 @@ impl RegistrationApi {
200: OK<Json<VoterRegistration>>,
404: NotFound,
500: ServerError,
503: ServiceUnavailable,
} {
let voter = pool
.event_db
.get_voter(
&event_id.0.map(Into::into),
voting_key.0 .0,
*with_delegators,
)
.await;
match voter {
Ok(voter) => {
match voter.try_into() {
Ok(voter) => T200(OK(Json(voter))),
Err(err) => T500(server_error!("{}", err.to_string())),
}
},
Err(crate::event_db::error::Error::NotFound(_)) => T404(NotFound),
Err(err) => T500(server_error!("{}", err.to_string())),
if let Some(event_db) = pool.event_db() {
Mr-Leshiy marked this conversation as resolved.
Show resolved Hide resolved
let voter = event_db
.get_voter(
&event_id.0.map(Into::into),
voting_key.0 .0,
*with_delegators,
)
.await;
match voter {
Ok(voter) => {
match voter.try_into() {
Ok(voter) => T200(OK(Json(voter))),
Err(err) => T500(server_error!("{}", err.to_string())),
}
},
Err(crate::event_db::error::Error::NotFound(_)) => T404(NotFound),
Err(err) => T500(server_error!("{}", err.to_string())),
}
} else {
T503(ServiceUnavailable)
}
}
}
10 changes: 8 additions & 2 deletions catalyst-gateway/bin/src/service/api/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ use std::sync::Arc;
use poem::web::Data;
use poem_openapi::{payload::Binary, OpenApi};

use crate::{service::common::tags::ApiTags, state::State};
use crate::{
service::{
common::tags::ApiTags, utilities::middleware::schema_validation::schema_version_validation,
},
state::State,
};

mod message_post;
mod plans_get;
Expand All @@ -25,7 +30,8 @@ impl V0Api {
#[oai(
path = "/vote/active/plans",
method = "get",
operation_id = "GetActivePlans"
operation_id = "GetActivePlans",
transform = "schema_version_validation"
)]
async fn plans_get(&self, state: Data<&Arc<State>>) -> plans_get::AllResponses {
plans_get::endpoint(state).await
Expand Down
14 changes: 10 additions & 4 deletions catalyst-gateway/bin/src/service/api/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ use poem::web::{Data, Path};
use poem_openapi::{param::Query, payload::Json, OpenApi};

use crate::{
service::common::{
objects::{account_votes::AccountId, fragments_batch::FragmentsBatch},
tags::ApiTags,
service::{
common::{
objects::{account_votes::AccountId, fragments_batch::FragmentsBatch},
tags::ApiTags,
},
utilities::middleware::schema_validation::schema_version_validation,
},
state::State,
};
Expand All @@ -25,7 +28,8 @@ impl V1Api {
#[oai(
path = "/votes/plan/account-votes/:account_id",
method = "get",
operation_id = "AccountVotes"
operation_id = "AccountVotes",
transform = "schema_version_validation"
)]
/// Get from all active vote plans, the index of the voted proposals
/// by th given account ID.
Expand All @@ -43,6 +47,7 @@ impl V1Api {
method = "post",
operation_id = "fragments",
tag = "ApiTags::Fragments",
transform = "schema_version_validation",
deprecated = true
)]
async fn fragments_post(
Expand All @@ -61,6 +66,7 @@ impl V1Api {
method = "get",
operation_id = "fragmentsStatuses",
tag = "ApiTags::Fragments",
transform = "schema_version_validation",
deprecated = true
)]
async fn fragments_statuses(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,9 @@ impl ResponseError for ServerError {
///
/// #### NO DATA BODY IS RETURNED FOR THIS RESPONSE
pub(crate) struct ServiceUnavailable;

impl ResponseError for ServiceUnavailable {
fn status(&self) -> StatusCode {
StatusCode::SERVICE_UNAVAILABLE
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Custom POEM Middleware for this service.

pub(crate) mod schema_validation;
pub(crate) mod tracing_mw;
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Middleware to verify the status of the last DB schema version validation.
//!
//! If a mismatch is detected, the middleware returns an error with `ServiceUnavailable`
//! status code (503). Otherwise, the middleware calls and returns the wrapped endpoint's
//! response.
//!
//! This middleware checks the `State.schema_version_status` value, if it is Ok,
//! the wrapped endpoint is called and its response is returned.

use std::sync::Arc;

use poem::{web::Data, Endpoint, EndpointExt, Middleware, Request, Result};

use crate::{
service::common::responses::resp_5xx::ServiceUnavailable,
state::{SchemaVersionStatus, State},
};

/// A middleware that raises an error with `ServiceUnavailable` and 503 status code
/// if a DB schema version mismatch is found the existing `State`.
pub(crate) struct SchemaVersionValidation;

impl<E: Endpoint> Middleware<E> for SchemaVersionValidation {
type Output = SchemaVersionValidationImpl<E>;

fn transform(&self, ep: E) -> Self::Output {
SchemaVersionValidationImpl { ep }
}
}

/// The new endpoint type generated by the `SchemaVersionValidation`.
pub(crate) struct SchemaVersionValidationImpl<E> {
/// Endpoint wrapped by the middleware.
ep: E,
}

#[poem::async_trait]
impl<E: Endpoint> Endpoint for SchemaVersionValidationImpl<E> {
type Output = E::Output;

async fn call(&self, req: Request) -> Result<Self::Output> {
let req_data: Option<&Data<&Arc<State>>> = req.data();
Mr-Leshiy marked this conversation as resolved.
Show resolved Hide resolved
if let Some(state) = req_data {
if state.is_schema_version_status(&SchemaVersionStatus::Mismatch) {
saibatizoku marked this conversation as resolved.
Show resolved Hide resolved
return Err(ServiceUnavailable.into());
}
}
self.ep.call(req).await
}
}

/// A function that wraps an endpoint with the `SchemaVersionValidation`.
///
/// This function is convenient to use with `poem-openapi` [operation parameters](https://docs.rs/poem-openapi/latest/poem_openapi/attr.OpenApi.html#operation-parameters) via the
/// `transform` attribute.
pub(crate) fn schema_version_validation(ep: impl Endpoint) -> impl Endpoint {
ep.with(SchemaVersionValidation)
}
59 changes: 51 additions & 8 deletions catalyst-gateway/bin/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Shared state used by all endpoints.
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};

use crate::{
cli::Error,
event_db::{establish_connection, queries::EventDbQueries},
};

/// The status of the expected DB schema version.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum SchemaVersionStatus {
/// The current DB schema version matches what is expected.
Ok,
Expand All @@ -23,10 +24,10 @@ pub(crate) struct State {
/// This is Private, it needs to be accessed with a function.
// event_db_handle: Arc<ArcSwap<Option<dyn EventDbQueries>>>,
// Private need to get it with a function.
pub(crate) event_db: Arc<dyn EventDbQueries>, /* This needs to be obsoleted, we want the DB
* to be able to be down. */
event_db: Arc<dyn EventDbQueries>, /* This needs to be obsoleted, we want the DB
* to be able to be down. */
/// Status of the last DB schema version check.
pub(crate) schema_version_status: Mutex<SchemaVersionStatus>,
schema_version_status: Mutex<SchemaVersionStatus>,
}

impl State {
Expand All @@ -49,8 +50,50 @@ impl State {
Ok(state)
}

// pub(crate) async fn event_db(&self) -> Option<Arc<dyn EventDbQueries>> {
//
//
// }
/// Get the reference to the database connection pool for `EventDB`.
pub(crate) fn event_db(&self) -> Option<Arc<dyn EventDbQueries>> {
Mr-Leshiy marked this conversation as resolved.
Show resolved Hide resolved
let guard = self._schema_version_status_lock();
match *guard {
SchemaVersionStatus::Ok => Some(self.event_db.clone()),
SchemaVersionStatus::Mismatch => None,
}
}

/// Check the DB schema version matches the one expected by the service.
pub(crate) async fn schema_version_check(&self) -> Result<i32, Error> {
Ok(self.event_db.schema_version_check().await?)
}

/// Compare the `State`'s inner value with a given `&SchemaVersionStatus`, returns
/// `bool`.
pub(crate) fn is_schema_version_status(&self, svs: &SchemaVersionStatus) -> bool {
let guard = self._schema_version_status_lock();
&*guard == svs
}

/// Set the state's `SchemaVersionStatus`.
pub(crate) fn set_schema_version_status(&self, svs: SchemaVersionStatus) {
let mut guard = self._schema_version_status_lock();
tracing::debug!(
status = format!("{:?}", svs),
"db schema version status was set"
);
*guard = svs;
}

/// Get the `MutexGuard<SchemaVersionStatus>` from inner the variable.
///
/// Handle poisoned mutex by recovering the guard, and tracing the error.
fn _schema_version_status_lock(&self) -> MutexGuard<SchemaVersionStatus> {
Mr-Leshiy marked this conversation as resolved.
Show resolved Hide resolved
match self.schema_version_status.lock() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::error!(
error = format!("{:?}", poisoned),
"recovering DB schema version status fom poisoned mutex"
);
poisoned.into_inner()
},
}
}
}