diff --git a/crates/migrations/src/lib.rs b/crates/migrations/src/lib.rs index 7dee76e..7bb5d2a 100644 --- a/crates/migrations/src/lib.rs +++ b/crates/migrations/src/lib.rs @@ -2,11 +2,11 @@ pub mod pg_version; mod sql; use indoc::formatdoc; -use pg_version::fetch_and_check_postgres_version; +use pg_version::{check_postgres_version, fetch_and_check_postgres_version}; use sql::ARCHIMEDES_MIGRATIONS; -use sqlx::{query, Acquire, Error as SqlxError, PgExecutor, Postgres, Row}; +use sqlx::{query, query_as, Acquire, Error as SqlxError, FromRow, PgExecutor, Postgres}; use thiserror::Error; -use tracing::info; +use tracing::{info, warn}; #[derive(Error, Debug)] pub enum MigrateError { @@ -14,10 +14,17 @@ pub enum MigrateError { ParseVersionError(#[from] std::num::ParseIntError), #[error("This version of Archimedes requires PostgreSQL v12.0 or greater (detected `server_version_num` = {0})")] IncompatibleVersion(u32), + #[error("Database is using Graphile Worker schema revision {} which includes breaking migration {}, but the currently running worker only supports up to revision {}. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible.", .latest_migration, .latest_breaking_migration, .highest_migration)] + IncompatbleRevision { + latest_migration: i32, + latest_breaking_migration: i32, + highest_migration: u32, + }, #[error("Error occured while migrate: {0}")] SqlError(#[from] sqlx::Error), } +/// Installs the Archimedes schema into the database. async fn install_schema<'e, E>(executor: E, escaped_schema: &str) -> Result<(), MigrateError> where E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Clone, @@ -51,16 +58,32 @@ where Ok(()) } -pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<(), MigrateError> +#[derive(FromRow, Default)] +struct LastMigration { + server_version_num: String, + id: Option, + biggest_breaking_id: Option, +} + +/// Returns the last migration that was run against the database. +/// It also installs the Archimedes schema if it doesn't exist. +async fn get_last_migration<'e, E>( + executor: &E, + escaped_schema: &str, +) -> Result where E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Send + Sync + Clone, { - let migrations_status_query = - format!("select id from {escaped_schema}.migrations order by id desc limit 1"); - let last_migration_query_result = query(&migrations_status_query) - .fetch_optional(executor.clone()) + let migrations_status_query = formatdoc!( + r#" + select current_setting('server_version_num') as server_version_num, + (select id from {escaped_schema}.migrations order by id desc limit 1) as id, + (select id from {escaped_schema}.migrations where breaking is true order by id desc limit 1) as biggest_breaking_id; + "# + ); + let last_migration_query_result = query_as::<_, LastMigration>(&migrations_status_query) + .fetch_one(executor.clone()) .await; - let last_migration = match last_migration_query_result { Err(SqlxError::Database(e)) => { let Some(code) = e.code() else { @@ -69,22 +92,48 @@ where if code == "42P01" { install_schema(executor.clone(), escaped_schema).await?; - } else { - return Err(MigrateError::SqlError(SqlxError::Database(e))); + Default::default() } - None + return Err(MigrateError::SqlError(SqlxError::Database(e))); } Err(e) => { return Err(MigrateError::SqlError(e)); } - Ok(optional_row) => optional_row.map(|row| row.get("id")), + Ok(row) => row, }; + Ok(last_migration) +} - for (i, migration) in ARCHIMEDES_MIGRATIONS.iter().enumerate() { - let migration_number = (i + 1) as i32; +impl LastMigration { + fn is_before_number(&self, migration_number: u32) -> bool { + let migration_id = self.id.and_then(|id| id.try_into().ok()); + self.id.is_none() || migration_number > migration_id.unwrap_or(0) + } +} - if last_migration.is_none() || migration_number > last_migration.unwrap() { +/// Runs the migrations against the database. +pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<(), MigrateError> +where + E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Send + Sync + Clone, +{ + let last_migration = get_last_migration(&executor, escaped_schema).await?; + + check_postgres_version(&last_migration.server_version_num)?; + let latest_migration = last_migration.id; + let latest_breaking_migration = last_migration.biggest_breaking_id; + + let mut highest_migration = 0; + let mut migrated = false; + for migration in ARCHIMEDES_MIGRATIONS.iter() { + let migration_number = migration.migration_number(); + + if migration_number > highest_migration { + highest_migration = migration_number; + } + + if last_migration.is_before_number(migration_number) { + migrated = true; info!( migration_number, migration_name = migration.name(), @@ -102,7 +151,7 @@ where let sql = format!("insert into {escaped_schema}.migrations (id, breaking) values ($1, $2)"); query(&sql) - .bind(migration_number) + .bind(migration_number as i64) .bind(migration.is_breaking()) .execute(tx.as_mut()) .await?; @@ -111,5 +160,31 @@ where } } + if migrated { + info!("Migrations complete"); + } + + if let Some(latest_breaking_migration) = latest_breaking_migration { + if highest_migration < latest_breaking_migration as u32 { + return Err(MigrateError::IncompatbleRevision { + latest_migration: latest_migration.unwrap_or(0), + latest_breaking_migration, + highest_migration, + }); + } + } + + if let Some(latest_migration) = latest_migration { + if highest_migration < latest_migration as u32 { + warn!( + latest_migration, + highest_migration, + "Database is using Graphile Worker schema revision {}, but the currently running worker only supports up to revision {} which may or may not be compatible. Please ensure all versions of Graphile Worker you're running are compatible, or use Worker Pro which will perform this check for you. Attempting to continue regardless.", + latest_migration, + highest_migration, + ); + } + } + Ok(()) } diff --git a/crates/migrations/src/sql/mod.rs b/crates/migrations/src/sql/mod.rs index 5d816d4..18a091b 100644 --- a/crates/migrations/src/sql/mod.rs +++ b/crates/migrations/src/sql/mod.rs @@ -19,6 +19,7 @@ pub mod m000016; pub mod m000017; pub mod m000018; +#[derive(Default)] pub struct ArchimedesMigration { name: &'static str, is_breaking: bool, @@ -34,6 +35,23 @@ impl ArchimedesMigration { self.is_breaking } + /// Parses the migration name and returns the migration number. + /// ```rust + /// let migration = ArchimedesMigration { + /// name: "m000001_initial", + /// ..Default::default(), + /// }; + /// assert_eq!(migration.migration_number(), 1); + /// let migration = ArchimedesMigration { + /// name: "m000002", + /// ..Default::default(), + /// }; + /// assert_eq!(migration.migration_number(), 2); + /// ``` + pub fn migration_number(&self) -> u32 { + self.name[1..7].parse().expect("Invalid migration name") + } + pub async fn execute<'e>( &self, tx: &mut Transaction<'e, Postgres>,