From 77c459f1c5fa49bf5d29e6c4bbbc3ac39e8e8223 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 23 Mar 2024 00:53:59 +0800 Subject: [PATCH 1/5] fix: added `mdl` to control DDL serial execution --- Cargo.toml | 2 +- src/binder/copy.rs | 5 +- src/binder/mod.rs | 26 +++++++ src/db.rs | 76 +++++++++++++------ src/execution/volcano/{dml => ddl}/analyze.rs | 0 src/execution/volcano/ddl/mod.rs | 1 + src/execution/volcano/dml/mod.rs | 1 - src/execution/volcano/mod.rs | 2 +- tests/slt/basic_test.slt | 10 +++ 9 files changed, 93 insertions(+), 30 deletions(-) rename src/execution/volcano/{dml => ddl}/analyze.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index c2d8bff5..3d089da5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ harness = false [dependencies] ahash = { version = "0.8.11" } +async-lock = { version = "3.3.0" } async-trait = { version = "0.1.77", optional = true } bincode = { version = "1.3.3" } bytes = { version = "1.5.0" } @@ -50,7 +51,6 @@ kip_db = { version = "0.1.2-alpha.25.fix2" } lazy_static = { version = "1.4.0" } log = { version = "0.4.21", optional = true } ordered-float = { version = "4.2.0" } -parking_lot = { version = "0.12.1" } petgraph = { version = "0.6.4" } pgwire = { version = "0.19.2", optional = true } rand = { version = "0.9.0-alpha.0" } diff --git a/src/binder/copy.rs b/src/binder/copy.rs index 58160289..64613a5c 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -97,10 +97,7 @@ impl<'a, T: Transaction> Binder<'a, T> { )) } } else { - Err(DatabaseError::InvalidTable(format!( - "not found table {}", - table_name - ))) + Err(DatabaseError::TableNotFound) } } } diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 76836dbe..6e358f2a 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -34,6 +34,32 @@ pub enum InputRefType { GroupBy, } +pub enum CommandType { + DQL, + DML, + DDL, +} + +pub fn command_type(stmt: &Statement) -> Result { + match stmt { + Statement::Analyze { .. } + | Statement::CreateTable { .. } + | Statement::CreateIndex { .. } + | Statement::AlterTable { .. } + | Statement::Drop { .. } => Ok(CommandType::DDL), + Statement::Query(_) + | Statement::Explain { .. } + | Statement::ExplainTable { .. } + | Statement::ShowTables { .. } => Ok(CommandType::DQL), + Statement::Truncate { .. } + | Statement::Update { .. } + | Statement::Delete { .. } + | Statement::Insert { .. } + | Statement::Copy { .. } => Ok(CommandType::DML), + stmt => Err(DatabaseError::UnsupportedStmt(stmt.to_string())), + } +} + // Tips: only query now! #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] pub enum QueryBindStep { diff --git a/src/db.rs b/src/db.rs index ea67f3fe..39f09a8d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,9 +1,11 @@ use ahash::HashMap; +use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc}; +use sqlparser::ast::Statement; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use crate::binder::{Binder, BinderContext}; +use crate::binder::{command_type, Binder, BinderContext, CommandType}; use crate::errors::DatabaseError; use crate::execution::volcano::{build_write, try_collect}; use crate::expression::function::{FunctionSummary, ScalarFunctionImpl}; @@ -19,6 +21,12 @@ use crate::types::tuple::{SchemaRef, Tuple}; pub(crate) type Functions = HashMap>; +#[allow(dead_code)] +pub(crate) enum MetaDataLock { + Read(RwLockReadGuardArc<()>), + Write(RwLockWriteGuardArc<()>), +} + pub struct DataBaseBuilder { path: PathBuf, functions: Functions, @@ -45,6 +53,7 @@ impl DataBaseBuilder { Ok(Database { storage, functions: Arc::new(self.functions), + mdl: Arc::new(RwLock::new(())), }) } } @@ -52,6 +61,7 @@ impl DataBaseBuilder { pub struct Database { pub storage: S, functions: Arc, + mdl: Arc>, } impl Database { @@ -60,8 +70,19 @@ impl Database { &self, sql: T, ) -> Result<(SchemaRef, Vec), DatabaseError> { + // parse + let stmts = parse_sql(sql)?; + if stmts.is_empty() { + return Err(DatabaseError::EmptyStatement); + } + let stmt = &stmts[0]; + let _guard = if matches!(command_type(stmt)?, CommandType::DDL) { + MetaDataLock::Write(self.mdl.write_arc().await) + } else { + MetaDataLock::Read(self.mdl.read_arc().await) + }; let transaction = self.storage.transaction().await?; - let plan = Self::build_plan::(sql, &transaction, &self.functions)?; + let plan = Self::build_plan(stmt, &transaction, &self.functions)?; Self::run_volcano(transaction, plan).await } @@ -81,24 +102,20 @@ impl Database { } pub async fn new_transaction(&self) -> Result, DatabaseError> { - let transaction = self.storage.transaction().await?; + let guard = self.mdl.read_arc().await; Ok(DBTransaction { - inner: transaction, + inner: self.storage.transaction().await?, functions: self.functions.clone(), + _guard: guard, }) } - pub fn build_plan, T: Transaction>( - sql: V, + pub(crate) fn build_plan( + stmt: &Statement, transaction: &::TransactionType, functions: &Functions, ) -> Result { - // parse - let stmts = parse_sql(sql)?; - if stmts.is_empty() { - return Err(DatabaseError::EmptyStatement); - } let mut binder = Binder::new( BinderContext::new(transaction, functions, Arc::new(AtomicUsize::new(0))), None, @@ -110,7 +127,7 @@ impl Database { /// Sort(a) /// Limit(1) /// Project(a,b) - let source_plan = binder.bind(&stmts[0])?; + let source_plan = binder.bind(stmt)?; // println!("source_plan plan: {:#?}", source_plan); let best_plan = @@ -200,6 +217,7 @@ impl Database { pub struct DBTransaction { inner: S::TransactionType, functions: Arc, + _guard: RwLockReadGuardArc<()>, } impl DBTransaction { @@ -207,8 +225,19 @@ impl DBTransaction { &mut self, sql: T, ) -> Result<(SchemaRef, Vec), DatabaseError> { + let stmts = parse_sql(sql)?; + if stmts.is_empty() { + return Err(DatabaseError::EmptyStatement); + } + let stmt = &stmts[0]; + if matches!(command_type(stmt)?, CommandType::DDL) { + return Err(DatabaseError::UnsupportedStmt( + "`DDL` is not allowed to execute within a transaction".to_string(), + )); + } let mut plan = - Database::::build_plan::(sql, &self.inner, &self.functions)?; + Database::::build_plan(stmt, &self.inner, &self.functions)?; + let schema = plan.output_schema().clone(); let mut stream = build_write(plan, &mut self.inner); @@ -298,20 +327,17 @@ mod test { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build().await?; - let mut tx_1 = fnck_sql.new_transaction().await?; - let mut tx_2 = fnck_sql.new_transaction().await?; - - let _ = tx_1 + let _ = fnck_sql .run("create table t1 (a int primary key, b int)") .await?; - let _ = tx_2 - .run("create table t1 (c int primary key, d int)") - .await?; + + let mut tx_1 = fnck_sql.new_transaction().await?; + let mut tx_2 = fnck_sql.new_transaction().await?; let _ = tx_1.run("insert into t1 values(0, 0)").await?; let _ = tx_1.run("insert into t1 values(1, 1)").await?; - let _ = tx_2.run("insert into t1 values(2, 2)").await?; + let _ = tx_2.run("insert into t1 values(0, 0)").await?; let _ = tx_2.run("insert into t1 values(3, 3)").await?; let (_, tuples_1) = tx_1.run("select * from t1").await?; @@ -338,8 +364,8 @@ mod test { assert_eq!( tuples_2[0].values, vec![ - Arc::new(DataValue::Int32(Some(2))), - Arc::new(DataValue::Int32(Some(2))) + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(0))) ] ); assert_eq!( @@ -354,6 +380,10 @@ mod test { assert!(tx_2.commit().await.is_err()); + let mut tx_3 = fnck_sql.new_transaction().await?; + let res = tx_3.run("create table t2 (a int primary key, b int)").await; + assert!(res.is_err()); + Ok(()) } } diff --git a/src/execution/volcano/dml/analyze.rs b/src/execution/volcano/ddl/analyze.rs similarity index 100% rename from src/execution/volcano/dml/analyze.rs rename to src/execution/volcano/ddl/analyze.rs diff --git a/src/execution/volcano/ddl/mod.rs b/src/execution/volcano/ddl/mod.rs index 384bd2d7..0990a4f7 100644 --- a/src/execution/volcano/ddl/mod.rs +++ b/src/execution/volcano/ddl/mod.rs @@ -1,4 +1,5 @@ pub mod add_column; +pub(crate) mod analyze; pub(crate) mod create_index; pub(crate) mod create_table; pub mod drop_column; diff --git a/src/execution/volcano/dml/mod.rs b/src/execution/volcano/dml/mod.rs index 094edda3..fac541fe 100644 --- a/src/execution/volcano/dml/mod.rs +++ b/src/execution/volcano/dml/mod.rs @@ -1,4 +1,3 @@ -pub(crate) mod analyze; pub(crate) mod copy_from_file; pub(crate) mod copy_to_file; pub(crate) mod delete; diff --git a/src/execution/volcano/mod.rs b/src/execution/volcano/mod.rs index 472ace14..ab23dcd0 100644 --- a/src/execution/volcano/mod.rs +++ b/src/execution/volcano/mod.rs @@ -3,12 +3,12 @@ pub(crate) mod dml; pub(crate) mod dql; use crate::errors::DatabaseError; +use crate::execution::volcano::ddl::analyze::Analyze; use crate::execution::volcano::ddl::create_index::CreateIndex; use crate::execution::volcano::ddl::create_table::CreateTable; use crate::execution::volcano::ddl::drop_column::DropColumn; use crate::execution::volcano::ddl::drop_table::DropTable; use crate::execution::volcano::ddl::truncate::Truncate; -use crate::execution::volcano::dml::analyze::Analyze; use crate::execution::volcano::dml::copy_from_file::CopyFromFile; use crate::execution::volcano::dml::delete::Delete; use crate::execution::volcano::dml::insert::Insert; diff --git a/tests/slt/basic_test.slt b/tests/slt/basic_test.slt index 34fff868..b584facf 100644 --- a/tests/slt/basic_test.slt +++ b/tests/slt/basic_test.slt @@ -86,6 +86,16 @@ select CAST(name AS BIGINT) from t statement ok select CAST(id AS VARCHAR) from t +statement ok +create table t1 (id int primary key, name VARCHAR NOT NULL) + +# issue: https://github.com/KipData/FnckSQL/issues/175 +statement error +select t.name from t1; + +statement ok +drop table t1 + statement ok drop table if exists t From 05ce6f59a005a95af9c94bc6bc5ad3ea2790027d Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Mon, 18 Mar 2024 01:26:55 +0800 Subject: [PATCH 2/5] perf:make txn share storage table catalog cache --- src/storage/kip.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/storage/kip.rs b/src/storage/kip.rs index 323989ad..4aeed2ab 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -23,6 +23,7 @@ use std::sync::Arc; pub struct KipStorage { pub inner: Arc, pub(crate) meta_cache: Arc, + pub(crate) table_cache: Arc>, } impl KipStorage { @@ -31,10 +32,12 @@ impl KipStorage { storage::KipStorage::open_with_config(Config::new(path).enable_level_0_memorization()) .await?; let meta_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); + let table_cache = Arc::new(ShardingLruCache::new(128, 16, RandomState::new()).unwrap()); Ok(KipStorage { inner: Arc::new(storage), meta_cache, + table_cache, }) } } @@ -47,7 +50,7 @@ impl Storage for KipStorage { Ok(KipTransaction { tx, - table_cache: ShardingLruCache::new(8, 2, RandomState::default())?, + table_cache: Arc::clone(&self.table_cache), meta_cache: self.meta_cache.clone(), }) } @@ -57,8 +60,8 @@ pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), Sta pub struct KipTransaction { tx: mvcc::Transaction, - table_cache: ShardingLruCache, - meta_cache: Arc, + table_cache: Arc>, + meta_cache: Arc>>, } impl Transaction for KipTransaction { From 64644827ab3fd4ec0e95e1d98a4040eb17c0c499 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 23 Mar 2024 01:00:16 +0800 Subject: [PATCH 3/5] fix: code merge --- src/db.rs | 3 +-- src/storage/kip.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/db.rs b/src/db.rs index 39f09a8d..c0f62462 100644 --- a/src/db.rs +++ b/src/db.rs @@ -235,8 +235,7 @@ impl DBTransaction { "`DDL` is not allowed to execute within a transaction".to_string(), )); } - let mut plan = - Database::::build_plan(stmt, &self.inner, &self.functions)?; + let mut plan = Database::::build_plan(stmt, &self.inner, &self.functions)?; let schema = plan.output_schema().clone(); let mut stream = build_write(plan, &mut self.inner); diff --git a/src/storage/kip.rs b/src/storage/kip.rs index 4aeed2ab..d71ee85e 100644 --- a/src/storage/kip.rs +++ b/src/storage/kip.rs @@ -61,7 +61,7 @@ pub(crate) type StatisticsMetaCache = ShardingLruCache<(TableName, IndexId), Sta pub struct KipTransaction { tx: mvcc::Transaction, table_cache: Arc>, - meta_cache: Arc>>, + meta_cache: Arc, } impl Transaction for KipTransaction { From 29106929a554d7d86a9db317d382dc85fc3a170a Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 23 Mar 2024 01:07:56 +0800 Subject: [PATCH 4/5] style: code fmt --- src/db.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index c0f62462..98a9f8b0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -103,9 +103,10 @@ impl Database { pub async fn new_transaction(&self) -> Result, DatabaseError> { let guard = self.mdl.read_arc().await; + let transaction = self.storage.transaction().await?; Ok(DBTransaction { - inner: self.storage.transaction().await?, + inner: transaction, functions: self.functions.clone(), _guard: guard, }) From fffcff63ece42897f38005965d46d741fdd18e03 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 23 Mar 2024 03:22:26 +0800 Subject: [PATCH 5/5] fix: rollback `analyze` on `ddl` to `dml` --- src/binder/mod.rs | 6 +++--- src/execution/volcano/ddl/mod.rs | 1 - src/execution/volcano/{ddl => dml}/analyze.rs | 0 src/execution/volcano/dml/mod.rs | 1 + src/execution/volcano/mod.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) rename src/execution/volcano/{ddl => dml}/analyze.rs (100%) diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 6e358f2a..9919a09f 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -42,8 +42,7 @@ pub enum CommandType { pub fn command_type(stmt: &Statement) -> Result { match stmt { - Statement::Analyze { .. } - | Statement::CreateTable { .. } + Statement::CreateTable { .. } | Statement::CreateIndex { .. } | Statement::AlterTable { .. } | Statement::Drop { .. } => Ok(CommandType::DDL), @@ -51,7 +50,8 @@ pub fn command_type(stmt: &Statement) -> Result { | Statement::Explain { .. } | Statement::ExplainTable { .. } | Statement::ShowTables { .. } => Ok(CommandType::DQL), - Statement::Truncate { .. } + Statement::Analyze { .. } + | Statement::Truncate { .. } | Statement::Update { .. } | Statement::Delete { .. } | Statement::Insert { .. } diff --git a/src/execution/volcano/ddl/mod.rs b/src/execution/volcano/ddl/mod.rs index 0990a4f7..384bd2d7 100644 --- a/src/execution/volcano/ddl/mod.rs +++ b/src/execution/volcano/ddl/mod.rs @@ -1,5 +1,4 @@ pub mod add_column; -pub(crate) mod analyze; pub(crate) mod create_index; pub(crate) mod create_table; pub mod drop_column; diff --git a/src/execution/volcano/ddl/analyze.rs b/src/execution/volcano/dml/analyze.rs similarity index 100% rename from src/execution/volcano/ddl/analyze.rs rename to src/execution/volcano/dml/analyze.rs diff --git a/src/execution/volcano/dml/mod.rs b/src/execution/volcano/dml/mod.rs index fac541fe..094edda3 100644 --- a/src/execution/volcano/dml/mod.rs +++ b/src/execution/volcano/dml/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod analyze; pub(crate) mod copy_from_file; pub(crate) mod copy_to_file; pub(crate) mod delete; diff --git a/src/execution/volcano/mod.rs b/src/execution/volcano/mod.rs index ab23dcd0..472ace14 100644 --- a/src/execution/volcano/mod.rs +++ b/src/execution/volcano/mod.rs @@ -3,12 +3,12 @@ pub(crate) mod dml; pub(crate) mod dql; use crate::errors::DatabaseError; -use crate::execution::volcano::ddl::analyze::Analyze; use crate::execution::volcano::ddl::create_index::CreateIndex; use crate::execution::volcano::ddl::create_table::CreateTable; use crate::execution::volcano::ddl::drop_column::DropColumn; use crate::execution::volcano::ddl::drop_table::DropTable; use crate::execution::volcano::ddl::truncate::Truncate; +use crate::execution::volcano::dml::analyze::Analyze; use crate::execution::volcano::dml::copy_from_file::CopyFromFile; use crate::execution::volcano::dml::delete::Delete; use crate::execution::volcano::dml::insert::Insert;