diff --git a/Cargo.lock b/Cargo.lock
index 4dcf4b32e4fd4..ab583df331ac7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2313,6 +2313,7 @@ dependencies = [
  "common-pipeline-sources",
  "common-sql",
  "common-storages-fuse",
+ "common-storages-result-cache",
  "common-storages-view",
  "common-users",
  "itertools",
diff --git a/scripts/distribution/local-scripts/start.sh b/scripts/distribution/local-scripts/start.sh
index 934f15fe775da..b6d39258133d3 100755
--- a/scripts/distribution/local-scripts/start.sh
+++ b/scripts/distribution/local-scripts/start.sh
@@ -2,7 +2,7 @@ echo "Stop old Databend instances"
 killall -9 databend-meta
 killall -9 databend-query
 echo "Deploy new Databend(standalone)"
-ulimit -n 65535
+ulimit -n 65535
 nohup bin/databend-meta --config-file=configs/databend-meta.toml 2>&1 >meta.log &
 sleep 3
 # export STORAGE_S3_ENABLE_VIRTUAL_HOST_STYLE=true
diff --git a/scripts/distribution/package-scripts/postinstall.sh b/scripts/distribution/package-scripts/postinstall.sh
index 55e015eb5df56..c5131f4b1d5c6 100755
--- a/scripts/distribution/package-scripts/postinstall.sh
+++ b/scripts/distribution/package-scripts/postinstall.sh
@@ -5,13 +5,13 @@ set -e
 usermod --append --groups adm databend || true

 if getent group 'systemd-journal'; then
-    # Add Databend to systemd-journal to read journald logs
-    usermod --append --groups systemd-journal databend || true
-    systemctl daemon-reload || true
+    # Add Databend to systemd-journal to read journald logs
+    usermod --append --groups systemd-journal databend || true
+    systemctl daemon-reload || true
 fi

 if getent group 'systemd-journal-remote'; then
-    # Add Databend to systemd-journal-remote to read remote journald logs
-    usermod --append --groups systemd-journal-remote databend || true
-    systemctl daemon-reload || true
+    # Add Databend to systemd-journal-remote to read remote journald logs
+    usermod --append --groups systemd-journal-remote databend || true
+    systemctl daemon-reload || true
 fi
diff --git a/scripts/distribution/package-scripts/preinstall.sh b/scripts/distribution/package-scripts/preinstall.sh
index 06a55d5185961..88ae9acbcc24a 100755
--- a/scripts/distribution/package-scripts/preinstall.sh
+++ b/scripts/distribution/package-scripts/preinstall.sh
@@ -3,8 +3,8 @@ set -e

 # Add databend:databend user & group
 id --user databend >/dev/null 2>&1 ||
-    useradd --system --shell /sbin/nologin --home-dir /var/lib/databend --user-group \
-        --comment "Databend cloud data analytics" databend
+    useradd --system --shell /sbin/nologin --home-dir /var/lib/databend --user-group \
+        --comment "Databend cloud data analytics" databend

 # Create default Databend data directory
 mkdir -p /var/lib/databend
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 328c5c33aca2f..a88d4a0245211 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -109,6 +109,9 @@ pub trait TableContext: Send + Sync {
     fn get_cluster(&self) -> Arc<Cluster>;
     fn get_processes_info(&self) -> Vec<ProcessInfo>;
     fn get_stage_attachment(&self) -> Option<StageAttachment>;
+    fn get_last_query_id(&self, index: i32) -> String;
+    fn get_result_cache_key(&self, query_id: &str) -> Option<String>;
+    fn set_query_id_result_cache(&self, query_id: String, result_cache_key: String);
     fn set_on_error_map(&self, map: Option<Arc<DashMap<String, ErrorCode>>>);

     fn apply_changed_settings(&self, changed_settings: Arc<Settings>) -> Result<()>;
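
The three trait methods added to `TableContext` above form a small contract between queries and their session: a query can look up the id of an earlier query in the same session, map that id to its result-cache meta key, and register its own meta key once its results are persisted. A minimal sketch of a caller against just this trait surface (the helper itself is hypothetical, not part of the patch):

```rust
// Hypothetical helper: resolve the cache meta key of the previous query.
fn previous_result_cache_key(ctx: &dyn TableContext) -> Option<String> {
    // Index -1 addresses the most recent query in this session; an empty
    // string signals an out-of-range index.
    let last_id = ctx.get_last_query_id(-1);
    if last_id.is_empty() {
        return None;
    }
    // None: that query ran with the cache disabled or wrote no cache entry.
    ctx.get_result_cache_key(&last_id)
}
```
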
diff --git a/src/query/service/src/databases/system/system_database.rs b/src/query/service/src/databases/system/system_database.rs
index a614f81080883..4b538e1ac19ed 100644
--- a/src/query/service/src/databases/system/system_database.rs
+++ b/src/query/service/src/databases/system/system_database.rs
@@ -35,6 +35,7 @@ use common_storages_system::MallocStatsTotalsTable;
 use common_storages_system::MetricsTable;
 use common_storages_system::OneTable;
 use common_storages_system::ProcessesTable;
+use common_storages_system::QueryCacheTable;
 use common_storages_system::QueryLogTable;
 use common_storages_system::RolesTable;
 use common_storages_system::SettingsTable;
@@ -86,6 +87,7 @@ impl SystemDatabase {
             StagesTable::create(sys_db_meta.next_table_id()),
             BuildOptionsTable::create(sys_db_meta.next_table_id()),
             CatalogsTable::create(sys_db_meta.next_table_id()),
+            QueryCacheTable::create(sys_db_meta.next_table_id()),
         ];

         for tbl in table_list.into_iter() {
diff --git a/src/query/service/src/interpreters/interpreter_select_v2.rs b/src/query/service/src/interpreters/interpreter_select_v2.rs
index 2e9acbd49787e..6483a8e7e8fe7 100644
--- a/src/query/service/src/interpreters/interpreter_select_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_select_v2.rs
@@ -176,6 +176,10 @@ impl Interpreter for SelectInterpreterV2 {
         // 2. Check the cache.
         match cache_reader.try_read_cached_result().await {
             Ok(Some(blocks)) => {
+                // 2.0 Update query_id -> result_cache_meta_key in the session.
+                self.ctx
+                    .get_current_session()
+                    .update_query_ids_results(self.ctx.get_id(), cache_reader.get_meta_key());
                 // 2.1 If found, return the result directly.
                 return PipelineBuildResult::from_blocks(blocks);
             }
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index 4332b2f1b483f..3c834dffe0b39 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -376,6 +376,24 @@ impl TableContext for QueryContext {
         self.shared.get_stage_attachment()
     }

+    fn get_last_query_id(&self, index: i32) -> String {
+        self.shared.session.session_ctx.get_last_query_id(index)
+    }
+
+    fn get_result_cache_key(&self, query_id: &str) -> Option<String> {
+        self.shared
+            .session
+            .session_ctx
+            .get_query_result_cache_key(query_id)
+    }
+
+    fn set_query_id_result_cache(&self, query_id: String, result_cache_key: String) {
+        self.shared
+            .session
+            .session_ctx
+            .update_query_ids_results(query_id, Some(result_cache_key))
+    }
+
     fn set_on_error_map(&self, map: Option<Arc<DashMap<String, ErrorCode>>>) {
         self.shared.set_on_error_map(map);
     }
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index afc9236362de6..e459260651830 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -336,6 +336,14 @@ impl QueryContextShared {
     }
 }

+impl Drop for QueryContextShared {
+    fn drop(&mut self) {
+        self.session
+            .session_ctx
+            .update_query_ids_results(self.init_query_id.read().clone(), None)
+    }
+}
+
 pub fn short_sql(query: &str) -> String {
     use unicode_segmentation::UnicodeSegmentation;
     let query = query.trim_start();
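
The `Drop` impl above is what guarantees every query, cached or not, becomes addressable by `last_query_id`: the write sink records `Some(meta_key)` during finalization, and the drop unconditionally records `None` afterwards. The "update only if the new value is `Some`" rule in `SessionContext` (next hunks) keeps that trailing `None` from clobbering a recorded key. A std-only model of the interplay:

```rust
// Toy model (std only, no Databend types) of the session bookkeeping.
fn update(entries: &mut Vec<(String, Option<String>)>, id: String, v: Option<String>) {
    if let Some(slot) = entries.iter_mut().rev().find(|(qid, _)| *qid == id) {
        if v.is_some() {
            slot.1 = v; // overwrite only with a concrete key
        }
        return;
    }
    entries.push((id, v)); // first sighting of this query id
}

fn main() {
    let mut entries = Vec::new();
    update(&mut entries, "q1".into(), Some("meta/key".into())); // from the sink
    update(&mut entries, "q1".into(), None); // from drop: must not erase the key
    assert_eq!(entries[0].1.as_deref(), Some("meta/key"));
}
```
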
diff --git a/src/query/service/src/sessions/session.rs b/src/query/service/src/sessions/session.rs
index ad9b90e1c7e83..136488762ad5e 100644
--- a/src/query/service/src/sessions/session.rs
+++ b/src/query/service/src/sessions/session.rs
@@ -352,6 +352,15 @@ impl Session {
     pub fn get_status(self: &Arc<Self>) -> Arc<RwLock<SessionStatus>> {
         self.status.clone()
     }
+
+    pub fn get_query_result_cache_key(self: &Arc<Self>, query_id: &str) -> Option<String> {
+        self.session_ctx.get_query_result_cache_key(query_id)
+    }
+
+    pub fn update_query_ids_results(self: &Arc<Self>, query_id: String, result_cache_key: String) {
+        self.session_ctx
+            .update_query_ids_results(query_id, Some(result_cache_key))
+    }
 }

 impl Drop for Session {
diff --git a/src/query/service/src/sessions/session_ctx.rs b/src/query/service/src/sessions/session_ctx.rs
index 8dc9c5278bfe2..d3438c255de13 100644
--- a/src/query/service/src/sessions/session_ctx.rs
+++ b/src/query/service/src/sessions/session_ctx.rs
@@ -51,6 +51,9 @@ pub struct SessionContext {
     client_host: RwLock<Option<SocketAddr>>,
     io_shutdown_tx: RwLock<Option<Sender<Sender<()>>>>,
     query_context_shared: RwLock<Weak<QueryContextShared>>,
+    // We store `query_id -> query_result_cache_key` in the session context, so
+    // that a query result can easily be fetched through a previous query_id.
+    query_ids_results: RwLock<Vec<(String, Option<String>)>>,
 }

 impl SessionContext {
@@ -67,6 +70,7 @@ impl SessionContext {
             current_database: RwLock::new("default".to_string()),
             io_shutdown_tx: Default::default(),
             query_context_shared: Default::default(),
+            query_ids_results: Default::default(),
         }))
     }

@@ -212,4 +216,45 @@ impl SessionContext {
         let mut lock = self.query_context_shared.write();
         *lock = ctx
     }
+
+    pub fn get_query_result_cache_key(&self, query_id: &str) -> Option<String> {
+        let lock = self.query_ids_results.read();
+        for (qid, result_cache_key) in (*lock).iter().rev() {
+            if qid.eq_ignore_ascii_case(query_id) {
+                return result_cache_key.clone();
+            }
+        }
+        None
+    }
+
+    pub fn update_query_ids_results(&self, query_id: String, value: Option<String>) {
+        let mut lock = self.query_ids_results.write();
+        // Search from the back: recently finished queries are far more likely
+        // to be updated than old ones.
+        let pos = (*lock)
+            .iter()
+            .rposition(|(qid, _)| qid.eq_ignore_ascii_case(&query_id));
+        match pos {
+            Some(idx) => {
+                // Update the entry only if the new value is `Some`, so a later
+                // `None` (e.g. from `QueryContextShared::drop`) cannot erase an
+                // already-recorded cache key.
+                if let Some(v) = value {
+                    (*lock)[idx] = (query_id, Some(v))
+                }
+            }
+            None => lock.push((query_id, value)),
+        }
+    }
+
+    pub fn get_last_query_id(&self, index: i32) -> String {
+        let lock = self.query_ids_results.read();
+        let query_ids_len = lock.len();
+        let idx = if index < 0 {
+            query_ids_len as i32 + index
+        } else {
+            index
+        };
+
+        // `idx >= len` also covers the empty list; comparing against
+        // `query_ids_len - 1` would underflow when the list is empty.
+        if idx < 0 || idx >= query_ids_len as i32 {
+            return "".to_string();
+        }
+
+        (*lock)[idx as usize].0.clone()
+    }
 }
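
`get_last_query_id` gives the function the same indexing convention as the SQL surface: non-negative indices count from the session's first query, negative ones from the latest. A std-only model of the lookup, with the empty-list guard made explicit:

```rust
// Toy model of SessionContext::get_last_query_id's index handling.
fn last_query_id(ids: &[(String, Option<String>)], index: i32) -> String {
    let len = ids.len() as i32;
    let idx = if index < 0 { len + index } else { index };
    if idx < 0 || idx >= len {
        return String::new(); // out of range (including an empty session)
    }
    ids[idx as usize].0.clone()
}

fn main() {
    let ids = vec![
        ("q1".to_string(), None),
        ("q2".to_string(), Some("key2".to_string())),
    ];
    assert_eq!(last_query_id(&ids, -1), "q2"); // newest
    assert_eq!(last_query_id(&ids, 0), "q1"); // oldest
    assert_eq!(last_query_id(&ids, 5), ""); // out of range
    assert_eq!(last_query_id(&[], -1), ""); // empty session
}
```
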
"" | "" | "NO" | "" | | "log_type" | "system" | "query_log" | "Int8" | "TINYINT" | "" | "" | "NO" | "" | | "memory_usage" | "system" | "processes" | "Int64" | "BIGINT" | "" | "" | "NO" | "" | | "memory_usage" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" | @@ -150,6 +151,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | "name" | "system" | "users" | "String" | "VARCHAR" | "" | "" | "NO" | "" | | "non_unique" | "information_schema" | "statistics" | "NULL" | "NULL" | "" | "" | "NO" | "" | | "nullable" | "information_schema" | "statistics" | "NULL" | "NULL" | "" | "" | "NO" | "" | +| "num_rows" | "system" | "query_cache" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" | | "num_rows" | "system" | "tables" | "Nullable(UInt64)" | "BIGINT UNSIGNED" | "" | "" | "YES" | "" | | "num_rows" | "system" | "tables_with_history" | "Nullable(UInt64)" | "BIGINT UNSIGNED" | "" | "" | "YES" | "" | | "number_of_files" | "system" | "stages" | "Nullable(UInt64)" | "BIGINT UNSIGNED" | "" | "" | "YES" | "" | @@ -159,6 +161,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | "ordinal_position" | "information_schema" | "columns" | "UInt8" | "TINYINT UNSIGNED" | "" | "" | "NO" | "" | | "ordinal_position" | "information_schema" | "key_column_usage" | "NULL" | "NULL" | "" | "" | "NO" | "" | | "packed" | "information_schema" | "statistics" | "NULL" | "NULL" | "" | "" | "NO" | "" | +| "partitions_sha" | "system" | "query_cache" | "Array(String)" | "ARRAY(STRING)" | "" | "" | "NO" | "" | | "port" | "system" | "clusters" | "UInt16" | "SMALLINT UNSIGNED" | "" | "" | "NO" | "" | | "position_in_unique_constraint" | "information_schema" | "key_column_usage" | "NULL" | "NULL" | "" | "" | "NO" | "" | | "projections" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" | @@ -175,6 +178,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | "reserved" | "information_schema" | "keywords" | "UInt8" | "TINYINT UNSIGNED" | "" | "" | "NO" | "" | | "result_bytes" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" | | "result_rows" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" | +| "result_size" | "system" | "query_cache" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" | | "scan_bytes" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" | | "scan_io_bytes" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" | | "scan_io_bytes_cost_ms" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" | @@ -187,6 +191,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | "seq_in_index" | "information_schema" | "statistics" | "NULL" | "NULL" | "" | "" | "NO" | "" | | "server_version" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" | | "session_settings" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" | +| "sql" | "system" | "query_cache" | "String" | "VARCHAR" | "" | "" | "NO" | "" | | "sql_path" | "information_schema" | "schemata" | "NULL" | "NULL" | "" | "" | "NO" | "" | | "sql_user" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" | | "sql_user_privileges" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" | diff --git a/src/query/sql/src/lib.rs b/src/query/sql/src/lib.rs index ea81d9e816cf1..bb8d68861f85d 100644 --- 
diff --git a/src/query/sql/src/lib.rs b/src/query/sql/src/lib.rs
index ea81d9e816cf1..bb8d68861f85d 100644
--- a/src/query/sql/src/lib.rs
+++ b/src/query/sql/src/lib.rs
@@ -14,6 +14,7 @@

 #![allow(clippy::uninlined_format_args)]
 #![feature(box_patterns)]
+#![feature(let_chains)]

 pub mod evaluator;
 pub mod executor;
diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs
index 0439a48acf054..acfcd864593dd 100644
--- a/src/query/sql/src/planner/semantic/type_check.rs
+++ b/src/query/sql/src/planner/semantic/type_check.rs
@@ -1445,6 +1445,7 @@ impl<'a> TypeChecker<'a> {
             "ifnull",
             "is_null",
             "coalesce",
+            "last_query_id",
         ]
     }

@@ -1642,6 +1643,60 @@ impl<'a> TypeChecker<'a> {
                 )
             }

+            ("last_query_id", args) => {
+                // last_query_id(index) returns the id of a previous query in the
+                // current session, addressed by index. The index may be an integer
+                // literal (e.g. -1, -2, 2) or a simple binary expression over
+                // integer literals (e.g. 1 + 1, 3 - 1). An out-of-range index
+                // yields an empty string.
+                let index = if args.len() != 1 {
+                    -1
+                } else {
+                    match args[0] {
+                        Expr::BinaryOp {
+                            op, left, right, ..
+                        } => {
+                            if let Expr::Literal { span: _, lit: Literal::Integer(l) } = **left
+                                && let Expr::Literal { span: _, lit: Literal::Integer(r) } = **right
+                            {
+                                match op {
+                                    BinaryOperator::Plus => (l + r) as i32,
+                                    BinaryOperator::Minus => (l - r) as i32,
+                                    _ => -1,
+                                }
+                            } else {
+                                -1
+                            }
+                        }
+                        Expr::UnaryOp { op, expr, .. } => {
+                            if let Expr::Literal {
+                                span: _,
+                                lit: Literal::Integer(i),
+                            } = **expr
+                            {
+                                match op {
+                                    UnaryOperator::Plus => i as i32,
+                                    UnaryOperator::Minus => -(i as i32),
+                                    UnaryOperator::Not => -1,
+                                }
+                            } else {
+                                -1
+                            }
+                        }
+                        Expr::Literal {
+                            lit: Literal::Integer(i),
+                            ..
+                        } => *i as i32,
+                        _ => -1,
+                    }
+                };
+                let query_id = self.ctx.get_last_query_id(index);
+                Some(
+                    self.resolve(
+                        &Expr::Literal {
+                            span,
+                            lit: Literal::String(query_id),
+                        },
+                        None,
+                    )
+                    .await,
+                )
+            }
+
             _ => None,
         }
     }
diff --git a/src/query/storages/result_cache/src/common.rs b/src/query/storages/result_cache/src/common.rs
index 0f10d21a183e8..f79385c9cf6bc 100644
--- a/src/query/storages/result_cache/src/common.rs
+++ b/src/query/storages/result_cache/src/common.rs
@@ -28,17 +28,22 @@ pub fn gen_result_cache_key(raw: &str) -> String {
 }

 #[inline(always)]
-pub(crate) fn gen_result_cache_meta_key(tenant: &str, key: &str) -> String {
+pub fn gen_result_cache_meta_key(tenant: &str, key: &str) -> String {
     format!("{RESULT_CACHE_PREFIX}/{tenant}/{key}")
 }

+#[inline(always)]
+pub fn gen_result_cache_prefix(tenant: &str) -> String {
+    format!("{RESULT_CACHE_PREFIX}/{tenant}/")
+}
+
 #[inline(always)]
 pub(crate) fn gen_result_cache_dir(key: &str) -> String {
     format!("{RESULT_CACHE_PREFIX}/{key}")
 }

 #[derive(serde::Serialize, serde::Deserialize)]
-pub(crate) struct ResultCacheValue {
+pub struct ResultCacheValue {
     /// The original query SQL.
     pub sql: String,
     /// The query time.
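
Because the type checker rewrites the whole call into a string literal at bind time (the `self.resolve(...)` on a `Literal::String` above), `last_query_id` costs nothing at execution: by the time a plan like `SELECT last_query_id(-2)` runs, the id is already inlined. Only literals, unary `+`/`-`, and literal-plus/minus-literal survive the folding; a sketch of those rules with a stand-in AST (`IndexExpr` is illustrative, not `common_ast`):

```rust
// Stand-in AST: just enough shape to mirror the folding arms above.
enum IndexExpr {
    Lit(i64),
    Neg(Box<IndexExpr>),
    Add(Box<IndexExpr>, Box<IndexExpr>),
    Sub(Box<IndexExpr>, Box<IndexExpr>),
}

fn fold_index(e: &IndexExpr) -> i32 {
    match e {
        IndexExpr::Lit(i) => *i as i32,
        IndexExpr::Neg(inner) => match inner.as_ref() {
            IndexExpr::Lit(i) => -(*i as i32),
            _ => -1, // anything deeper falls back to "latest query"
        },
        IndexExpr::Add(l, r) => match (l.as_ref(), r.as_ref()) {
            (IndexExpr::Lit(a), IndexExpr::Lit(b)) => (a + b) as i32,
            _ => -1,
        },
        IndexExpr::Sub(l, r) => match (l.as_ref(), r.as_ref()) {
            (IndexExpr::Lit(a), IndexExpr::Lit(b)) => (a - b) as i32,
            _ => -1,
        },
    }
}
```
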
diff --git a/src/query/storages/result_cache/src/lib.rs b/src/query/storages/result_cache/src/lib.rs
index bcb868770948d..1f68819d6d3e1 100644
--- a/src/query/storages/result_cache/src/lib.rs
+++ b/src/query/storages/result_cache/src/lib.rs
@@ -20,5 +20,8 @@ mod read;
 mod write;

 pub use common::gen_result_cache_key;
+pub use common::gen_result_cache_meta_key;
+pub use common::gen_result_cache_prefix;
+pub use meta_manager::ResultCacheMetaManager;
 pub use read::ResultCacheReader;
 pub use write::WriteResultCacheSink;
diff --git a/src/query/storages/result_cache/src/meta_manager.rs b/src/query/storages/result_cache/src/meta_manager.rs
index b7501a905ee8e..2d9b1352aca40 100644
--- a/src/query/storages/result_cache/src/meta_manager.rs
+++ b/src/query/storages/result_cache/src/meta_manager.rs
@@ -25,23 +25,28 @@ use common_meta_types::UpsertKV;

 use crate::common::ResultCacheValue;

-pub(super) struct ResultCacheMetaManager {
-    key: String,
+pub struct ResultCacheMetaManager {
     ttl: u64,
     inner: Arc<MetaStore>,
 }

 impl ResultCacheMetaManager {
-    pub fn create(inner: Arc<MetaStore>, key: String, ttl: u64) -> Self {
-        Self { key, ttl, inner }
+    pub fn create(inner: Arc<MetaStore>, ttl: u64) -> Self {
+        Self { ttl, inner }
     }

-    pub async fn set(&self, value: ResultCacheValue, seq: MatchSeq, expire_at: u64) -> Result<()> {
+    pub async fn set(
+        &self,
+        key: String,
+        value: ResultCacheValue,
+        seq: MatchSeq,
+        expire_at: u64,
+    ) -> Result<()> {
         let value = serde_json::to_vec(&value)?;
         let _ = self
             .inner
             .upsert_kv(UpsertKV {
-                key: self.key.clone(),
+                key,
                 seq,
                 value: Operation::Update(value),
                 value_meta: Some(KVMeta {
@@ -52,8 +57,8 @@ impl ResultCacheMetaManager {
         Ok(())
     }

-    pub async fn get(&self) -> Result<Option<ResultCacheValue>> {
-        let raw = self.inner.get_kv(&self.key).await?;
+    pub async fn get(&self, key: String) -> Result<Option<ResultCacheValue>> {
+        let raw = self.inner.get_kv(&key).await?;
         match raw {
             None => Ok(None),
             Some(SeqV { data, .. }) => {
@@ -63,6 +68,19 @@ impl ResultCacheMetaManager {
         }
     }

+    pub async fn list(&self, prefix: &str) -> Result<Vec<ResultCacheValue>> {
+        let result = self.inner.prefix_list_kv(prefix).await?;
+
+        let mut r = vec![];
+        for (_key, val) in result {
+            let u = serde_json::from_slice::<ResultCacheValue>(&val.data)?;
+
+            r.push(u);
+        }
+
+        Ok(r)
+    }
+
     pub fn get_ttl(&self) -> u64 {
         self.ttl
     }
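
With the key hoisted out of `ResultCacheMetaManager` and into each call, a single manager instance can serve point reads, writes, and the per-tenant scans that back `system.query_cache`. A read-side sketch under the API above (`meta`, `tenant`, and `sql` are assumed inputs; the import paths follow the diff's crate naming and may differ):

```rust
use std::sync::Arc;

use common_exception::Result;
use common_meta_store::MetaStore;
use common_storages_result_cache::gen_result_cache_key;
use common_storages_result_cache::gen_result_cache_meta_key;
use common_storages_result_cache::gen_result_cache_prefix;
use common_storages_result_cache::ResultCacheMetaManager;

async fn inspect_result_cache(meta: Arc<MetaStore>, tenant: &str, sql: &str) -> Result<()> {
    // The TTL only matters on the write path; 0 is fine for read-only use.
    let mgr = ResultCacheMetaManager::create(meta, 0);

    // Point lookup: SQL text -> cache key -> tenant-scoped meta key.
    let key = gen_result_cache_meta_key(tenant, &gen_result_cache_key(sql));
    if let Some(v) = mgr.get(key).await? {
        println!("{} rows cached at {}", v.num_rows, v.location);
    }

    // Range scan over one tenant's entries, as system.query_cache does below.
    let _all = mgr.list(&gen_result_cache_prefix(tenant)).await?;
    Ok(())
}
```
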
diff --git a/src/query/storages/result_cache/src/read/reader.rs b/src/query/storages/result_cache/src/read/reader.rs
index 01f3824d48bcc..0d468494f2b95 100644
--- a/src/query/storages/result_cache/src/read/reader.rs
+++ b/src/query/storages/result_cache/src/read/reader.rs
@@ -27,6 +27,7 @@ use crate::meta_manager::ResultCacheMetaManager;

 pub struct ResultCacheReader {
     meta_mgr: ResultCacheMetaManager,
+    meta_key: String,
     operator: Operator,

     /// To ensure the cache is valid.
@@ -49,15 +50,28 @@ impl ResultCacheReader {
         let partitions_shas = ctx.get_partitions_shas();

         Self {
-            meta_mgr: ResultCacheMetaManager::create(kv_store, meta_key, 0),
+            meta_mgr: ResultCacheMetaManager::create(kv_store, 0),
+            meta_key,
             partitions_shas,
             operator: DataOperator::instance().operator(),
             tolerate_inconsistent,
         }
     }

+    pub fn get_meta_key(&self) -> String {
+        self.meta_key.clone()
+    }
+
     pub async fn try_read_cached_result(&self) -> Result<Option<Vec<DataBlock>>> {
-        match self.meta_mgr.get().await? {
+        self.try_read_cached_result_with_meta_key(self.meta_key.clone())
+            .await
+    }
+
+    pub async fn try_read_cached_result_with_meta_key(
+        &self,
+        meta_key: String,
+    ) -> Result<Option<Vec<DataBlock>>> {
+        match self.meta_mgr.get(meta_key).await? {
             Some(value) => {
                 if self.tolerate_inconsistent || value.partitions_shas == self.partitions_shas {
                     if value.num_rows == 0 {
diff --git a/src/query/storages/result_cache/src/write/sink.rs b/src/query/storages/result_cache/src/write/sink.rs
index 13722cbca068f..1071e1d27f423 100644
--- a/src/query/storages/result_cache/src/write/sink.rs
+++ b/src/query/storages/result_cache/src/write/sink.rs
@@ -33,10 +33,12 @@ use crate::common::ResultCacheValue;
 use crate::meta_manager::ResultCacheMetaManager;

 pub struct WriteResultCacheSink {
+    ctx: Arc<dyn TableContext>,
     sql: String,
     partitions_shas: Vec<String>,

     meta_mgr: ResultCacheMetaManager,
+    meta_key: String,
     cache_writer: ResultCacheWriter,
 }

@@ -77,7 +79,11 @@ impl AsyncMpscSink for WriteResultCacheSink {
             num_rows: self.cache_writer.num_rows(),
             location,
         };
-        self.meta_mgr.set(value, MatchSeq::GE(0), expire_at).await?;
+        self.meta_mgr
+            .set(self.meta_key.clone(), value, MatchSeq::GE(0), expire_at)
+            .await?;
+        self.ctx
+            .set_query_id_result_cache(self.ctx.get_id(), self.meta_key.clone());
         Ok(())
     }
 }

@@ -105,9 +111,11 @@ impl WriteResultCacheSink {
         Ok(ProcessorPtr::create(Box::new(AsyncMpscSinker::create(
             inputs,
             WriteResultCacheSink {
+                ctx,
                 sql,
                 partitions_shas,
-                meta_mgr: ResultCacheMetaManager::create(kv_store, meta_key, ttl),
+                meta_mgr: ResultCacheMetaManager::create(kv_store, ttl),
+                meta_key,
                 cache_writer,
             },
         ))))
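
The reader's new `try_read_cached_result_with_meta_key` is the piece that makes cross-query reads possible: the meta key can now come from the session (recorded by the sink above, and only after the cache data has been persisted) instead of being derived from the current statement's SQL. A sketch tying it to the two `TableContext` methods from earlier (the helper is hypothetical; imports follow the diff's crate layout):

```rust
use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::DataBlock;
use common_storages_result_cache::ResultCacheReader;

// Fetch the cached blocks of the previous query in this session, if any.
async fn previous_query_blocks(
    ctx: Arc<dyn TableContext>,
    reader: &ResultCacheReader,
) -> Result<Option<Vec<DataBlock>>> {
    let last_id = ctx.get_last_query_id(-1);
    match ctx.get_result_cache_key(&last_id) {
        // The key is only published after on_finish has written the data
        // (see the ordering in sink.rs), so a hit here is safe to read.
        Some(meta_key) => reader.try_read_cached_result_with_meta_key(meta_key).await,
        None => Ok(None),
    }
}
```
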
diff --git a/src/query/storages/system/Cargo.toml b/src/query/storages/system/Cargo.toml
index 7135f25b27159..e1e61df945bb7 100644
--- a/src/query/storages/system/Cargo.toml
+++ b/src/query/storages/system/Cargo.toml
@@ -25,6 +25,7 @@ common-pipeline-core = { path = "../../pipeline/core" }
 common-pipeline-sources = { path = "../../pipeline/sources" }
 common-sql = { path = "../../../query/sql" }
 common-storages-fuse = { path = "../fuse" }
+common-storages-result-cache = { path = "../result_cache" }
 common-storages-view = { path = "../view" }
 common-users = { path = "../../users" }
diff --git a/src/query/storages/system/src/lib.rs b/src/query/storages/system/src/lib.rs
index 56c9d3f70e908..b9518a8d1930e 100644
--- a/src/query/storages/system/src/lib.rs
+++ b/src/query/storages/system/src/lib.rs
@@ -32,6 +32,7 @@ mod malloc_stats_totals_table;
 mod metrics_table;
 mod one_table;
 mod processes_table;
+mod query_cache_table;
 mod query_log_table;
 mod roles_table;
 mod settings_table;
@@ -62,6 +63,7 @@ pub use malloc_stats_totals_table::MallocStatsTotalsTable;
 pub use metrics_table::MetricsTable;
 pub use one_table::OneTable;
 pub use processes_table::ProcessesTable;
+pub use query_cache_table::QueryCacheTable;
 pub use query_log_table::LogType;
 pub use query_log_table::QueryLogElement;
 pub use query_log_table::QueryLogQueue;
diff --git a/src/query/storages/system/src/query_cache_table.rs b/src/query/storages/system/src/query_cache_table.rs
new file mode 100644
index 0000000000000..fe4cdc953250d
--- /dev/null
+++ b/src/query/storages/system/src/query_cache_table.rs
@@ -0,0 +1,120 @@
+// Copyright 2023 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_catalog::table::Table;
+use common_catalog::table_context::TableContext;
+use common_exception::Result;
+use common_expression::types::NumberDataType;
+use common_expression::types::StringType;
+use common_expression::types::UInt64Type;
+use common_expression::DataBlock;
+use common_expression::FromData;
+use common_expression::TableDataType;
+use common_expression::TableField;
+use common_expression::TableSchemaRefExt;
+use common_meta_app::schema::TableIdent;
+use common_meta_app::schema::TableInfo;
+use common_meta_app::schema::TableMeta;
+use common_storages_result_cache::gen_result_cache_prefix;
+use common_storages_result_cache::ResultCacheMetaManager;
+use common_users::UserApiProvider;
+use itertools::Itertools;
+
+use crate::table::AsyncOneBlockSystemTable;
+use crate::table::AsyncSystemTable;
+
+pub struct QueryCacheTable {
+    table_info: TableInfo,
+}
+
+#[async_trait::async_trait]
+impl AsyncSystemTable for QueryCacheTable {
+    const NAME: &'static str = "system.query_cache";
+
+    fn get_table_info(&self) -> &TableInfo {
+        &self.table_info
+    }
+
+    async fn get_full_data(&self, ctx: Arc<dyn TableContext>) -> Result<DataBlock> {
+        let meta_client = UserApiProvider::instance().get_meta_store_client();
+        let result_cache_mgr = ResultCacheMetaManager::create(meta_client, 0);
+        let tenant = ctx.get_tenant();
+        let prefix = gen_result_cache_prefix(&tenant);
+
+        let cached_values = result_cache_mgr.list(prefix.as_str()).await?;
+
+        let mut sql_vec: Vec<&str> = Vec::with_capacity(cached_values.len());
+        let mut result_size_vec = Vec::with_capacity(cached_values.len());
+        let mut num_rows_vec = Vec::with_capacity(cached_values.len());
+        let mut partitions_sha_vec = Vec::with_capacity(cached_values.len());
+        let mut location_vec = Vec::with_capacity(cached_values.len());
+
+        cached_values.iter().for_each(|x| {
+            sql_vec.push(x.sql.as_str());
+            result_size_vec.push(x.result_size as u64);
+            num_rows_vec.push(x.num_rows as u64);
+            partitions_sha_vec.push(x.partitions_shas.clone());
+            location_vec.push(x.location.as_str());
+        });
+
+        let partitions_sha_vec: Vec<String> = partitions_sha_vec
+            .into_iter()
+            .map(|part| part.into_iter().join(", "))
+            .collect();
+
+        Ok(DataBlock::new_from_columns(vec![
+            StringType::from_data(sql_vec),
+            UInt64Type::from_data(result_size_vec),
+            UInt64Type::from_data(num_rows_vec),
+            StringType::from_data(
+                partitions_sha_vec
+                    .iter()
+                    .map(|part_sha| part_sha.as_str())
+                    .collect::<Vec<_>>(),
+            ),
+            StringType::from_data(location_vec),
+        ]))
+    }
+}
+
+impl QueryCacheTable {
+    pub fn create(table_id: u64) -> Arc<dyn Table> {
+        let schema = TableSchemaRefExt::create(vec![
+            TableField::new("sql", TableDataType::String),
+            TableField::new("result_size", TableDataType::Number(NumberDataType::UInt64)),
+            TableField::new("num_rows", TableDataType::Number(NumberDataType::UInt64)),
+            TableField::new(
+                "partitions_sha",
+                TableDataType::Array(Box::new(TableDataType::String)),
+            ),
+            TableField::new("location", TableDataType::String),
+        ]);
+
+        let table_info = TableInfo {
+            desc: "'system'.'query_cache'".to_string(),
+            name: "query_cache".to_string(),
+            ident: TableIdent::new(table_id, 0),
+            meta: TableMeta {
+                schema,
+                engine: "SystemQueryCache".to_string(),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        AsyncOneBlockSystemTable::create(QueryCacheTable { table_info })
+    }
+}
diff --git a/tests/sqllogictests/suites/base/01_system/01_0011_system_query_cache b/tests/sqllogictests/suites/base/01_system/01_0011_system_query_cache
new file mode 100644
index 0000000000000..bf2ee2bb372be
--- /dev/null
+++ b/tests/sqllogictests/suites/base/01_system/01_0011_system_query_cache
@@ -0,0 +1,34 @@
+statement ok
+DROP DATABASE IF EXISTS db01_0011;
+
+statement ok
+CREATE DATABASE db01_0011;
+
+statement ok
+USE db01_0011;
+
+statement ok
+CREATE TABLE IF NOT EXISTS t1 (a INT);
+
+statement ok
+INSERT INTO t1 VALUES (1), (2), (3);
+
+statement ok
+SET enable_query_result_cache = 1;
+
+statement ok
+SELECT * FROM t1;
+
+query I
+SELECT num_rows FROM system.query_cache;
+----
+3
+
+statement ok
+SET enable_query_result_cache = 0;
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP DATABASE db01_0011;
\ No newline at end of file