Skip to content

Commit

Permalink
impl alter database rename
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed May 11, 2022
1 parent 641dd5e commit 1cdccac
Show file tree
Hide file tree
Showing 41 changed files with 899 additions and 13 deletions.
26 changes: 26 additions & 0 deletions common/ast/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ pub enum Statement<'a> {
if_exists: bool,
database: Identifier<'a>,
},
AlterDatabase {
if_exists: bool,
database: Identifier<'a>,
action: AlterDatabaseAction<'a>,
},
UseDatabase {
database: Identifier<'a>,
},
Expand Down Expand Up @@ -249,6 +254,11 @@ pub struct ColumnDefinition<'a> {
pub default_expr: Option<Box<Expr<'a>>>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum AlterDatabaseAction<'a> {
RenameDatabase { new_db: Identifier<'a> },
}

#[derive(Debug, Clone, PartialEq)]
pub enum AlterTableAction<'a> {
RenameTable { new_table: Identifier<'a> },
Expand Down Expand Up @@ -388,6 +398,22 @@ impl<'a> Display for Statement<'a> {
}
write!(f, " {database}")?;
}
Statement::AlterDatabase {
if_exists,
database,
action,
} => {
write!(f, "ALTER DATABASE")?;
if *if_exists {
write!(f, " IF EXISTS")?;
}
write!(f, " {database}")?;
match action {
AlterDatabaseAction::RenameDatabase { new_db } => {
write!(f, " RENAME TO {new_db}")?;
}
}
}
Statement::UseDatabase { database } => {
write!(f, "USE {database}")?;
}
Expand Down
24 changes: 24 additions & 0 deletions common/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ pub fn statement(i: Input) -> IResult<Statement> {
database,
},
);
let alter_database = map(
rule! {
ALTER ~ DATABASE ~ ( IF ~ EXISTS )? ~ #ident ~ #alter_database_action
},
|(_, _, opt_if_exists, database, action)| Statement::AlterDatabase {
if_exists: opt_if_exists.is_some(),
database,
action,
},
);
let use_database = map(
rule! {
USE ~ #ident
Expand Down Expand Up @@ -403,6 +413,7 @@ pub fn statement(i: Input) -> IResult<Statement> {
| #show_create_database : "`SHOW CREATE DATABASE <database>`"
| #create_database : "`CREATE DATABASE [IF NOT EXIST] <database> [ENGINE = <engine>]`"
| #drop_database : "`DROP DATABASE [IF EXISTS] <database>`"
| #alter_database : "`ALTER DATABASE [IF EXISTS] <action>`"
| #use_database : "`USE <database>`"
| #show_tables : "`SHOW [FULL] TABLES [FROM <database>] [<show_limit>]`"
| #show_create_table : "`SHOW CREATE TABLE [<database>.]<table>`"
Expand Down Expand Up @@ -530,6 +541,19 @@ pub fn create_table_source(i: Input) -> IResult<CreateTableSource> {
)(i)
}

pub fn alter_database_action(i: Input) -> IResult<AlterDatabaseAction> {
let mut rename_database = map(
rule! {
RENAME ~ TO ~ #ident
},
|(_, _, new_db)| AlterDatabaseAction::RenameDatabase { new_db },
);

rule!(
#rename_database
)(i)
}

pub fn alter_table_action(i: Input) -> IResult<AlterTableAction> {
let mut rename_table = map(
rule! {
Expand Down
7 changes: 7 additions & 0 deletions common/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use common_meta_types::ListDatabaseReq;
use common_meta_types::ListTableReq;
use common_meta_types::MetaError;
use common_meta_types::MetaId;
use common_meta_types::RenameDatabaseReply;
use common_meta_types::RenameDatabaseReq;
use common_meta_types::RenameTableReply;
use common_meta_types::RenameTableReq;
use common_meta_types::TableIdent;
Expand Down Expand Up @@ -57,6 +59,11 @@ pub trait SchemaApi: Send + Sync {
req: ListDatabaseReq,
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError>;

async fn rename_database(
&self,
req: RenameDatabaseReq,
) -> Result<RenameDatabaseReply, MetaError>;

// table

async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply, MetaError>;
Expand Down
95 changes: 95 additions & 0 deletions common/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use common_meta_types::MatchSeqExt;
use common_meta_types::MetaError;
use common_meta_types::MetaId;
use common_meta_types::Operation;
use common_meta_types::RenameDatabaseReply;
use common_meta_types::RenameDatabaseReq;
use common_meta_types::RenameTableReply;
use common_meta_types::RenameTableReq;
use common_meta_types::TableAlreadyExists;
Expand Down Expand Up @@ -192,6 +194,80 @@ impl<KV: KVApi> SchemaApi for KV {
}
}

async fn rename_database(
&self,
req: RenameDatabaseReq,
) -> Result<RenameDatabaseReply, MetaError> {
let tenant_dbname = &req.name_ident;
let tenant_newdbname = DatabaseNameIdent {
tenant: tenant_dbname.tenant.clone(),
db_name: req.new_db_name.clone(),
};

loop {
// get old db, not exists return err
let res = get_db_or_err(
self,
tenant_dbname,
format!("rename_database: {}", &tenant_dbname),
)
.await;

let (old_db_id_seq, old_db_id, _, _) = match res {
Ok(x) => x,
Err(e) => {
if let MetaError::AppError(AppError::UnknownDatabase(_)) = e {
if req.if_exists {
return Ok(RenameDatabaseReply {});
}
}
return Err(e);
}
};

tracing::debug!(
old_db_id,
tenant_dbname = debug(&tenant_dbname),
"rename_database"
);

// get new db, exists return err
let (db_id_seq, _db_id) = get_id_value(self, &tenant_newdbname).await?;
db_has_to_not_exist(db_id_seq, &tenant_newdbname, "rename_database")?;

// rename database
{
let txn_req = TxnRequest {
condition: vec![
// Prevent renaming or deleting in other threads.
txn_cond_seq(tenant_dbname, Eq, old_db_id_seq)?,
txn_cond_seq(&tenant_newdbname, Eq, 0)?,
],
if_then: vec![
txn_op_del(tenant_dbname)?, // del old_db_name
//Renaming db should not affect the seq of db_meta. Just modify db name.
txn_op_put(&tenant_newdbname, serialize_id(old_db_id)?)?, // (tenant, new_db_name) -> old_db_id
],
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

tracing::debug!(
name = debug(&tenant_dbname),
to = debug(&tenant_newdbname),
database_id = debug(&old_db_id),
succ = display(succ),
"rename_database"
);

if succ {
return Ok(RenameDatabaseReply {});
}
}
}
}

async fn get_database(&self, req: GetDatabaseReq) -> Result<Arc<DatabaseInfo>, MetaError> {
let name_key = &req.inner;

Expand Down Expand Up @@ -820,6 +896,25 @@ fn table_has_to_exist(
}
}

/// Return OK if a db_id or db_meta does not exist by checking the seq.
///
/// Otherwise returns DatabaseAlreadyExists error
fn db_has_to_not_exist(
seq: u64,
name_ident: &DatabaseNameIdent,
ctx: impl Display,
) -> Result<(), MetaError> {
if seq == 0 {
Ok(())
} else {
tracing::debug!(seq, ?name_ident, "exist");

Err(MetaError::AppError(AppError::DatabaseAlreadyExists(
DatabaseAlreadyExists::new(&name_ident.db_name, format!("{}: {}", ctx, name_ident)),
)))
}
}

/// Return OK if a table_id or table_meta does not exist by checking the seq.
///
/// Otherwise returns TableAlreadyExists error
Expand Down
115 changes: 115 additions & 0 deletions common/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_meta_types::GetDatabaseReq;
use common_meta_types::GetTableReq;
use common_meta_types::ListDatabaseReq;
use common_meta_types::ListTableReq;
use common_meta_types::RenameDatabaseReq;
use common_meta_types::RenameTableReq;
use common_meta_types::TableIdent;
use common_meta_types::TableInfo;
Expand Down Expand Up @@ -424,6 +425,120 @@ impl SchemaApiTestSuite {
Ok(())
}

pub async fn database_rename<MT: SchemaApi>(self, mt: &MT) -> anyhow::Result<()> {
let tenant = "tenant1";
let db_name = "db1";
let db2_name = "db2";
let new_db_name = "db3";

tracing::info!("--- rename not exists db1 to not exists db2");
{
let req = RenameDatabaseReq {
if_exists: false,
name_ident: DatabaseNameIdent {
tenant: tenant.to_string(),
db_name: db_name.to_string(),
},
new_db_name: new_db_name.to_string(),
};

let res = mt.rename_database(req).await;
tracing::info!("rename database res: {:?}", res);
assert!(res.is_err());
assert_eq!(
ErrorCode::UnknownDatabase("").code(),
ErrorCode::from(res.unwrap_err()).code()
);
}

tracing::info!("--- prepare db1 and db2");
{
// prepare db2
let res = self.create_database(mt, tenant, "db1", "eng1").await?;
assert_eq!(1, res.db_id);

tracing::info!("--- rename not exists db4 to exists db1");
{
let req = RenameDatabaseReq {
if_exists: false,
name_ident: DatabaseNameIdent {
tenant: tenant.to_string(),
db_name: "db4".to_string(),
},
new_db_name: db_name.to_string(),
};

let res = mt.rename_database(req).await;
tracing::info!("rename database res: {:?}", res);
assert!(res.is_err());
assert_eq!(
ErrorCode::UnknownDatabase("").code(),
ErrorCode::from(res.unwrap_err()).code()
);
}

// prepare db2
let res = self.create_database(mt, tenant, "db2", "eng1").await?;
assert!(res.db_id > 1);
}

tracing::info!("--- rename exists db db1 to exists db db2");
{
let req = RenameDatabaseReq {
if_exists: false,
name_ident: DatabaseNameIdent {
tenant: tenant.to_string(),
db_name: db_name.to_string(),
},
new_db_name: db2_name.to_string(),
};

let res = mt.rename_database(req).await;
tracing::info!("rename database res: {:?}", res);
assert!(res.is_err());
assert_eq!(
ErrorCode::DatabaseAlreadyExists("").code(),
ErrorCode::from(res.unwrap_err()).code()
);
}

tracing::info!("--- rename exists db db1 to not exists mutable db");
{
let req = RenameDatabaseReq {
if_exists: false,
name_ident: DatabaseNameIdent {
tenant: tenant.to_string(),
db_name: db_name.to_string(),
},

new_db_name: new_db_name.to_string(),
};
let res = mt.rename_database(req).await;
tracing::info!("rename database res: {:?}", res);
assert!(res.is_ok());

let res = mt
.get_database(GetDatabaseReq::new(tenant, new_db_name))
.await;
tracing::debug!("get present database res: {:?}", res);
let res = res?;
assert_eq!(1, res.ident.db_id, "db3 id is 1");
assert_eq!("db3".to_string(), res.name_ident.db_name, "db3.db is db3");

tracing::info!("--- get old database after rename");
{
let res = mt.get_database(GetDatabaseReq::new(tenant, db_name)).await;
let err = res.err().unwrap();
assert_eq!(
ErrorCode::UnknownDatabase("").code(),
ErrorCode::from(err).code()
);
}
}

Ok(())
}

pub async fn table_create_get_drop<MT: SchemaApi>(&self, mt: &MT) -> anyhow::Result<()> {
let tenant = "tenant1";
let db_name = "db1";
Expand Down
6 changes: 6 additions & 0 deletions common/meta/embedded/tests/it/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ async fn test_meta_embedded_database_list_in_diff_tenant() -> anyhow::Result<()>
.await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_embedded_database_rename() -> anyhow::Result<()> {
let mt = MetaEmbedded::new_temp().await?;
SchemaApiTestSuite {}.database_rename(&mt).await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_embedded_table_create_get_drop() -> anyhow::Result<()> {
let mt = MetaEmbedded::new_temp().await?;
Expand Down
8 changes: 8 additions & 0 deletions common/meta/grpc/src/grpc_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use common_meta_types::ListTableReq;
use common_meta_types::MGetKVActionReply;
use common_meta_types::MetaId;
use common_meta_types::PrefixListReply;
use common_meta_types::RenameDatabaseReply;
use common_meta_types::RenameDatabaseReq;
use common_meta_types::RenameTableReply;
use common_meta_types::RenameTableReq;
use common_meta_types::ShareInfo;
Expand Down Expand Up @@ -179,6 +181,12 @@ impl RequestFor for DropDatabaseReq {
type Reply = DropDatabaseReply;
}

impl RequestFor for RenameDatabaseReq {
type Reply = RenameDatabaseReply;
}

// == table actions ==

impl RequestFor for CreateTableReq {
type Reply = CreateTableReply;
}
Expand Down
Loading

0 comments on commit 1cdccac

Please sign in to comment.