Skip to content

Commit

Permalink
chore: improve the code
Browse files Browse the repository at this point in the history
  • Loading branch information
e1ijah1 committed Jan 11, 2023
1 parent 629fb72 commit 9512df3
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 180 deletions.
19 changes: 2 additions & 17 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub trait CatalogManager: CatalogList {
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;

/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;
async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result<bool>;

/// Register a system table, should be called before starting the manager.
async fn register_system_table(&self, request: RegisterSystemTableRequest)
Expand Down Expand Up @@ -145,27 +145,12 @@ impl Debug for RegisterTableRequest {
}
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct RenameTableRequest {
pub catalog: String,
pub schema: String,
pub table_name: String,
pub new_table_name: String,
pub table_id: TableId,
pub table: TableRef,
}

impl Debug for RenameTableRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RenameTableRequest")
.field("catalog", &self.catalog)
.field("schema", &self.schema)
.field("table_name", &self.table_name)
.field("new_table_name", &self.new_table_name)
.field("table_id", &self.table_id)
.field("table", &self.table.table_info())
.finish()
}
}

#[derive(Clone)]
Expand Down
10 changes: 5 additions & 5 deletions src/catalog/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl CatalogManager for LocalCatalogManager {
}
}

async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result<bool> {
let started = self.init_lock.lock().await;

ensure!(
Expand Down Expand Up @@ -411,12 +411,12 @@ impl CatalogManager for LocalCatalogManager {
catalog_name.clone(),
schema_name.clone(),
request.new_table_name.clone(),
request.table_id,
table_id,
)
.await?;
schema
.rename_table(&request.table_name, request.new_table_name, request.table)
.map(|v| v.is_none())
Ok(schema
.rename_table(&request.table_name, request.new_table_name)
.is_ok())
}

async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
Expand Down
47 changes: 16 additions & 31 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl CatalogManager for MemoryCatalogManager {
.map(|v| v.is_none())
}

async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
async fn rename_table(&self, request: RenameTableRequest, _table_id: TableId) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
Expand All @@ -106,9 +106,9 @@ impl CatalogManager for MemoryCatalogManager {
catalog: &request.catalog,
schema: &request.schema,
})?;
schema
.rename_table(&request.table_name, request.new_table_name, request.table)
.map(|v| v.is_none())
Ok(schema
.rename_table(&request.table_name, request.new_table_name)
.is_ok())
}

async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
Expand Down Expand Up @@ -312,25 +312,11 @@ impl SchemaProvider for MemorySchemaProvider {
}
}

fn rename_table(
&self,
name: &str,
new_name: String,
table: TableRef,
) -> Result<Option<TableRef>> {
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
let mut tables = self.tables.write().unwrap();
if let Some(existing) = tables.get(name) {
// if table with the same name but different table id exists, then it's a fatal bug
if existing.table_info().ident.table_id != table.table_info().ident.table_id {
error!(
"Unexpected table rename: {:?}, existing: {:?}",
table.table_info(),
existing.table_info()
);
return TableExistsSnafu { table: name }.fail()?;
}
tables.remove(name);
Ok(tables.insert(new_name, table))
if tables.get(name).is_some() {
let table = tables.remove(name).unwrap();
Ok(tables.insert(new_name, table).unwrap())
} else {
TableNotFoundSnafu {
table_info: name.to_string(),
Expand Down Expand Up @@ -418,10 +404,9 @@ mod tests {

// rename test table
let new_table_name = "numbers";
assert!(provider
.rename_table(table_name, new_table_name.to_string(), test_table.clone(),)
.unwrap()
.is_none());
provider
.rename_table(table_name, new_table_name.to_string())
.unwrap();

// test old table name not exist
assert!(!provider.table_exist(table_name).unwrap());
Expand All @@ -438,7 +423,6 @@ mod tests {
let other_table = Arc::new(NumbersTable::new(2));
let result = provider.register_table(new_table_name.to_string(), other_table);
let err = result.err().unwrap();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}

Expand All @@ -459,7 +443,7 @@ mod tests {
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table: table.clone(),
table,
};
assert!(catalog.register_table(register_table_req).await.unwrap());
assert!(schema.table_exist(table_name).unwrap());
Expand All @@ -471,10 +455,11 @@ mod tests {
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
new_table_name: new_table_name.to_string(),
table_id,
table: table.clone(),
};
assert!(catalog.rename_table(rename_table_req).await.unwrap());
assert!(catalog
.rename_table(rename_table_req, table_id)
.await
.unwrap());
assert!(!schema.table_exist(table_name).unwrap());
assert!(schema.table_exist(new_table_name).unwrap());

Expand Down
20 changes: 10 additions & 10 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,11 @@ impl CatalogManager for RemoteCatalogManager {
Ok(true)
}

async fn rename_table(&self, _request: RenameTableRequest) -> Result<bool> {
// todo impl rename_table for catalog manager
todo!()
async fn rename_table(&self, _request: RenameTableRequest, _table_id: TableId) -> Result<bool> {
UnimplementedSnafu {
operation: "rename table",
}
.fail()
}

async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
Expand Down Expand Up @@ -744,13 +746,11 @@ impl SchemaProvider for RemoteSchemaProvider {
prev
}

fn rename_table(
&self,
_name: &str,
_new_name: String,
_table: TableRef,
) -> Result<Option<TableRef>> {
todo!()
fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
UnimplementedSnafu {
operation: "rename table",
}
.fail()
}

fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
Expand Down
7 changes: 1 addition & 6 deletions src/catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ pub trait SchemaProvider: Sync + Send {

/// If supported by the implementation, renames an existing table from this schema and returns it.
/// If no table of that name exists, returns "Table not found" error.
fn rename_table(
&self,
name: &str,
new_name: String,
table: TableRef,
) -> Result<Option<TableRef>>;
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef>;

/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
Expand Down
3 changes: 2 additions & 1 deletion src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ fn build_system_catalog_schema() -> Schema {
}

/// Formats key string for table entry in system catalog
#[inline]
pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String {
format!("{catalog}.{schema}.{table_id}")
}
Expand Down Expand Up @@ -309,7 +310,7 @@ pub fn decode_system_catalog(
debug!("Table meta value: {}", String::from_utf8_lossy(value));
let table_meta: TableEntryValue =
serde_json::from_slice(value).context(ValueDeserializeSnafu)?;
let table_id = table_parts[2].parse::<u32>().unwrap();
let table_id = table_parts[2].parse::<TableId>().unwrap();
Ok(Entry::Table(TableEntry {
catalog_name: table_parts[0].to_string(),
schema_name: table_parts[1].to_string(),
Expand Down
9 changes: 2 additions & 7 deletions src/catalog/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,8 @@ impl SchemaProvider for InformationSchema {
panic!("System catalog & schema does not support register table")
}

fn rename_table(
&self,
_name: &str,
_new_name: String,
_table: TableRef,
) -> crate::error::Result<Option<TableRef>> {
panic!("System catalog & schema does not support rename table")
fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result<TableRef> {
unimplemented!("System catalog & schema does not support rename table")
}

fn deregister_table(&self, _name: &str) -> crate::error::Result<Option<TableRef>> {
Expand Down
4 changes: 1 addition & 3 deletions src/catalog/tests/local_catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,9 @@ mod tests {
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
new_table_name: new_table_name.to_string(),
table_id,
table: table.clone(),
};
assert!(catalog_manager
.rename_table(rename_table_req)
.rename_table(rename_table_req, table_id)
.await
.unwrap());

Expand Down
7 changes: 1 addition & 6 deletions src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,7 @@ mod tests {
unimplemented!();
}

fn rename_table(
&self,
_name: &str,
_new_name: String,
_table: TableRef,
) -> catalog::error::Result<Option<TableRef>> {
fn rename_table(&self, _name: &str, _new_name: String) -> catalog::error::Result<TableRef> {
unimplemented!()
}

Expand Down
10 changes: 4 additions & 6 deletions src/datanode/src/sql/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,24 @@ impl SqlHandler {
table_name: &full_table_name,
}
);
let kind = req.alter_kind.clone();
let is_rename = req.is_rename_table();
let table =
self.table_engine
.alter_table(&ctx, req)
.await
.context(error::AlterTableSnafu {
table_name: full_table_name,
})?;
let table_info = &table.table_info();
if let AlterKind::RenameTable { .. } = kind {
if is_rename {
let table_info = &table.table_info();
let rename_table_req = RenameTableRequest {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table_name,
new_table_name: table_info.name.clone(),
table_id: table_info.ident.table_id,
table,
};
self.catalog_manager
.rename_table(rename_table_req)
.rename_table(rename_table_req, table_info.ident.table_id)
.await
.context(error::RenameTableSnafu)?;
}
Expand Down
Loading

0 comments on commit 9512df3

Please sign in to comment.