Skip to content

Commit

Permalink
feat: Port breaking migration algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Jan 29, 2024
1 parent 7d8cae2 commit 1d19202
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 17 deletions.
109 changes: 92 additions & 17 deletions crates/migrations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@ pub mod pg_version;
mod sql;

use indoc::formatdoc;
use pg_version::fetch_and_check_postgres_version;
use pg_version::{check_postgres_version, fetch_and_check_postgres_version};
use sql::ARCHIMEDES_MIGRATIONS;
use sqlx::{query, Acquire, Error as SqlxError, PgExecutor, Postgres, Row};
use sqlx::{query, query_as, Acquire, Error as SqlxError, FromRow, PgExecutor, Postgres};
use thiserror::Error;
use tracing::info;
use tracing::{info, warn};

#[derive(Error, Debug)]
pub enum MigrateError {
#[error("Error occured while parsing postgres version: {0}")]
ParseVersionError(#[from] std::num::ParseIntError),
#[error("This version of Archimedes requires PostgreSQL v12.0 or greater (detected `server_version_num` = {0})")]
IncompatibleVersion(u32),
#[error("Database is using Graphile Worker schema revision {} which includes breaking migration {}, but the currently running worker only supports up to revision {}. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible.", .latest_migration, .latest_breaking_migration, .highest_migration)]
IncompatbleRevision {
latest_migration: i32,
latest_breaking_migration: i32,
highest_migration: u32,
},
#[error("Error occured while migrate: {0}")]
SqlError(#[from] sqlx::Error),
}

/// Installs the Archimedes schema into the database.
async fn install_schema<'e, E>(executor: E, escaped_schema: &str) -> Result<(), MigrateError>
where
E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Clone,
Expand Down Expand Up @@ -51,16 +58,32 @@ where
Ok(())
}

pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<(), MigrateError>
#[derive(FromRow, Default)]
struct LastMigration {
server_version_num: String,
id: Option<i32>,
biggest_breaking_id: Option<i32>,
}

/// Returns the last migration that was run against the database.
/// It also installs the Archimedes schema if it doesn't exist.
async fn get_last_migration<'e, E>(
executor: &E,
escaped_schema: &str,
) -> Result<LastMigration, MigrateError>
where
E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Send + Sync + Clone,
{
let migrations_status_query =
format!("select id from {escaped_schema}.migrations order by id desc limit 1");
let last_migration_query_result = query(&migrations_status_query)
.fetch_optional(executor.clone())
let migrations_status_query = formatdoc!(
r#"
select current_setting('server_version_num') as server_version_num,
(select id from {escaped_schema}.migrations order by id desc limit 1) as id,
(select id from {escaped_schema}.migrations where breaking is true order by id desc limit 1) as biggest_breaking_id;
"#
);
let last_migration_query_result = query_as::<_, LastMigration>(&migrations_status_query)
.fetch_one(executor.clone())
.await;

let last_migration = match last_migration_query_result {
Err(SqlxError::Database(e)) => {
let Some(code) = e.code() else {
Expand All @@ -69,22 +92,48 @@ where

if code == "42P01" {
install_schema(executor.clone(), escaped_schema).await?;
} else {
return Err(MigrateError::SqlError(SqlxError::Database(e)));
Default::default()
}

None
return Err(MigrateError::SqlError(SqlxError::Database(e)));
}
Err(e) => {
return Err(MigrateError::SqlError(e));
}
Ok(optional_row) => optional_row.map(|row| row.get("id")),
Ok(row) => row,
};
Ok(last_migration)
}

for (i, migration) in ARCHIMEDES_MIGRATIONS.iter().enumerate() {
let migration_number = (i + 1) as i32;
impl LastMigration {
fn is_before_number(&self, migration_number: u32) -> bool {
let migration_id = self.id.and_then(|id| id.try_into().ok());
self.id.is_none() || migration_number > migration_id.unwrap_or(0)
}
}

if last_migration.is_none() || migration_number > last_migration.unwrap() {
/// Runs the migrations against the database.
pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<(), MigrateError>
where
E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Send + Sync + Clone,
{
let last_migration = get_last_migration(&executor, escaped_schema).await?;

check_postgres_version(&last_migration.server_version_num)?;
let latest_migration = last_migration.id;
let latest_breaking_migration = last_migration.biggest_breaking_id;

let mut highest_migration = 0;
let mut migrated = false;
for migration in ARCHIMEDES_MIGRATIONS.iter() {
let migration_number = migration.migration_number();

if migration_number > highest_migration {
highest_migration = migration_number;
}

if last_migration.is_before_number(migration_number) {
migrated = true;
info!(
migration_number,
migration_name = migration.name(),
Expand All @@ -102,7 +151,7 @@ where
let sql =
format!("insert into {escaped_schema}.migrations (id, breaking) values ($1, $2)");
query(&sql)
.bind(migration_number)
.bind(migration_number as i64)
.bind(migration.is_breaking())
.execute(tx.as_mut())
.await?;
Expand All @@ -111,5 +160,31 @@ where
}
}

if migrated {
info!("Migrations complete");
}

if let Some(latest_breaking_migration) = latest_breaking_migration {
if highest_migration < latest_breaking_migration as u32 {
return Err(MigrateError::IncompatbleRevision {
latest_migration: latest_migration.unwrap_or(0),
latest_breaking_migration,
highest_migration,
});
}
}

if let Some(latest_migration) = latest_migration {
if highest_migration < latest_migration as u32 {
warn!(
latest_migration,
highest_migration,
"Database is using Graphile Worker schema revision {}, but the currently running worker only supports up to revision {} which may or may not be compatible. Please ensure all versions of Graphile Worker you're running are compatible, or use Worker Pro which will perform this check for you. Attempting to continue regardless.",
latest_migration,
highest_migration,
);
}
}

Ok(())
}
18 changes: 18 additions & 0 deletions crates/migrations/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod m000016;
pub mod m000017;
pub mod m000018;

#[derive(Default)]
pub struct ArchimedesMigration {
name: &'static str,
is_breaking: bool,
Expand All @@ -34,6 +35,23 @@ impl ArchimedesMigration {
self.is_breaking
}

/// Parses the migration name and returns the migration number.
/// ```rust
/// let migration = ArchimedesMigration {

Check failure on line 40 in crates/migrations/src/sql/mod.rs

View workflow job for this annotation

GitHub Actions / test

cannot find struct, variant or union type `ArchimedesMigration` in this scope
/// name: "m000001_initial",
/// ..Default::default(),

Check failure on line 42 in crates/migrations/src/sql/mod.rs

View workflow job for this annotation

GitHub Actions / test

cannot use a comma after the base struct
/// };
/// assert_eq!(migration.migration_number(), 1);
/// let migration = ArchimedesMigration {

Check failure on line 45 in crates/migrations/src/sql/mod.rs

View workflow job for this annotation

GitHub Actions / test

cannot find struct, variant or union type `ArchimedesMigration` in this scope
/// name: "m000002",
/// ..Default::default(),

Check failure on line 47 in crates/migrations/src/sql/mod.rs

View workflow job for this annotation

GitHub Actions / test

cannot use a comma after the base struct
/// };
/// assert_eq!(migration.migration_number(), 2);
/// ```
pub fn migration_number(&self) -> u32 {
self.name[1..7].parse().expect("Invalid migration name")
}

pub async fn execute<'e>(
&self,
tx: &mut Transaction<'e, Postgres>,
Expand Down

0 comments on commit 1d19202

Please sign in to comment.