Skip to content

Commit

Permalink
feat: add TabelCreator/TableDroper for executing create/drop table (#303
Browse files Browse the repository at this point in the history
)

* feat: add TabelCreator/TableDroper for executing create/drop table

* chore: replace table creator/dropper with Manipulator

* chore: release disk quota before recover cache
  • Loading branch information
ShiKaiWi authored Oct 15, 2022
1 parent 09bf075 commit 4752653
Show file tree
Hide file tree
Showing 17 changed files with 330 additions and 207 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ jobs:
- run: |
rustup set auto-self-update disable
rustup toolchain install ${{ matrix.rust }} --profile minimal
- name: Release Disk Quota
run: |
sudo rm -rf /usr/local/lib/android # release about 10 GB
sudo rm -rf /usr/share/dotnet # release about 20GB
- name: Cache Rust Dependencies
uses: actions/cache@v3
with:
Expand All @@ -60,8 +64,6 @@ jobs:
debug-${{ runner.os }}
- name: Ensure Disk Quota
run: |
sudo rm -rf /usr/local/lib/android # release about 10 GB
sudo rm -rf /usr/share/dotnet # release about 20GB
make ensure-disk-quota
- name: Setup Build Environment
run: |
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/tsbs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ jobs:
- uses: actions/checkout@v3
with:
submodules: true
- name: Release Disk Quota
run: |
sudo rm -rf /usr/local/lib/android # release about 10 GB
sudo rm -rf /usr/share/dotnet # release about 20GB
- name: Cache Rust Dependencies
uses: actions/cache@v3
with:
Expand All @@ -30,8 +34,6 @@ jobs:
release-${{ runner.os }}
- name: Ensure Disk Quota
run: |
sudo rm -rf /usr/local/lib/android # release about 10 GB
sudo rm -rf /usr/share/dotnet # release about 20GB
make ensure-disk-quota
- name: Setup Build Environment
run: |
Expand Down
96 changes: 11 additions & 85 deletions interpreters/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,21 @@
//! Interpreter for create statements

use async_trait::async_trait;
use catalog::{
manager::ManagerRef,
schema::{CreateOptions, CreateTableRequest},
};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
use sql::plan::CreateTablePlan;
use table_engine::engine::{TableEngineRef, TableState};
use table_engine::engine::TableEngineRef;

use crate::{
context::Context,
interpreter::{Create, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
table_manipulator::{self, TableManipulatorRef},
};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
#[snafu(display("Failed to find catalog, name:{}, err:{}", name, source))]
FindCatalog {
name: String,
source: catalog::manager::Error,
},

#[snafu(display("Catalog not exists, name:{}.\nBacktrace:\n{}", name, backtrace))]
CatalogNotExists { name: String, backtrace: Backtrace },

#[snafu(display("Failed to find schema, name:{}, err:{}", name, source))]
FindSchema {
name: String,
source: catalog::Error,
},

#[snafu(display("Schema not exists, name:{}.\nBacktrace:\n{}", name, backtrace))]
SchemaNotExists { name: String, backtrace: Backtrace },

#[snafu(display("Failed to create table, name:{}, err:{}", table, source))]
SchemaCreateTable {
table: String,
source: catalog::schema::Error,
},

#[snafu(display("Failed to allocate table id, err:{}", source))]
AllocTableId { source: catalog::schema::Error },
#[snafu(display("Failed to create table by table manipulator, err:{}", source))]
ManipulateTable { source: table_manipulator::Error },
}

define_result!(Error);
Expand All @@ -53,79 +26,32 @@ define_result!(Error);
pub struct CreateInterpreter {
ctx: Context,
plan: CreateTablePlan,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
}

impl CreateInterpreter {
pub fn create(
ctx: Context,
plan: CreateTablePlan,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
) -> InterpreterPtr {
Box::new(Self {
ctx,
plan,
catalog_manager,
table_engine,
table_manipulator,
})
}
}

impl CreateInterpreter {
async fn execute_create(self: Box<Self>) -> Result<Output> {
let default_catalog = self.ctx.default_catalog();
let catalog = self
.catalog_manager
.catalog_by_name(default_catalog)
.context(FindCatalog {
name: default_catalog,
})?
.context(CatalogNotExists {
name: default_catalog,
})?;

let default_schema = self.ctx.default_schema();
let schema = catalog
.schema_by_name(default_schema)
.context(FindSchema {
name: default_schema,
})?
.context(SchemaNotExists {
name: default_schema,
})?;

let CreateTablePlan {
engine,
table,
table_schema,
if_not_exists,
options,
} = self.plan;

let request = CreateTableRequest {
catalog_name: catalog.name().to_string(),
schema_name: schema.name().to_string(),
schema_id: schema.id(),
table_name: table.clone(),
table_schema,
engine,
options,
state: TableState::Stable,
};

let opts = CreateOptions {
table_engine: self.table_engine,
create_if_not_exists: if_not_exists,
};

schema
.create_table(request, opts)
self.table_manipulator
.create_table(self.ctx, self.plan, self.table_engine)
.await
.context(SchemaCreateTable { table })?;

Ok(Output::AffectedRows(0))
.context(ManipulateTable)
}
}

Expand Down
90 changes: 12 additions & 78 deletions interpreters/src/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,21 @@
//! Interpreter for drop statements

use async_trait::async_trait;
use catalog::{manager::ManagerRef, schema::DropOptions};
use log::warn;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
use sql::plan::DropTablePlan;
use table_engine::engine::{DropTableRequest, TableEngineRef};
use table_engine::engine::TableEngineRef;

use crate::{
context::Context,
interpreter::{Drop, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
table_manipulator::{self, TableManipulatorRef},
};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
#[snafu(display("Failed to find catalog, name:{}, err:{}", name, source))]
FindCatalog {
name: String,
source: catalog::manager::Error,
},

#[snafu(display("Catalog not exists, name:{}.\nBacktrace:\n{}", name, backtrace))]
CatalogNotExists { name: String, backtrace: Backtrace },

#[snafu(display("Failed to find schema, name:{}, err:{}", name, source))]
FindSchema {
name: String,
source: catalog::Error,
},

#[snafu(display("Schema not exists, name:{}.\nBacktrace:\n{}", name, backtrace))]
SchemaNotExists { name: String, backtrace: Backtrace },

#[snafu(display("Failed to drop table in schema, name:{}, err:{}", table, source))]
SchemaDropTable {
table: String,
source: catalog::schema::Error,
},

#[snafu(display("Failed to drop table, name:{}, err:{}", table, source))]
DropTable {
table: String,
source: table_engine::engine::Error,
},
#[snafu(display("Failed to drop table by table manipulator, err:{}", source))]
ManipulateTable { source: table_manipulator::Error },
}

define_result!(Error);
Expand All @@ -53,71 +26,32 @@ define_result!(Error);
pub struct DropInterpreter {
ctx: Context,
plan: DropTablePlan,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
}

impl DropInterpreter {
pub fn create(
ctx: Context,
plan: DropTablePlan,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
) -> InterpreterPtr {
Box::new(Self {
ctx,
plan,
catalog_manager,
table_engine,
table_manipulator,
})
}
}

impl DropInterpreter {
async fn execute_drop(self: Box<Self>) -> Result<Output> {
let default_catalog = self.ctx.default_catalog();
let catalog = self
.catalog_manager
.catalog_by_name(default_catalog)
.context(FindCatalog {
name: default_catalog,
})?
.context(CatalogNotExists {
name: default_catalog,
})?;

let default_schema = self.ctx.default_schema();
let schema = catalog
.schema_by_name(default_schema)
.context(FindSchema {
name: default_schema,
})?
.context(SchemaNotExists {
name: default_schema,
})?;

let table = self.plan.table;
let request = DropTableRequest {
catalog_name: catalog.name().to_string(),
schema_name: schema.name().to_string(),
schema_id: schema.id(),
table_name: table.clone(),
engine: self.plan.engine,
};

let opts = DropOptions {
table_engine: self.table_engine,
};

if schema
.drop_table(request, opts)
self.table_manipulator
.drop_table(self.ctx, self.plan, self.table_engine)
.await
.context(SchemaDropTable { table: &table })?
{
warn!("Table {} has been dropped already", &table);
}

Ok(Output::AffectedRows(0))
.context(ManipulateTable)
}
}

Expand Down
9 changes: 6 additions & 3 deletions interpreters/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,29 @@ use crate::{
alter_table::AlterTableInterpreter, context::Context, create::CreateInterpreter,
describe::DescribeInterpreter, drop::DropInterpreter, exists::ExistsInterpreter,
insert::InsertInterpreter, interpreter::InterpreterPtr, select::SelectInterpreter,
show::ShowInterpreter,
show::ShowInterpreter, table_manipulator::TableManipulatorRef,
};

/// A factory to create interpreters
pub struct Factory<Q> {
query_executor: Q,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
}

impl<Q: Executor + 'static> Factory<Q> {
pub fn new(
query_executor: Q,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
) -> Self {
Self {
query_executor,
catalog_manager,
table_engine,
table_manipulator,
}
}

Expand All @@ -39,10 +42,10 @@ impl<Q: Executor + 'static> Factory<Q> {
Plan::Query(p) => SelectInterpreter::create(ctx, p, self.query_executor),
Plan::Insert(p) => InsertInterpreter::create(ctx, p),
Plan::Create(p) => {
CreateInterpreter::create(ctx, p, self.catalog_manager, self.table_engine)
CreateInterpreter::create(ctx, p, self.table_engine, self.table_manipulator)
}
Plan::Drop(p) => {
DropInterpreter::create(ctx, p, self.catalog_manager, self.table_engine)
DropInterpreter::create(ctx, p, self.table_engine, self.table_manipulator)
}
Plan::Describe(p) => DescribeInterpreter::create(p),
Plan::AlterTable(p) => AlterTableInterpreter::create(p),
Expand Down
1 change: 1 addition & 0 deletions interpreters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod insert;
pub mod interpreter;
pub mod select;
pub mod show;
pub mod table_manipulator;

mod show_create;

Expand Down
Loading

0 comments on commit 4752653

Please sign in to comment.