Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Handle DB schema version mismatch #197

Merged
merged 1 commit into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions catalyst-gateway/bin/src/service/api/health/ready_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
resp_2xx::NoContent,
resp_5xx::{server_error, ServerError, ServiceUnavailable},
},
state::State,
state::{SchemaVersionStatus, State},
};

/// All responses
Expand Down Expand Up @@ -47,11 +47,24 @@ pub(crate) type AllResponses = response! {
/// * 500 Server Error - If anything within this function fails unexpectedly. (Possible
/// but unlikely)
/// * 503 Service Unavailable - Service is not ready, do not send other requests.
#[allow(clippy::unused_async)]
pub(crate) async fn endpoint(state: Data<&Arc<State>>) -> AllResponses {
match state.event_db.schema_version_check().await {
Ok(_) => T204(NoContent),
Ok(_) => {
tracing::debug!("DB schema version status ok");
if let Ok(mut g) = state.schema_version_status.lock() {
*g = SchemaVersionStatus::Ok;
}
T204(NoContent)
},
Err(crate::event_db::error::Error::TimedOut) => T503(ServiceUnavailable),
Err(err) => T500(server_error!("{}", err.to_string())),
Err(err) => {
tracing::error!("DB schema version status mismatch");
if let Ok(mut g) = state.schema_version_status.lock() {
*g = SchemaVersionStatus::Mismatch;
T503(ServiceUnavailable)
} else {
T500(server_error!("{}", err.to_string()))
}
},
}
}
26 changes: 21 additions & 5 deletions catalyst-gateway/bin/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,46 @@
//! Shared state used by all endpoints.
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use crate::{
cli::Error,
event_db::{establish_connection, queries::EventDbQueries},
};

/// The status of the expected DB schema version.
pub(crate) enum SchemaVersionStatus {
/// The current DB schema version matches what is expected.
Ok,
/// There is a mismatch between the current DB schema version
/// and what is expected.
Mismatch,
}

/// Global State of the service
pub(crate) struct State {
/// This can be None, or a handle to the DB.
/// If the DB fails, it can be set to None.
/// If its None, an attempt to get it will try and connect to the DB.
/// This is Private, it needs to be accessed with a function.
// event_db_handle: Arc<ArcSwap<Option<dyn EventDbQueries>>>, // Private need to get it with a
// function.
// event_db_handle: Arc<ArcSwap<Option<dyn EventDbQueries>>>,
// Private need to get it with a function.
pub(crate) event_db: Arc<dyn EventDbQueries>, /* This needs to be obsoleted, we want the DB
* to be able to be down. */
/// Status of the last DB schema version check.
pub(crate) schema_version_status: Mutex<SchemaVersionStatus>,
}

impl State {
/// Create a new global [`State`]
pub(crate) async fn new(database_url: Option<String>) -> Result<Self, Error> {
// Get a configured pool to the Database.
// Get a configured pool to the Database, runs schema version check internally.
let event_db = Arc::new(establish_connection(database_url, false).await?);

let state = Self { event_db };
let state = Self {
event_db,
// It is safe to assume that the schema version matches if `event_db` doesn't fail,
// due to the interior check ran by `establish_connection`.
schema_version_status: Mutex::new(SchemaVersionStatus::Ok),
};

// We don't care if this succeeds or not.
// We just try our best to connect to the event DB.
Expand Down