Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable create database #243

Merged
merged 5 commits into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cargo-sqlx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ sqlx = { version = "0.3", path = "..", default-features = false, features = [ "r
futures = "0.3"
structopt = "0.3"
chrono = "0.4"
anyhow = "1.0"
url = { version = "2.1.1", default-features = false }
231 changes: 166 additions & 65 deletions cargo-sqlx/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,105 @@ use std::io::prelude::*;

use dotenv::dotenv;

use sqlx::postgres::PgRow;
use sqlx::Connect;
use sqlx::Executor;
use sqlx::PgConnection;
use sqlx::PgPool;
use sqlx::Row;

use structopt::StructOpt;

use anyhow::{anyhow, Context, Result};

const MIGRATION_FOLDER: &'static str = "migrations";

/// Sqlx commandline tool
#[derive(StructOpt, Debug)]
#[structopt(name = "Sqlx")]
enum Opt {
// #[structopt(subcommand)]
Migrate(MigrationCommand),

#[structopt(alias = "db")]
Database(DatabaseCommand),
}

/// Simple postgres migrator
/// Adds and runs migrations
#[derive(StructOpt, Debug)]
#[structopt(name = "Sqlx migrator")]
enum MigrationCommand {
/// Initalizes new migration directory with db create script
// Init {
// // #[structopt(long)]
// database_name: String,
// },

/// Add new migration with name <timestamp>_<migration_name>.sql
Add {
// #[structopt(long)]
name: String,
},
Add { name: String },

/// Run all migrations
Run,
}

/// Create or drops database depending on your connection string. Alias: db
#[derive(StructOpt, Debug)]
#[structopt(name = "Sqlx migrator")]
enum DatabaseCommand {
/// Create database in url
Create,

/// Drop database in url
Drop,
}

#[tokio::main]
async fn main() {
async fn main() -> Result<()> {
let opt = Opt::from_args();

match opt {
Opt::Migrate(command) => match command {
// Opt::Init { database_name } => init_migrations(&database_name),
MigrationCommand::Add { name } => add_migration_file(&name),
MigrationCommand::Run => run_migrations().await,
MigrationCommand::Add { name } => add_migration_file(&name)?,
MigrationCommand::Run => run_migrations().await?,
},
}
Opt::Database(command) => match command {
DatabaseCommand::Create => run_create_database().await?,
DatabaseCommand::Drop => run_drop_database().await?,
},
};

println!("All done!");
Ok(())
}

// fn init_migrations(db_name: &str) {
// println!("Initing the migrations so hard! db: {:#?}", db_name);
// }
async fn run_create_database() -> Result<()> {
dotenv().ok();
let db_url = env::var("DATABASE_URL").context("Failed to find 'DATABASE_URL'")?;
let db_url = get_base_url(&db_url)?;

let db_exists = check_if_db_exists(&db_url).await?;
if db_exists {
println!("Database already exists, aborting");
Ok(())
} else {
println!("Creating database: {}", db_url.db_name);
Ok(create_database(&db_url).await?)
}
}

fn add_migration_file(name: &str) {
async fn run_drop_database() -> Result<()> {
dotenv().ok();
let db_url = env::var("DATABASE_URL").context("Failed to find 'DATABASE_URL'")?;
let db_url = get_base_url(&db_url)?;

let db_exists = check_if_db_exists(&db_url).await?;
if db_exists {
println!("Dropping database: {}", db_url.db_name);
Ok(drop_database(&db_url).await?)
} else {
println!("Database does not exists, aborting");
Ok(())
}
}

fn add_migration_file(name: &str) -> Result<()> {
use chrono::prelude::*;
use std::path::Path;
use std::path::PathBuf;

if !Path::new(MIGRATION_FOLDER).exists() {
fs::create_dir(MIGRATION_FOLDER).expect("Failed to create 'migrations' dir")
}
fs::create_dir_all(MIGRATION_FOLDER)?;

let dt = Utc::now();
let mut file_name = dt.format("%Y-%m-%d_%H-%M-%S").to_string();
Expand All @@ -78,25 +115,21 @@ fn add_migration_file(name: &str) {
path.push(MIGRATION_FOLDER);
path.push(&file_name);

if path.exists() {
eprintln!("Migration already exists!");
return;
}

let mut file = File::create(path).expect("Failed to create file");
let mut file = File::create(path).context("Failed to create file")?;
file.write_all(b"-- Add migration script here")
.expect("Could not write to file");
.context("Could not write to file")?;

println!("Created migration: '{}'", file_name);
Ok(())
}

pub struct Migration {
pub name: String,
pub sql: String,
}

fn load_migrations() -> Vec<Migration> {
let entries = fs::read_dir(&MIGRATION_FOLDER).expect("Could not find 'migrations' dir");
fn load_migrations() -> Result<Vec<Migration>> {
let entries = fs::read_dir(&MIGRATION_FOLDER).context("Could not find 'migrations' dir")?;

let mut migrations = Vec::new();

Expand All @@ -116,11 +149,11 @@ fn load_migrations() -> Vec<Migration> {
continue;
}

let mut file =
File::open(e.path()).expect(&format!("Failed to open: '{:?}'", e.file_name()));
let mut file = File::open(e.path())
.with_context(|| format!("Failed to open: '{:?}'", e.file_name()))?;
let mut contents = String::new();
file.read_to_string(&mut contents)
.expect(&format!("Failed to read: '{:?}'", e.file_name()));
.with_context(|| format!("Failed to read: '{:?}'", e.file_name()))?;

migrations.push(Migration {
name: e.file_name().to_str().unwrap().to_string(),
Expand All @@ -132,71 +165,139 @@ fn load_migrations() -> Vec<Migration> {

migrations.sort_by(|a, b| a.name.partial_cmp(&b.name).unwrap());

migrations
Ok(migrations)
}

async fn run_migrations() {
async fn run_migrations() -> Result<()> {
dotenv().ok();
let db_url = env::var("DATABASE_URL").expect("Failed to find 'DATABASE_URL'");
let db_url = env::var("DATABASE_URL").context("Failed to find 'DATABASE_URL'")?;

let mut pool = PgPool::new(&db_url)
.await
.expect("Failed to connect to pool");
.context("Failed to connect to pool")?;

create_migration_table(&mut pool).await;
create_migration_table(&mut pool).await?;

let migrations = load_migrations();
let migrations = load_migrations()?;

for mig in migrations.iter() {
let mut tx = pool.begin().await.unwrap();
let mut tx = pool.begin().await?;

if check_if_applied(&mut tx, &mig.name).await {
if check_if_applied(&mut tx, &mig.name).await? {
println!("Already applied migration: '{}'", mig.name);
continue;
}
println!("Applying migration: '{}'", mig.name);

sqlx::query(&mig.sql)
.execute(&mut tx)
tx.execute(&*mig.sql)
.await
.expect(&format!("Failed to run migration {:?}", &mig.name));
.with_context(|| format!("Failed to run migration {:?}", &mig.name))?;

save_applied_migration(&mut tx, &mig.name).await;
save_applied_migration(&mut tx, &mig.name).await?;

tx.commit().await.unwrap();
tx.commit().await.context("Failed")?;
}

Ok(())
}

async fn create_migration_table(mut pool: &PgPool) {
sqlx::query(
struct DbUrl<'a> {
base_url: &'a str,
db_name: &'a str,
}

fn get_base_url<'a>(db_url: &'a str) -> Result<DbUrl> {
let split: Vec<&str> = db_url.rsplitn(2, '/').collect();

if split.len() != 2 {
return Err(anyhow!("Failed to find database name in connection string"));
}

let db_name = split[0];
let base_url = split[1];

Ok(DbUrl { base_url, db_name })
}

async fn check_if_db_exists(db_url: &DbUrl<'_>) -> Result<bool> {
let db_name = db_url.db_name;
let base_url = db_url.base_url;

let mut conn = PgConnection::connect(base_url).await?;

let result: bool =
sqlx::query("select exists(SELECT 1 from pg_database WHERE datname = $1) as exists")
.bind(db_name)
.try_map(|row: PgRow| row.try_get("exists"))
.fetch_one(&mut conn)
.await
.context("Failed to check if database exists")?;

Ok(result)
}

async fn create_database(db_url: &DbUrl<'_>) -> Result<()> {
let db_name = db_url.db_name;
let base_url = db_url.base_url;

let mut conn = PgConnection::connect(base_url).await?;

sqlx::query(&format!("CREATE DATABASE {}", db_name))
.execute(&mut conn)
.await
.with_context(|| format!("Failed to create database: {}", db_name))?;

Ok(())
}

async fn drop_database(db_url: &DbUrl<'_>) -> Result<()> {
let db_name = db_url.db_name;
let base_url = db_url.base_url;

let mut conn = PgConnection::connect(base_url).await?;

sqlx::query(&format!("DROP DATABASE {}", db_name))
.execute(&mut conn)
.await
.with_context(|| format!("Failed to create database: {}", db_name))?;

Ok(())
}

async fn create_migration_table(mut pool: &PgPool) -> Result<()> {
pool.execute(
r#"
CREATE TABLE IF NOT EXISTS __migrations (
migration VARCHAR (255) PRIMARY KEY,
created TIMESTAMP NOT NULL DEFAULT current_timestamp
);
"#,
)
.execute(&mut pool)
.await
.expect("Failed to create migration table");
.context("Failed to create migration table")?;

Ok(())
}

async fn check_if_applied(pool: &mut PgConnection, migration: &str) -> bool {
use sqlx::postgres::PgRow;
use sqlx::Row;
async fn check_if_applied(connection: &mut PgConnection, migration: &str) -> Result<bool> {
let result = sqlx::query(
"select exists(select migration from __migrations where migration = $1) as exists",
)
.bind(migration.to_string())
.try_map(|row: PgRow| row.try_get("exists"))
.fetch_one(connection)
.await
.context("Failed to check migration table")?;

sqlx::query("select exists(select migration from __migrations where migration = $1) as exists")
.bind(migration.to_string())
.try_map(|row: PgRow| row.try_get("exists"))
.fetch_one(pool)
.await
.expect("Failed to check migration table")
Ok(result)
}

async fn save_applied_migration(pool: &mut PgConnection, migration: &str) {
async fn save_applied_migration(pool: &mut PgConnection, migration: &str) -> Result<()> {
sqlx::query("insert into __migrations (migration) values ($1)")
.bind(migration.to_string())
.execute(pool)
.await
.expect("Failed to insert migration ");
.context("Failed to insert migration")?;

Ok(())
}