From 5d4fdf36f3d73995ef4db8ed737a79474508752d Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 23 Nov 2021 19:03:12 +0800 Subject: [PATCH 1/8] add getter and setter for user in session --- query/src/sessions/session.rs | 11 +++++++++++ query/src/sessions/session_status.rs | 21 +++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index 43bab4688d9e6..e7d4d3a804ebc 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -16,6 +16,7 @@ use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use common_exception::ErrorCode; use common_exception::Result; use common_macros::MallocSizeOf; use common_mem_allocator::malloc_size; @@ -149,6 +150,16 @@ impl Session { self.mutable_state.get_current_database() } + pub fn get_current_user(self: &Arc) -> Result { + self.mutable_state + .get_current_user() + .ok_or_else(|| ErrorCode::AuthenticateFailure("unauthenticated")) + } + + pub fn set_current_user(self: &Arc, user: String) -> Result<()> { + self.mutable_state.set_current_user(user) + } + pub fn get_settings(self: &Arc) -> Arc { self.mutable_state.get_settings() } diff --git a/query/src/sessions/session_status.rs b/query/src/sessions/session_status.rs index 5b837fe8208c9..2d58dcaec7836 100644 --- a/query/src/sessions/session_status.rs +++ b/query/src/sessions/session_status.rs @@ -17,6 +17,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; +use common_exception::ErrorCode; use common_exception::Result; use common_infallible::RwLock; use common_macros::MallocSizeOf; @@ -30,6 +31,7 @@ pub struct MutableStatus { abort: AtomicBool, current_database: RwLock, session_settings: RwLock, + current_user: RwLock>, #[ignore_malloc_size_of = "insignificant"] client_host: RwLock>, #[ignore_malloc_size_of = "insignificant"] @@ -42,6 +44,7 @@ impl MutableStatus { pub fn try_create() -> Result { Ok(MutableStatus { abort: Default::default(), + current_user: RwLock::new(None), current_database: RwLock::new("default".to_string()), session_settings: RwLock::new(Settings::try_create()?.as_ref().clone()), client_host: Default::default(), @@ -72,6 +75,24 @@ impl MutableStatus { *lock = db } + // Set the current user after authentication + pub fn set_current_user(&self, user: String) -> Result<()> { + let mut lock = self.currrent_user.write(); + if lock.is_some() { + return Err(ErrorCode::UnexpectedError( + "can not change user after authentication during a session", + )); + } + *lock = user; + Ok(()) + } + + // Get current user + pub fn get_current_user(&self) -> Option { + let lock = self.current_user.read(); + lock.clone() + } + pub fn get_settings(&self) -> Arc { let lock = self.session_settings.read(); Arc::new(lock.clone()) From 0e36ffd967599efb2b5f35a2801155e1a765572d Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 23 Nov 2021 20:28:49 +0800 Subject: [PATCH 2/8] set_current_user() after mysql interactive work get authed --- query/src/servers/mysql/mysql_interactive_worker.rs | 11 ++++++++--- query/src/sessions/session_status.rs | 8 ++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/query/src/servers/mysql/mysql_interactive_worker.rs b/query/src/servers/mysql/mysql_interactive_worker.rs index 1fd22f9e5164a..085b3f6960f5e 100644 --- a/query/src/servers/mysql/mysql_interactive_worker.rs +++ b/query/src/servers/mysql/mysql_interactive_worker.rs @@ -208,19 +208,24 @@ impl InteractiveWorkerBase { let address = &info.user_client_address; let user_manager = self.session.get_user_manager(); - // TODO: use get_users and check client address + // TODO: list user's grant list and check client address let user_info = user_manager.get_user(user_name, "%").await?; let input = &info.user_password; let saved = &user_info.password; let encode_password = Self::encoding_password(auth_plugin, salt, input, saved)?; - user_manager + let authed = user_manager .auth_user( user_info, CertifiedInfo::create(user_name, encode_password, address), ) - .await + .await?; + if authed { + self.session.set_current_user(user_name.clone()); + } + + Ok(authed) } fn encoding_password( diff --git a/query/src/sessions/session_status.rs b/query/src/sessions/session_status.rs index 2d58dcaec7836..c8a856358c4d0 100644 --- a/query/src/sessions/session_status.rs +++ b/query/src/sessions/session_status.rs @@ -44,10 +44,10 @@ impl MutableStatus { pub fn try_create() -> Result { Ok(MutableStatus { abort: Default::default(), - current_user: RwLock::new(None), + current_user: Default::default(), + client_host: Default::default(), current_database: RwLock::new("default".to_string()), session_settings: RwLock::new(Settings::try_create()?.as_ref().clone()), - client_host: Default::default(), io_shutdown_tx: Default::default(), context_shared: Default::default(), }) @@ -77,13 +77,13 @@ impl MutableStatus { // Set the current user after authentication pub fn set_current_user(&self, user: String) -> Result<()> { - let mut lock = self.currrent_user.write(); + let mut lock = self.current_user.write(); if lock.is_some() { return Err(ErrorCode::UnexpectedError( "can not change user after authentication during a session", )); } - *lock = user; + *lock = Some(user); Ok(()) } From 8e623d464ff10f2ed7cb21f07e06af3cb25442d0 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 23 Nov 2021 20:48:37 +0800 Subject: [PATCH 3/8] add user field in the ProcessInfo --- query/src/sessions/session.rs | 2 +- query/src/sessions/session_info.rs | 2 ++ query/src/sessions/session_status.rs | 8 +------- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index e7d4d3a804ebc..5b04b4dcc698c 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -156,7 +156,7 @@ impl Session { .ok_or_else(|| ErrorCode::AuthenticateFailure("unauthenticated")) } - pub fn set_current_user(self: &Arc, user: String) -> Result<()> { + pub fn set_current_user(self: &Arc, user: String) { self.mutable_state.set_current_user(user) } diff --git a/query/src/sessions/session_info.rs b/query/src/sessions/session_info.rs index f33be6f00e639..0bed323eb35e0 100644 --- a/query/src/sessions/session_info.rs +++ b/query/src/sessions/session_info.rs @@ -24,6 +24,7 @@ pub struct ProcessInfo { pub typ: String, pub state: String, pub database: String, + pub user: String, #[allow(unused)] pub settings: Arc, pub client_address: Option, @@ -53,6 +54,7 @@ impl Session { typ: self.typ.clone(), state: self.process_state(status), database: status.get_current_database(), + user: status.get_current_user().unwrap_or("".into()), settings: status.get_settings(), client_address: status.get_client_host(), session_extra_info: self.process_extra_info(status), diff --git a/query/src/sessions/session_status.rs b/query/src/sessions/session_status.rs index c8a856358c4d0..507e943f5490e 100644 --- a/query/src/sessions/session_status.rs +++ b/query/src/sessions/session_status.rs @@ -76,15 +76,9 @@ impl MutableStatus { } // Set the current user after authentication - pub fn set_current_user(&self, user: String) -> Result<()> { + pub fn set_current_user(&self, user: String) { let mut lock = self.current_user.write(); - if lock.is_some() { - return Err(ErrorCode::UnexpectedError( - "can not change user after authentication during a session", - )); - } *lock = Some(user); - Ok(()) } // Get current user From 414cb1f5f814cac04d5e650dfb668a62070f11a2 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 23 Nov 2021 21:13:29 +0800 Subject: [PATCH 4/8] record user on auth clickhouse session --- query/src/servers/clickhouse/interactive_worker.rs | 5 ++++- query/src/sessions/session_status.rs | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/query/src/servers/clickhouse/interactive_worker.rs b/query/src/servers/clickhouse/interactive_worker.rs index f2d33d883b171..5bdc8ade032c4 100644 --- a/query/src/servers/clickhouse/interactive_worker.rs +++ b/query/src/servers/clickhouse/interactive_worker.rs @@ -109,7 +109,10 @@ impl ClickHouseSession for InteractiveWorker { Err(err) => Err(err), }; match res { - Ok(res) => res, + Ok(res) => { + self.session.set_current_user(user.to_string()); + res + } Err(failure) => { log::error!( "ClickHouse handler authenticate failed, \ diff --git a/query/src/sessions/session_status.rs b/query/src/sessions/session_status.rs index 507e943f5490e..688b9a91f7ede 100644 --- a/query/src/sessions/session_status.rs +++ b/query/src/sessions/session_status.rs @@ -17,7 +17,6 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -use common_exception::ErrorCode; use common_exception::Result; use common_infallible::RwLock; use common_macros::MallocSizeOf; From 5eebf25a54af3fc8f356f6aa529efa43ab08bb24 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 23 Nov 2021 21:13:43 +0800 Subject: [PATCH 5/8] add user field in system.processes --- query/src/datasources/database/system/processes_table.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/query/src/datasources/database/system/processes_table.rs b/query/src/datasources/database/system/processes_table.rs index 208209df6ee16..6a70d2985c418 100644 --- a/query/src/datasources/database/system/processes_table.rs +++ b/query/src/datasources/database/system/processes_table.rs @@ -42,6 +42,7 @@ impl ProcessesTable { DataField::new("id", DataType::String, false), DataField::new("type", DataType::String, false), DataField::new("host", DataType::String, true), + DataField::new("user", DataType::String, true), DataField::new("state", DataType::String, false), DataField::new("database", DataType::String, false), DataField::new("extra_info", DataType::String, true), @@ -96,6 +97,7 @@ impl Table for ProcessesTable { let mut processes_id = Vec::with_capacity(processes_info.len()); let mut processes_type = Vec::with_capacity(processes_info.len()); let mut processes_host = Vec::with_capacity(processes_info.len()); + let mut processes_user = Vec::with_capacity(processes_info.len()); let mut processes_state = Vec::with_capacity(processes_info.len()); let mut processes_database = Vec::with_capacity(processes_info.len()); let mut processes_extra_info = Vec::with_capacity(processes_info.len()); @@ -107,6 +109,7 @@ impl Table for ProcessesTable { processes_state.push(process_info.state.clone().into_bytes()); processes_database.push(process_info.database.clone().into_bytes()); processes_host.push(ProcessesTable::process_host(process_info)); + processes_user.push(process_info.user.clone().into_bytes()); processes_extra_info.push(ProcessesTable::process_extra_info(process_info)); processes_memory_usage.push(process_info.memory_usage); } @@ -116,6 +119,7 @@ impl Table for ProcessesTable { Series::new(processes_id), Series::new(processes_type), Series::new(processes_host), + Series::new(processes_user), Series::new(processes_state), Series::new(processes_database), Series::new(processes_extra_info), From e0e57188f0acc006b5b487361c44f4e4560cd260 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 23 Nov 2021 21:49:58 +0800 Subject: [PATCH 6/8] fix cargo check --- query/src/sessions/session_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/query/src/sessions/session_info.rs b/query/src/sessions/session_info.rs index 0bed323eb35e0..cbd389123e9ae 100644 --- a/query/src/sessions/session_info.rs +++ b/query/src/sessions/session_info.rs @@ -54,7 +54,7 @@ impl Session { typ: self.typ.clone(), state: self.process_state(status), database: status.get_current_database(), - user: status.get_current_user().unwrap_or("".into()), + user: status.get_current_user().unwrap_or_else(|| "".into()), settings: status.get_settings(), client_address: status.get_client_host(), session_extra_info: self.process_extra_info(status), From 42d5888bbcec457d07766bde655a81ce0b383bd4 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Wed, 24 Nov 2021 10:44:43 +0800 Subject: [PATCH 7/8] add test in session_status_test.rs --- query/src/sessions/session_status_test.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/query/src/sessions/session_status_test.rs b/query/src/sessions/session_status_test.rs index a411b9a037d00..a466539b51962 100644 --- a/query/src/sessions/session_status_test.rs +++ b/query/src/sessions/session_status_test.rs @@ -57,6 +57,14 @@ fn test_session_status() -> Result<()> { assert_eq!(Some(server), val); } + // Current user. + { + mutable_status.set_current_user("user1".to_string()); + + let val = mutable_status.get_current_user(); + assert_eq!(Some("user1".to_string()), val); + } + // io shutdown tx. { let (tx, _) = futures::channel::oneshot::channel(); From eeeb8cd9349dce21f2881310272580cf4112e4c1 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Wed, 24 Nov 2021 11:50:08 +0800 Subject: [PATCH 8/8] fix test --- query/src/sql/plan_parser_test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/query/src/sql/plan_parser_test.rs b/query/src/sql/plan_parser_test.rs index a27c82913e154..915c2fa6b8f6d 100644 --- a/query/src/sql/plan_parser_test.rs +++ b/query/src/sql/plan_parser_test.rs @@ -213,8 +213,8 @@ fn test_plan_parser() -> Result<()> { name: "show-processlist", sql: "show processlist", expect: "\ - Projection: id:String, type:String, host:String, state:String, database:String, extra_info:String, memory_usage:UInt64\ - \n ReadDataSource: scan partitions: [1], scan schema: [id:String, type:String, host:String;N, state:String, database:String, extra_info:String;N, memory_usage:UInt64;N], statistics: [read_rows: 0, read_bytes: 0]", + Projection: id:String, type:String, host:String, user:String, state:String, database:String, extra_info:String, memory_usage:UInt64\ + \n ReadDataSource: scan partitions: [1], scan schema: [id:String, type:String, host:String;N, user:String;N, state:String, database:String, extra_info:String;N, memory_usage:UInt64;N], statistics: [read_rows: 0, read_bytes: 0]", error: "", }, ];