From b801ba39564c21d18f4301cc89d0f2457d64785e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 20 Mar 2020 23:55:16 +0000 Subject: [PATCH] add Target enum and allow Runner to a Target migration closes #60 --- refinery/src/lib.rs | 4 +- refinery/tests/mod_migrations/mod.rs | 4 +- refinery/tests/mysql.rs | 70 +++++++++++++++++++++---- refinery/tests/mysql_async.rs | 75 ++++++++++++++++++++++---- refinery/tests/postgres.rs | 67 ++++++++++++++++++++---- refinery/tests/rusqlite.rs | 62 ++++++++++++++++++---- refinery/tests/tokio_postgres.rs | 78 ++++++++++++++++++++++++---- refinery_core/src/lib.rs | 2 +- refinery_core/src/runner.rs | 18 +++++++ refinery_core/src/traits/async.rs | 27 ++++++++-- refinery_core/src/traits/sync.rs | 33 ++++++++++-- refinery_macros/src/lib.rs | 1 + 12 files changed, 379 insertions(+), 62 deletions(-) diff --git a/refinery/src/lib.rs b/refinery/src/lib.rs index 73cc33f8..258c6b6e 100644 --- a/refinery/src/lib.rs +++ b/refinery/src/lib.rs @@ -33,5 +33,7 @@ for more examples refer to the [examples](https://github.com/rust-db/refinery/tr */ pub use refinery_core::config; -pub use refinery_core::{AppliedMigration, AsyncMigrate, Error, Migrate, Migration, Runner}; +#[doc(hidden)] +pub use refinery_core::{AppliedMigration, AsyncMigrate, Migrate}; +pub use refinery_core::{Error, Migration, Runner, Target}; pub use refinery_macros::{embed_migrations, include_migration_mods}; diff --git a/refinery/tests/mod_migrations/mod.rs b/refinery/tests/mod_migrations/mod.rs index c93a2096..93dc110c 100644 --- a/refinery/tests/mod_migrations/mod.rs +++ b/refinery/tests/mod_migrations/mod.rs @@ -1,3 +1 @@ -pub mod migrations { - refinery::include_migration_mods!("./tests/mod_migrations"); -} +refinery::include_migration_mods!("./tests/mod_migrations"); diff --git a/refinery/tests/mysql.rs b/refinery/tests/mysql.rs index b556f8d9..019fbf7e 100644 --- a/refinery/tests/mysql.rs +++ b/refinery/tests/mysql.rs @@ -9,7 +9,7 @@ mod mysql { use predicates::str::contains; use refinery::{ config::{migrate_from_config, Config, ConfigDbType}, - Error, Migrate, Migration, + Error, Migrate, Migration, Target, }; use refinery_core::mysql; use std::process::Command; @@ -288,7 +288,7 @@ mod mysql { let pool = mysql::Pool::new("mysql://refinery:root@localhost:3306/refinery_test").unwrap(); let mut conn = pool.get_conn().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); for row in conn .query( "SELECT table_name FROM information_schema.tables WHERE table_name='refinery_schema_history'" @@ -308,7 +308,7 @@ mod mysql { mysql::Pool::new("mysql://refinery:root@localhost:3306/refinery_test").unwrap(); let mut conn = pool.get_conn().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); conn.prep_exec( "INSERT INTO persons (name, city) VALUES (:a, :b)", (&"John Legend", &"New York"), @@ -331,7 +331,7 @@ mod mysql { mysql::Pool::new("mysql://refinery:root@localhost:3306/refinery_test").unwrap(); let mut conn = pool.get_conn().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); for _row in conn .query("SELECT MAX(version) FROM refinery_schema_history") @@ -365,7 +365,8 @@ mod mysql { let migrations = get_migrations(); let mchecksum = migrations[4].checksum(); - conn.migrate(&migrations, true, true, false).unwrap(); + conn.migrate(&migrations, true, true, false, Target::Latest) + .unwrap(); for _row in conn .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)") @@ -380,6 +381,53 @@ mod mysql { }); } + #[test] + fn migrates_to_target_migration() { + run_test(|| { + let pool = + mysql::Pool::new("mysql://refinery:root@localhost:3306/refinery_test").unwrap(); + let mut conn = pool.get_conn().unwrap(); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .run(&mut conn) + .unwrap(); + + for _row in conn + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)") + .unwrap() + { + let row = _row.unwrap(); + let current: i32 = row.get(0).unwrap(); + assert_eq!(3, current); + } + }); + } + + #[test] + fn migrates_to_target_migration_grouped() { + run_test(|| { + let pool = + mysql::Pool::new("mysql://refinery:root@localhost:3306/refinery_test").unwrap(); + let mut conn = pool.get_conn().unwrap(); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .set_grouped(true) + .run(&mut conn) + .unwrap(); + + for _row in conn + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)") + .unwrap() + { + let row = _row.unwrap(); + let current: i32 = row.get(0).unwrap(); + assert_eq!(3, current); + } + }); + } + #[test] fn aborts_on_missing_migration_on_filesystem() { run_test(|| { @@ -387,14 +435,16 @@ mod mysql { mysql::Pool::new("mysql://refinery:root@localhost:3306/refinery_test").unwrap(); let mut conn = pool.get_conn().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); let migration = Migration::from_filename( "V4__add_year_field_to_cars", &"ALTER TABLE cars ADD year INTEGER;", ) .unwrap(); - let err = conn.migrate(&[migration], true, true, false).unwrap_err(); + let err = conn + .migrate(&[migration], true, true, false, Target::Latest) + .unwrap_err(); match err { Error::MissingVersion(missing) => { @@ -413,7 +463,7 @@ mod mysql { mysql::Pool::new("mysql://refinery:root@localhost:3306/refinery_test").unwrap(); let mut conn = pool.get_conn().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); let migration = Migration::from_filename( "V2__add_year_field_to_cars", @@ -421,7 +471,7 @@ mod mysql { ) .unwrap(); let err = conn - .migrate(&[migration.clone()], true, false, false) + .migrate(&[migration.clone()], true, false, false, Target::Latest) .unwrap_err(); match err { @@ -462,7 +512,7 @@ mod mysql { ) .unwrap(); let err = conn - .migrate(&[migration1, migration2], true, true, false) + .migrate(&[migration1, migration2], true, true, false, Target::Latest) .unwrap_err(); match err { Error::MissingVersion(missing) => { diff --git a/refinery/tests/mysql_async.rs b/refinery/tests/mysql_async.rs index 0d705a5a..8f17a2fc 100644 --- a/refinery/tests/mysql_async.rs +++ b/refinery/tests/mysql_async.rs @@ -8,7 +8,7 @@ mod mysql_async { use futures::FutureExt; use refinery::{ config::{migrate_from_config_async, Config, ConfigDbType}, - AsyncMigrate, Error, Migration, + AsyncMigrate, Error, Migration, Target, }; use refinery_core::mysql_async::prelude::Queryable; use refinery_core::{mysql_async, tokio}; @@ -333,7 +333,7 @@ mod mysql_async { let mut pool = mysql_async::Pool::new("mysql://refinery:root@localhost:3306/refinery_test"); let conn = pool.get_conn().await.unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut pool) .await .unwrap(); @@ -360,7 +360,7 @@ mod mysql_async { mysql_async::Pool::new("mysql://refinery:root@localhost:3306/refinery_test"); let mut conn = pool.get_conn().await.unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut pool) .await .unwrap(); @@ -399,7 +399,7 @@ mod mysql_async { let mut pool = mysql_async::Pool::new("mysql://refinery:root@localhost:3306/refinery_test"); let conn = pool.get_conn().await.unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut pool) .await .unwrap(); @@ -449,7 +449,7 @@ mod mysql_async { let migrations = get_migrations(); let mchecksum = migrations[4].checksum(); - pool.migrate(&migrations, true, true, false).await.unwrap(); + pool.migrate(&migrations, true, true, false, Target::Latest).await.unwrap(); conn .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)") @@ -468,13 +468,66 @@ mod mysql_async { }).await; } + #[tokio::test] + async fn migrates_to_target_migration() { + run_test(async { + let mut pool = mysql_async::Pool::new("mysql://refinery:root@localhost:3306/refinery_test"); + let conn = pool.get_conn().await.unwrap(); + + embedded::migrations::runner() + .set_grouped(true) + .set_target(Target::Version(3)) + .run_async(&mut pool) + .await + .unwrap(); + + conn + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)") + .await + .unwrap() + .for_each(|row| { + let current: i32 = row.get(0).unwrap(); + assert_eq!(3, current); + }) + .await + .unwrap(); + + }).await; + } + + #[tokio::test] + async fn migrates_to_target_migration_grouped() { + run_test(async { + let mut pool = mysql_async::Pool::new("mysql://refinery:root@localhost:3306/refinery_test"); + let conn = pool.get_conn().await.unwrap(); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .run_async(&mut pool) + .await + .unwrap(); + + conn + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)") + .await + .unwrap() + .for_each(|row| { + let current: i32 = row.get(0).unwrap(); + assert_eq!(3, current); + }) + .await + .unwrap(); + + }).await; + } + #[tokio::test] async fn aborts_on_missing_migration_on_filesystem() { run_test(async { let mut pool = mysql_async::Pool::new("mysql://refinery:root@localhost:3306/refinery_test"); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut pool) .await .unwrap(); @@ -486,7 +539,7 @@ mod mysql_async { .unwrap(); let err = pool - .migrate(&[migration.clone()], true, true, false) + .migrate(&[migration.clone()], true, true, false, Target::Latest) .await .unwrap_err(); @@ -507,12 +560,12 @@ mod mysql_async { let mut pool = mysql_async::Pool::new("mysql://refinery:root@localhost:3306/refinery_test"); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut pool) .await .unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut pool) .await .unwrap(); @@ -524,7 +577,7 @@ mod mysql_async { .unwrap(); let err = pool - .migrate(&[migration.clone()], true, false, false) + .migrate(&[migration.clone()], true, false, false, Target::Latest) .await .unwrap_err(); @@ -569,7 +622,7 @@ mod mysql_async { ) .unwrap(); let err = pool - .migrate(&[migration1, migration2], true, true, false) + .migrate(&[migration1, migration2], true, true, false, Target::Latest) .await .unwrap_err(); diff --git a/refinery/tests/postgres.rs b/refinery/tests/postgres.rs index 32f1d88e..278016db 100644 --- a/refinery/tests/postgres.rs +++ b/refinery/tests/postgres.rs @@ -9,7 +9,7 @@ mod postgres { use predicates::str::contains; use refinery::{ config::{migrate_from_config, Config, ConfigDbType}, - Error, Migrate, Migration, + Error, Migrate, Migration, Target, }; use refinery_core::postgres::{Client, NoTls}; use std::process::Command; @@ -276,7 +276,7 @@ mod postgres { run_test(|| { let mut client = Client::connect("postgres://postgres@localhost:5432/postgres", NoTls).unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run(&mut client) .unwrap(); for row in &client @@ -297,7 +297,7 @@ mod postgres { let mut client = Client::connect("postgres://postgres@localhost:5432/postgres", NoTls).unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run(&mut client) .unwrap(); client @@ -321,7 +321,7 @@ mod postgres { let mut client = Client::connect("postgres://postgres@localhost:5432/postgres", NoTls).unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run(&mut client) .unwrap(); for row in &client @@ -354,7 +354,9 @@ mod postgres { let migrations = get_migrations(); let mchecksum = migrations[4].checksum(); - client.migrate(&migrations, true, true, false).unwrap(); + client + .migrate(&migrations, true, true, false, Target::Latest) + .unwrap(); for row in &client .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)", &[]) @@ -368,13 +370,56 @@ mod postgres { }); } + #[test] + fn migrates_to_target_migration() { + run_test(|| { + let mut client = + Client::connect("postgres://postgres@localhost:5432/postgres", NoTls).unwrap(); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .run(&mut client) + .unwrap(); + + for row in &client + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)", &[]) + .unwrap() + { + let current: i32 = row.get(0); + assert_eq!(3, current); + } + }); + } + + #[test] + fn migrates_to_target_migration_grouped() { + run_test(|| { + let mut client = + Client::connect("postgres://postgres@localhost:5432/postgres", NoTls).unwrap(); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .set_grouped(true) + .run(&mut client) + .unwrap(); + + for row in &client + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)", &[]) + .unwrap() + { + let current: i32 = row.get(0); + assert_eq!(3, current); + } + }); + } + #[test] fn aborts_on_missing_migration_on_filesystem() { run_test(|| { let mut client = Client::connect("postgres://postgres@localhost:5432/postgres", NoTls).unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run(&mut client) .unwrap(); @@ -383,7 +428,9 @@ mod postgres { &"ALTER TABLE cars ADD year INTEGER;", ) .unwrap(); - let err = client.migrate(&[migration], true, true, false).unwrap_err(); + let err = client + .migrate(&[migration], true, true, false, Target::Latest) + .unwrap_err(); match err { Error::MissingVersion(missing) => { @@ -401,7 +448,7 @@ mod postgres { let mut client = Client::connect("postgres://postgres@localhost:5432/postgres", NoTls).unwrap(); - mod_migrations::migrations::runner() + mod_migrations::runner() .run(&mut client) .unwrap(); @@ -411,7 +458,7 @@ mod postgres { ) .unwrap(); let err = client - .migrate(&[migration.clone()], true, false, false) + .migrate(&[migration.clone()], true, false, false, Target::Latest) .unwrap_err(); match err { @@ -451,7 +498,7 @@ mod postgres { ) .unwrap(); let err = client - .migrate(&[migration1, migration2], true, true, false) + .migrate(&[migration1, migration2], true, true, false, Target::Latest) .unwrap_err(); match err { Error::MissingVersion(missing) => { diff --git a/refinery/tests/rusqlite.rs b/refinery/tests/rusqlite.rs index 2a60399a..ae576a24 100644 --- a/refinery/tests/rusqlite.rs +++ b/refinery/tests/rusqlite.rs @@ -9,7 +9,7 @@ mod rusqlite { use predicates::str::contains; use refinery::{ config::{migrate_from_config, Config, ConfigDbType}, - Error, Migrate, Migration, + Error, Migrate, Migration, Target, }; use refinery_core::rusqlite::{Connection, OptionalExtension, NO_PARAMS}; use std::fs::{self, File}; @@ -251,7 +251,7 @@ mod rusqlite { #[test] fn mod_creates_migration_table() { let mut conn = Connection::open_in_memory().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); let table_name: String = conn .query_row( "SELECT name FROM sqlite_master WHERE type='table' AND name='refinery_schema_history'", @@ -266,7 +266,7 @@ mod rusqlite { fn mod_applies_migration() { let mut conn = Connection::open_in_memory().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); conn.execute( "INSERT INTO persons (name, city) VALUES (?, ?)", @@ -286,7 +286,7 @@ mod rusqlite { fn mod_updates_schema_history() { let mut conn = Connection::open_in_memory().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); let current: u32 = conn .query_row( @@ -319,7 +319,8 @@ mod rusqlite { let migrations = get_migrations(); let mchecksum = migrations[4].checksum(); - conn.migrate(&migrations, true, true, false).unwrap(); + conn.migrate(&migrations, true, true, false, Target::Latest) + .unwrap(); let (current, checksum): (u32, String) = conn .query_row( @@ -332,18 +333,59 @@ mod rusqlite { assert_eq!(mchecksum.to_string(), checksum); } + #[test] + fn migrates_to_target_migration() { + let mut conn = Connection::open_in_memory().unwrap(); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .run(&mut conn) + .unwrap(); + + let (current, checksum): (u32, String) = conn + .query_row( + "SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)", + NO_PARAMS, + |row| Ok((row.get(0).unwrap(), row.get(1).unwrap())), + ) + .unwrap(); + assert_eq!(3, current); + } + + #[test] + fn migrates_to_target_migration_grouped() { + let mut conn = Connection::open_in_memory().unwrap(); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .set_grouped(true) + .run(&mut conn) + .unwrap(); + + let (current, checksum): (u32, String) = conn + .query_row( + "SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)", + NO_PARAMS, + |row| Ok((row.get(0).unwrap(), row.get(1).unwrap())), + ) + .unwrap(); + assert_eq!(3, current); + } + #[test] fn aborts_on_missing_migration_on_filesystem() { let mut conn = Connection::open_in_memory().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); let migration = Migration::from_filename( "V4__add_year_field_to_cars", &"ALTER TABLE cars ADD year INTEGER;", ) .unwrap(); - let err = conn.migrate(&[migration], true, true, false).unwrap_err(); + let err = conn + .migrate(&[migration], true, true, false, Target::Latest) + .unwrap_err(); match err { Error::MissingVersion(missing) => { @@ -358,7 +400,7 @@ mod rusqlite { fn aborts_on_divergent_migration() { let mut conn = Connection::open_in_memory().unwrap(); - mod_migrations::migrations::runner().run(&mut conn).unwrap(); + mod_migrations::runner().run(&mut conn).unwrap(); let migration = Migration::from_filename( "V2__add_year_field_to_cars", @@ -366,7 +408,7 @@ mod rusqlite { ) .unwrap(); let err = conn - .migrate(&[migration.clone()], true, false, false) + .migrate(&[migration.clone()], true, false, false, Target::Latest) .unwrap_err(); match err { @@ -403,7 +445,7 @@ mod rusqlite { ) .unwrap(); let err = conn - .migrate(&[migration1, migration2], true, true, false) + .migrate(&[migration1, migration2], true, true, false, Target::Latest) .unwrap_err(); match err { Error::MissingVersion(missing) => { diff --git a/refinery/tests/tokio_postgres.rs b/refinery/tests/tokio_postgres.rs index ef587794..624b3d6b 100644 --- a/refinery/tests/tokio_postgres.rs +++ b/refinery/tests/tokio_postgres.rs @@ -8,7 +8,7 @@ mod tokio_postgres { use futures::FutureExt; use refinery::{ config::{migrate_from_config_async, Config, ConfigDbType}, - AsyncMigrate, Error, Migration, + AsyncMigrate, Error, Migration, Target, }; use refinery_core::tokio_postgres::NoTls; use refinery_core::{tokio, tokio_postgres}; @@ -381,7 +381,7 @@ mod tokio_postgres { connection.await.unwrap(); }); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut client) .await .unwrap(); @@ -410,7 +410,7 @@ mod tokio_postgres { connection.await.unwrap(); }); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut client) .await .unwrap(); @@ -447,7 +447,7 @@ mod tokio_postgres { connection.await.unwrap(); }); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut client) .await .unwrap(); @@ -499,6 +499,7 @@ mod tokio_postgres { true, true, false, + Target::Latest ) .await .unwrap(); @@ -516,6 +517,65 @@ mod tokio_postgres { }).await; } + #[tokio::test] + async fn migrates_to_target_migration() { + run_test(async { + let (mut client, connection) = + tokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .run_async(&mut client) + .await + .unwrap(); + + for row in client + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)", &[]) + .await + .unwrap() + { + let current: i32 = row.get(0); + assert_eq!(3, current); + } + }).await; + } + + #[tokio::test] + async fn migrates_to_target_migration_grouped() { + run_test(async { + let (mut client, connection) = + tokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .set_target(Target::Version(3)) + .set_grouped(true) + .run_async(&mut client) + .await + .unwrap(); + + for row in client + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)", &[]) + .await + .unwrap() + { + let current: i32 = row.get(0); + assert_eq!(3, current); + } + }).await; + } + #[tokio::test] async fn aborts_on_missing_migration_on_filesystem() { run_test(async { @@ -528,7 +588,7 @@ mod tokio_postgres { connection.await.unwrap(); }); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut client) .await .unwrap(); @@ -539,7 +599,7 @@ mod tokio_postgres { ) .unwrap(); let err = client - .migrate(&[migration.clone()], true, true, false) + .migrate(&[migration.clone()], true, true, false, Target::Latest) .await .unwrap_err(); @@ -566,7 +626,7 @@ mod tokio_postgres { connection.await.unwrap(); }); - mod_migrations::migrations::runner() + mod_migrations::runner() .run_async(&mut client) .await .unwrap(); @@ -578,7 +638,7 @@ mod tokio_postgres { .unwrap(); let err = client - .migrate(&[migration.clone()], true, false, false) + .migrate(&[migration.clone()], true, false, false, Target::Latest) .await .unwrap_err(); @@ -629,7 +689,7 @@ mod tokio_postgres { ) .unwrap(); let err = client - .migrate(&[migration1, migration2], true, true, false) + .migrate(&[migration1, migration2], true, true, false, Target::Latest) .await .unwrap_err(); diff --git a/refinery_core/src/lib.rs b/refinery_core/src/lib.rs index 77090606..bcc474f7 100644 --- a/refinery_core/src/lib.rs +++ b/refinery_core/src/lib.rs @@ -6,7 +6,7 @@ mod traits; mod util; pub use crate::error::Error; -pub use crate::runner::{AppliedMigration, Migration, Runner}; +pub use crate::runner::{AppliedMigration, Migration, Runner, Target}; pub use crate::traits::r#async::AsyncMigrate; pub use crate::traits::sync::Migrate; pub use crate::util::{find_migration_files, MigrationType}; diff --git a/refinery_core/src/runner.rs b/refinery_core/src/runner.rs index d7e2c359..1f91a91e 100644 --- a/refinery_core/src/runner.rs +++ b/refinery_core/src/runner.rs @@ -23,6 +23,13 @@ pub enum MigrationPrefix { Versioned, } +/// An enum set that represents the target version up to which refinery should migrate, it is used by [Runner] +#[derive(Clone, Copy)] +pub enum Target { + Latest, + Version(i32), +} + /// Represents a schema migration to be run on the database, /// this struct is used by the [`embed_migrations!`] and [`include_migration_mods!`] macros to gather migration files /// and shouldn't be needed by the user @@ -146,18 +153,27 @@ pub struct Runner { abort_divergent: bool, abort_missing: bool, migrations: Vec, + target: Target, } impl Runner { pub fn new(migrations: &[Migration]) -> Runner { Runner { grouped: false, + target: Target::Latest, abort_divergent: true, abort_missing: true, migrations: migrations.to_vec(), } } + /// set the target version up to which refinery should migrate, Latest migrates to the latest version available + /// Version migrates to a user provided version, a Version with a higher version than the latest will be ignored. + /// by default this is set to Latest + pub fn set_target(self, target: Target) -> Runner { + Runner { target, ..self } + } + /// Set true if all migrations should be grouped and run in a single transaction. /// by default this is set to false, each migration runs on their own transaction /// @@ -201,6 +217,7 @@ impl Runner { self.abort_divergent, self.abort_missing, self.grouped, + self.target, ) } @@ -215,6 +232,7 @@ impl Runner { self.abort_divergent, self.abort_missing, self.grouped, + self.target, ) .await } diff --git a/refinery_core/src/traits/async.rs b/refinery_core/src/traits/async.rs index f696a486..c23ade2a 100644 --- a/refinery_core/src/traits/async.rs +++ b/refinery_core/src/traits/async.rs @@ -1,6 +1,6 @@ use crate::error::WrapMigrationError; use crate::traits::{check_missing_divergent, ASSERT_MIGRATIONS_TABLE, GET_APPLIED_MIGRATIONS}; -use crate::{AppliedMigration, Error, Migration}; +use crate::{AppliedMigration, Error, Migration, Target}; use async_trait::async_trait; use chrono::Local; @@ -20,8 +20,16 @@ pub trait AsyncQuery: AsyncTransaction { async fn migrate( transaction: &mut T, migrations: Vec, + target: Target, ) -> Result<(), Error> { for migration in migrations.iter() { + if let Target::Version(input_target) = target { + if input_target < migration.version { + log::info!("stoping at migration: {}, due to user option", input_target); + break; + } + } + log::info!("applying migration: {}", migration); let update_query = &format!( "INSERT INTO refinery_schema_history (version, name, applied_on, checksum) VALUES ({}, '{}', '{}', '{}')", @@ -37,10 +45,17 @@ async fn migrate( async fn migrate_grouped( transaction: &mut T, migrations: Vec, + target: Target, ) -> Result<(), Error> { let mut grouped_migrations = Vec::new(); let mut display_migrations = Vec::new(); for migration in migrations.into_iter() { + if let Target::Version(input_target) = target { + if input_target < migration.version { + break; + } + } + let query = format!( "INSERT INTO refinery_schema_history (version, name, applied_on, checksum) VALUES ({}, '{}', '{}', '{}')", migration.version, migration.name, Local::now().to_rfc3339(), migration.checksum().to_string() @@ -49,11 +64,16 @@ async fn migrate_grouped( grouped_migrations.push(migration.sql); grouped_migrations.push(query); } + log::info!( "going to apply batch migrations in single transaction: {:#?}", display_migrations ); + if let Target::Version(input_target) = target { + log::info!("stoping at migration: {}, due to user option", input_target); + } + let refs: Vec<&str> = grouped_migrations.iter().map(AsRef::as_ref).collect(); transaction @@ -75,6 +95,7 @@ where abort_divergent: bool, abort_missing: bool, grouped: bool, + target: Target, ) -> Result<(), Error> { self.execute(&[ASSERT_MIGRATIONS_TABLE]) .await @@ -98,9 +119,9 @@ where } if grouped { - migrate_grouped(self, migrations).await? + migrate_grouped(self, migrations, target).await? } else { - migrate(self, migrations).await? + migrate(self, migrations, target).await? } Ok(()) diff --git a/refinery_core/src/traits/sync.rs b/refinery_core/src/traits/sync.rs index b2d31a2b..7d53cafc 100644 --- a/refinery_core/src/traits/sync.rs +++ b/refinery_core/src/traits/sync.rs @@ -1,6 +1,6 @@ use crate::error::WrapMigrationError; use crate::traits::{check_missing_divergent, ASSERT_MIGRATIONS_TABLE, GET_APPLIED_MIGRATIONS}; -use crate::{AppliedMigration, Error, Migration}; +use crate::{AppliedMigration, Error, Migration, Target}; use chrono::Local; pub trait Transaction { @@ -13,8 +13,19 @@ pub trait Query: Transaction { fn query(&mut self, query: &str) -> Result, Self::Error>; } -fn migrate(transaction: &mut T, migrations: Vec) -> Result<(), Error> { +fn migrate( + transaction: &mut T, + migrations: Vec, + target: Target, +) -> Result<(), Error> { for migration in migrations.iter() { + if let Target::Version(input_target) = target { + if input_target < migration.version { + log::info!("stoping at migration: {}, due to user option", input_target); + break; + } + } + log::info!("applying migration: {}", migration); let update_query = &format!( "INSERT INTO refinery_schema_history (version, name, applied_on, checksum) VALUES ({}, '{}', '{}', '{}')", @@ -29,10 +40,18 @@ fn migrate(transaction: &mut T, migrations: Vec) -> R fn migrate_grouped( transaction: &mut T, migrations: Vec, + target: Target, ) -> Result<(), Error> { let mut grouped_migrations = Vec::new(); let mut display_migrations = Vec::new(); + for migration in migrations.into_iter() { + if let Target::Version(input_target) = target { + if input_target < migration.version { + break; + } + } + let query = format!( "INSERT INTO refinery_schema_history (version, name, applied_on, checksum) VALUES ({}, '{}', '{}', '{}')", migration.version, migration.name, Local::now().to_rfc3339(), migration.checksum().to_string() @@ -41,11 +60,16 @@ fn migrate_grouped( grouped_migrations.push(migration.sql); grouped_migrations.push(query); } + log::info!( "going to apply batch migrations in single transaction: {:#?}", display_migrations ); + if let Target::Version(input_target) = target { + log::info!("stoping at migration: {}, due to user option", input_target); + } + let refs: Vec<&str> = grouped_migrations.iter().map(AsRef::as_ref).collect(); transaction @@ -65,6 +89,7 @@ where abort_divergent: bool, abort_missing: bool, grouped: bool, + target: Target, ) -> Result<(), Error> { self.execute(&[ASSERT_MIGRATIONS_TABLE]) .migration_err("error asserting migrations table")?; @@ -86,9 +111,9 @@ where } if grouped { - migrate_grouped(self, migrations) + migrate_grouped(self, migrations, target) } else { - migrate(self, migrations) + migrate(self, migrations, target) } } } diff --git a/refinery_macros/src/lib.rs b/refinery_macros/src/lib.rs index 5e43bfad..6cbc7736 100644 --- a/refinery_macros/src/lib.rs +++ b/refinery_macros/src/lib.rs @@ -1,5 +1,6 @@ //! Contains Refinery macros that are used to import and embed migration files. #![recursion_limit = "128"] +//TODO remove when previous version is 1.42 extern crate proc_macro; use proc_macro::TokenStream;