feat(query): Support query result cache system table and table function #10159

Merged: 20 commits, Feb 24, 2023
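This PR exposes the query result cache through a new `system.query_cache` table and a `last_query_id` function, plus a table function for replaying cached results. A minimal usage sketch follows; the `enable_query_result_cache` setting and the `RESULT_SCAN` table-function name are assumptions based on Databend's result-cache design and do not appear in the hunks below.

```sql
-- Enable the per-session result cache (setting name assumed).
SET enable_query_result_cache = 1;

SELECT count(*) FROM numbers(100000);

-- Inspect cache entries via the new system table.
SELECT sql, num_rows, result_size, location FROM system.query_cache;

-- Replay the previous query's cached result (table-function name assumed).
SELECT * FROM RESULT_SCAN(last_query_id(-1));
```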
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion scripts/distribution/local-scripts/start.sh
@@ -2,7 +2,7 @@ echo "Stop old Databend instances"
killall -9 databend-meta
killall -9 databend-query
echo "Deploy new Databend(standalone)"
ulimit -n 65535
ulimit -n 65535
nohup bin/databend-meta --config-file=configs/databend-meta.toml 2>&1 >meta.log &
sleep 3
# export STORAGE_S3_ENABLE_VIRTUAL_HOST_STYLE=true
12 changes: 6 additions & 6 deletions scripts/distribution/package-scripts/postinstall.sh
@@ -5,13 +5,13 @@ set -e
usermod --append --groups adm databend || true

if getent group 'systemd-journal'; then
# Add Databend to systemd-journal to read journald logs
usermod --append --groups systemd-journal databend || true
systemctl daemon-reload || true
# Add Databend to systemd-journal to read journald logs
usermod --append --groups systemd-journal databend || true
systemctl daemon-reload || true
fi

if getent group 'systemd-journal-remote'; then
# Add Databend to systemd-journal-remote to read remote journald logs
usermod --append --groups systemd-journal-remote databend || true
systemctl daemon-reload || true
# Add Databend to systemd-journal-remote to read remote journald logs
usermod --append --groups systemd-journal-remote databend || true
systemctl daemon-reload || true
fi
4 changes: 2 additions & 2 deletions scripts/distribution/package-scripts/preinstall.sh
@@ -3,8 +3,8 @@ set -e

# Add databend:databend user & group
id --user databend >/dev/null 2>&1 ||
useradd --system --shell /sbin/nologin --home-dir /var/lib/databend --user-group \
--comment "Databend cloud data analytics" databend
useradd --system --shell /sbin/nologin --home-dir /var/lib/databend --user-group \
--comment "Databend cloud data analytics" databend

# Create default Databend data directory
mkdir -p /var/lib/databend
3 changes: 3 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -109,6 +109,9 @@ pub trait TableContext: Send + Sync {
fn get_cluster(&self) -> Arc<Cluster>;
fn get_processes_info(&self) -> Vec<ProcessInfo>;
fn get_stage_attachment(&self) -> Option<StageAttachment>;
fn get_last_query_id(&self, index: i32) -> String;
fn get_result_cache_key(&self, query_id: &str) -> Option<String>;
fn set_query_id_result_cache(&self, query_id: String, result_cache_key: String);
fn set_on_error_map(&self, map: Option<HashMap<String, ErrorCode>>);

fn apply_changed_settings(&self, changed_settings: Arc<Settings>) -> Result<()>;
2 changes: 2 additions & 0 deletions src/query/service/src/databases/system/system_database.rs
@@ -35,6 +35,7 @@ use common_storages_system::MallocStatsTotalsTable;
use common_storages_system::MetricsTable;
use common_storages_system::OneTable;
use common_storages_system::ProcessesTable;
use common_storages_system::QueryCacheTable;
use common_storages_system::QueryLogTable;
use common_storages_system::RolesTable;
use common_storages_system::SettingsTable;
@@ -86,6 +87,7 @@ impl SystemDatabase
StagesTable::create(sys_db_meta.next_table_id()),
BuildOptionsTable::create(sys_db_meta.next_table_id()),
CatalogsTable::create(sys_db_meta.next_table_id()),
QueryCacheTable::create(sys_db_meta.next_table_id()),
];

for tbl in table_list.into_iter() {
4 changes: 4 additions & 0 deletions src/query/service/src/interpreters/interpreter_select_v2.rs
@@ -176,6 +176,10 @@ impl Interpreter for SelectInterpreterV2 {
// 2. Check the cache.
match cache_reader.try_read_cached_result().await {
Ok(Some(blocks)) => {
// 2.0 Update the query_id -> result_cache_meta_key mapping in the session.
self.ctx
.get_current_session()
.update_query_ids_results(self.ctx.get_id(), cache_reader.get_meta_key());
// 2.1 If found, return the result directly.
return PipelineBuildResult::from_blocks(blocks);
}
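On a cache hit, the interpreter records the query_id to result-cache-meta-key mapping in the session before returning, so the query remains addressable by its id later in the session. From the client side this behaves roughly as follows (setting name assumed):

```sql
SET enable_query_result_cache = 1; -- assumed setting name
SELECT count(*) FROM t;            -- first run: executes and writes the cache
SELECT count(*) FROM t;            -- second run: served from cache; the id mapping is still recorded
```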
18 changes: 18 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -376,6 +376,24 @@ impl TableContext for QueryContext {
self.shared.get_stage_attachment()
}

fn get_last_query_id(&self, index: i32) -> String {
self.shared.session.session_ctx.get_last_query_id(index)
}

fn get_result_cache_key(&self, query_id: &str) -> Option<String> {
self.shared
.session
.session_ctx
.get_query_result_cache_key(query_id)
}

fn set_query_id_result_cache(&self, query_id: String, result_cache_key: String) {
self.shared
.session
.session_ctx
.update_query_ids_results(query_id, Some(result_cache_key))
}

fn set_on_error_map(&self, map: Option<HashMap<String, ErrorCode>>) {
self.shared.set_on_error_map(map);
}
8 changes: 8 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -336,6 +336,14 @@ impl QueryContextShared {
}
}

impl Drop for QueryContextShared {
fn drop(&mut self) {
self.session
.session_ctx
.update_query_ids_results(self.init_query_id.read().clone(), None)
}
}

pub fn short_sql(query: &str) -> String {
use unicode_segmentation::UnicodeSegmentation;
let query = query.trim_start();
9 changes: 9 additions & 0 deletions src/query/service/src/sessions/session.rs
@@ -352,6 +352,15 @@ impl Session {
pub fn get_status(self: &Arc<Self>) -> Arc<RwLock<SessionStatus>> {
self.status.clone()
}

pub fn get_query_result_cache_key(self: &Arc<Self>, query_id: &str) -> Option<String> {
self.session_ctx.get_query_result_cache_key(query_id)
}

pub fn update_query_ids_results(self: &Arc<Self>, query_id: String, result_cache_key: String) {
self.session_ctx
.update_query_ids_results(query_id, Some(result_cache_key))
}
}

impl Drop for Session {
45 changes: 45 additions & 0 deletions src/query/service/src/sessions/session_ctx.rs
@@ -51,6 +51,9 @@ pub struct SessionContext {
client_host: RwLock<Option<SocketAddr>>,
io_shutdown_tx: RwLock<Option<Sender<Sender<()>>>>,
query_context_shared: RwLock<Weak<QueryContextShared>>,
// We store `query_id -> query_result_cache_key` in the session context so that
// a previous query's cached result can be fetched by its query_id.
query_ids_results: RwLock<Vec<(String, Option<String>)>>,
}

impl SessionContext {
@@ -67,6 +70,7 @@ impl SessionContext {
current_database: RwLock::new("default".to_string()),
io_shutdown_tx: Default::default(),
query_context_shared: Default::default(),
query_ids_results: Default::default(),
}))
}

@@ -212,4 +216,45 @@
let mut lock = self.query_context_shared.write();
*lock = ctx
}

pub fn get_query_result_cache_key(&self, query_id: &str) -> Option<String> {
let lock = self.query_ids_results.read();
for (qid, result_cache_key) in (*lock).iter().rev() {
if qid.eq_ignore_ascii_case(query_id) {
return result_cache_key.clone();
}
}
None
}

pub fn update_query_ids_results(&self, query_id: String, value: Option<String>) {
    let mut lock = self.query_ids_results.write();
    // Search in reverse: an entry being updated is almost always a recent query.
    if let Some((_, cache_key)) = lock
        .iter_mut()
        .rev()
        .find(|(qid, _)| qid.eq_ignore_ascii_case(&query_id))
    {
        // Only overwrite the stored cache key when a new value is provided.
        if let Some(v) = value {
            *cache_key = Some(v);
        }
        return;
    }
    lock.push((query_id, value))
}

pub fn get_last_query_id(&self, index: i32) -> String {
let lock = self.query_ids_results.read();
let query_ids_len = lock.len();
let idx = if index < 0 {
query_ids_len as i32 + index
} else {
index
};

// Compare in usize without subtracting, so an empty history cannot underflow.
if idx < 0 || idx as usize >= query_ids_len {
return "".to_string();
}

(*lock)[idx as usize].0.clone()
}
}
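`get_last_query_id` treats its argument as a position in the session's query history: a non-negative index counts from the first recorded query, a negative index counts back from the most recent one, and an out-of-range index yields an empty string. A quick sketch of the resulting SQL behavior:

```sql
-- After three queries in the current session:
SELECT last_query_id(0);  -- id of the first recorded query
SELECT last_query_id(-1); -- id of the most recent query
SELECT last_query_id(99); -- out of range: returns ''
```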
10 changes: 10 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -444,6 +444,16 @@ impl TableContext for CtxDelegation {
todo!()
}

fn get_last_query_id(&self, _index: i32) -> String {
todo!()
}
fn get_result_cache_key(&self, _query_id: &str) -> Option<String> {
todo!()
}
fn set_query_id_result_cache(&self, _query_id: String, _result_cache_key: String) {
todo!()
}

fn set_on_error_map(&self, _map: Option<HashMap<String, ErrorCode>>) {
todo!()
}
@@ -128,6 +128,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemColumns
| "labels" | "system" | "metrics" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "level" | "system" | "settings" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "license" | "system" | "credits" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "location" | "system" | "query_cache" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "log_type" | "system" | "query_log" | "Int8" | "TINYINT" | "" | "" | "NO" | "" |
| "memory_usage" | "system" | "processes" | "Int64" | "BIGINT" | "" | "" | "NO" | "" |
| "memory_usage" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" |
@@ -150,6 +151,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemColumns
| "name" | "system" | "users" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "non_unique" | "information_schema" | "statistics" | "NULL" | "NULL" | "" | "" | "NO" | "" |
| "nullable" | "information_schema" | "statistics" | "NULL" | "NULL" | "" | "" | "NO" | "" |
| "num_rows" | "system" | "query_cache" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" |
| "num_rows" | "system" | "tables" | "Nullable(UInt64)" | "BIGINT UNSIGNED" | "" | "" | "YES" | "" |
| "num_rows" | "system" | "tables_with_history" | "Nullable(UInt64)" | "BIGINT UNSIGNED" | "" | "" | "YES" | "" |
| "number_of_files" | "system" | "stages" | "Nullable(UInt64)" | "BIGINT UNSIGNED" | "" | "" | "YES" | "" |
@@ -159,6 +161,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemColumns
| "ordinal_position" | "information_schema" | "columns" | "UInt8" | "TINYINT UNSIGNED" | "" | "" | "NO" | "" |
| "ordinal_position" | "information_schema" | "key_column_usage" | "NULL" | "NULL" | "" | "" | "NO" | "" |
| "packed" | "information_schema" | "statistics" | "NULL" | "NULL" | "" | "" | "NO" | "" |
| "partitions_sha" | "system" | "query_cache" | "Array(String)" | "ARRAY(STRING)" | "" | "" | "NO" | "" |
| "port" | "system" | "clusters" | "UInt16" | "SMALLINT UNSIGNED" | "" | "" | "NO" | "" |
| "position_in_unique_constraint" | "information_schema" | "key_column_usage" | "NULL" | "NULL" | "" | "" | "NO" | "" |
| "projections" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
@@ -175,6 +178,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemColumns
| "reserved" | "information_schema" | "keywords" | "UInt8" | "TINYINT UNSIGNED" | "" | "" | "NO" | "" |
| "result_bytes" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" |
| "result_rows" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" |
| "result_size" | "system" | "query_cache" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" |
| "scan_bytes" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" |
| "scan_io_bytes" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" |
| "scan_io_bytes_cost_ms" | "system" | "query_log" | "UInt64" | "BIGINT UNSIGNED" | "" | "" | "NO" | "" |
@@ -187,6 +191,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemColumns
| "seq_in_index" | "information_schema" | "statistics" | "NULL" | "NULL" | "" | "" | "NO" | "" |
| "server_version" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "session_settings" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "sql" | "system" | "query_cache" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "sql_path" | "information_schema" | "schemata" | "NULL" | "NULL" | "" | "" | "NO" | "" |
| "sql_user" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
| "sql_user_privileges" | "system" | "query_log" | "String" | "VARCHAR" | "" | "" | "NO" | "" |
1 change: 1 addition & 0 deletions src/query/sql/src/lib.rs
@@ -14,6 +14,7 @@

#![allow(clippy::uninlined_format_args)]
#![feature(box_patterns)]
#![feature(let_chains)]

pub mod evaluator;
pub mod executor;
55 changes: 55 additions & 0 deletions src/query/sql/src/planner/semantic/type_check.rs
@@ -1445,6 +1445,7 @@ impl<'a> TypeChecker<'a> {
"ifnull",
"is_null",
"coalesce",
"last_query_id",
]
}

@@ -1642,6 +1643,60 @@
)
}

("last_query_id", args) => {
// last_query_id(index) returns query_id in current session by index
// index support literal(eg: -1, -2, 2) and simple binary op(eg: 1+1, 3-1)
// if index out of range, returns none.
let index = if args.len() != 1 {
ariesdevil marked this conversation as resolved.
Show resolved Hide resolved
-1
} else {
match args[0] {
Expr::BinaryOp {
op, left, right, ..
} => {
if let Expr::Literal {span:_, lit:Literal::Integer(l)} = **left
&& let Expr::Literal {span:_, lit:Literal::Integer(r)} = **right {
match op {
BinaryOperator::Plus => (l + r) as i32,
BinaryOperator::Minus => (l - r) as i32,
_ => -1,
}
} else {-1}
}
Expr::UnaryOp { op, expr, .. } => {
if let Expr::Literal {
span: _,
lit: Literal::Integer(i),
} = **expr
{
match op {
UnaryOperator::Plus => i as i32,
UnaryOperator::Minus => -(i as i32),
UnaryOperator::Not => -1,
}
} else {
-1
}
}
Expr::Literal {
lit: Literal::Integer(i),
..
} => *i as i32,
_ => -1,
}
};
let query_id = self.ctx.get_last_query_id(index);
Some(
self.resolve(
&Expr::Literal {
span,
lit: Literal::String(query_id),
},
None,
)
.await,
)
}
_ => None,
}
}
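The binder folds the `last_query_id` argument at type-check time, so only integer literals, unary plus/minus, and literal-only binary plus/minus expressions are honored; any other form, or a missing or extra argument, falls back to -1 (the most recent query). For example:

```sql
SELECT last_query_id();        -- no argument: defaults to -1
SELECT last_query_id(1 + 1);   -- literal binary op: index 2
SELECT last_query_id(abs(-2)); -- unsupported expression: falls back to -1
```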
9 changes: 7 additions & 2 deletions src/query/storages/result_cache/src/common.rs
@@ -28,17 +28,22 @@ pub fn gen_result_cache_key(raw: &str) -> String {
}

#[inline(always)]
pub(crate) fn gen_result_cache_meta_key(tenant: &str, key: &str) -> String {
pub fn gen_result_cache_meta_key(tenant: &str, key: &str) -> String {
format!("{RESULT_CACHE_PREFIX}/{tenant}/{key}")
}

#[inline(always)]
pub fn gen_result_cache_prefix(tenant: &str) -> String {
format!("{RESULT_CACHE_PREFIX}/{tenant}/")
}

#[inline(always)]
pub(crate) fn gen_result_cache_dir(key: &str) -> String {
format!("{RESULT_CACHE_PREFIX}/{key}")
}

#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct ResultCacheValue {
pub struct ResultCacheValue {
/// The original query SQL.
pub sql: String,
/// The query time.
3 changes: 3 additions & 0 deletions src/query/storages/result_cache/src/lib.rs
@@ -20,5 +20,8 @@ mod read;
mod write;

pub use common::gen_result_cache_key;
pub use common::gen_result_cache_meta_key;
pub use common::gen_result_cache_prefix;
pub use meta_manager::ResultCacheMetaManager;
pub use read::ResultCacheReader;
pub use write::WriteResultCacheSink;