Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-2779] Add user in session #2986

Merged
merged 9 commits into from
Nov 24, 2021
4 changes: 4 additions & 0 deletions query/src/datasources/database/system/processes_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl ProcessesTable {
DataField::new("id", DataType::String, false),
DataField::new("type", DataType::String, false),
DataField::new("host", DataType::String, true),
DataField::new("user", DataType::String, true),
DataField::new("state", DataType::String, false),
DataField::new("database", DataType::String, false),
DataField::new("extra_info", DataType::String, true),
Expand Down Expand Up @@ -96,6 +97,7 @@ impl Table for ProcessesTable {
let mut processes_id = Vec::with_capacity(processes_info.len());
let mut processes_type = Vec::with_capacity(processes_info.len());
let mut processes_host = Vec::with_capacity(processes_info.len());
let mut processes_user = Vec::with_capacity(processes_info.len());
let mut processes_state = Vec::with_capacity(processes_info.len());
let mut processes_database = Vec::with_capacity(processes_info.len());
let mut processes_extra_info = Vec::with_capacity(processes_info.len());
Expand All @@ -107,6 +109,7 @@ impl Table for ProcessesTable {
processes_state.push(process_info.state.clone().into_bytes());
processes_database.push(process_info.database.clone().into_bytes());
processes_host.push(ProcessesTable::process_host(process_info));
processes_user.push(process_info.user.clone().into_bytes());
processes_extra_info.push(ProcessesTable::process_extra_info(process_info));
processes_memory_usage.push(process_info.memory_usage);
}
Expand All @@ -116,6 +119,7 @@ impl Table for ProcessesTable {
Series::new(processes_id),
Series::new(processes_type),
Series::new(processes_host),
Series::new(processes_user),
Series::new(processes_state),
Series::new(processes_database),
Series::new(processes_extra_info),
Expand Down
5 changes: 4 additions & 1 deletion query/src/servers/clickhouse/interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ impl ClickHouseSession for InteractiveWorker {
Err(err) => Err(err),
};
match res {
Ok(res) => res,
Ok(res) => {
self.session.set_current_user(user.to_string());
res
}
Err(failure) => {
log::error!(
"ClickHouse handler authenticate failed, \
Expand Down
11 changes: 8 additions & 3 deletions query/src/servers/mysql/mysql_interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,19 +208,24 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {
let address = &info.user_client_address;

let user_manager = self.session.get_user_manager();
// TODO: use get_users and check client address
// TODO: list user's grant list and check client address
let user_info = user_manager.get_user(user_name, "%").await?;

let input = &info.user_password;
let saved = &user_info.password;
let encode_password = Self::encoding_password(auth_plugin, salt, input, saved)?;

user_manager
let authed = user_manager
.auth_user(
user_info,
CertifiedInfo::create(user_name, encode_password, address),
)
.await
.await?;
if authed {
self.session.set_current_user(user_name.clone());
}

Ok(authed)
}

fn encoding_password(
Expand Down
11 changes: 11 additions & 0 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_macros::MallocSizeOf;
use common_mem_allocator::malloc_size;
Expand Down Expand Up @@ -149,6 +150,16 @@ impl Session {
self.mutable_state.get_current_database()
}

pub fn get_current_user(self: &Arc<Self>) -> Result<String> {
self.mutable_state
.get_current_user()
.ok_or_else(|| ErrorCode::AuthenticateFailure("unauthenticated"))
}

pub fn set_current_user(self: &Arc<Self>, user: String) {
self.mutable_state.set_current_user(user)
}

pub fn get_settings(self: &Arc<Self>) -> Arc<Settings> {
self.mutable_state.get_settings()
}
Expand Down
2 changes: 2 additions & 0 deletions query/src/sessions/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct ProcessInfo {
pub typ: String,
pub state: String,
pub database: String,
pub user: String,
#[allow(unused)]
pub settings: Arc<Settings>,
pub client_address: Option<SocketAddr>,
Expand Down Expand Up @@ -53,6 +54,7 @@ impl Session {
typ: self.typ.clone(),
state: self.process_state(status),
database: status.get_current_database(),
user: status.get_current_user().unwrap_or_else(|| "".into()),
settings: status.get_settings(),
client_address: status.get_client_host(),
session_extra_info: self.process_extra_info(status),
Expand Down
16 changes: 15 additions & 1 deletion query/src/sessions/session_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct MutableStatus {
abort: AtomicBool,
current_database: RwLock<String>,
session_settings: RwLock<Settings>,
current_user: RwLock<Option<String>>,
Copy link
Member

@BohuTANG BohuTANG Nov 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be the UserInfo in the future not only a user name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm taking UserInfo at first but reverted it back to a simple user name, in the current code base UserInfo is a fat object:

  pub struct UserInfo {
      pub name: String,
      pub hostname: String,
      pub password: Vec<u8>,
      pub auth_type: AuthType,
      pub privileges: UserPrivilege,
      pub quota: UserQuota,
  }

the fields inside UserInfo might get mutated on the fly (like quota, privileges, even password), or mutated by another query instance. a conservative usage of UserInfo might be always fetching UserInfo from the metasrv, and make it transactional with the user-related operations.

in the case about session privilege, we can construct a unique identifier for the privilege grants by user_name + client_address.

#[ignore_malloc_size_of = "insignificant"]
client_host: RwLock<Option<SocketAddr>>,
#[ignore_malloc_size_of = "insignificant"]
Expand All @@ -42,9 +43,10 @@ impl MutableStatus {
pub fn try_create() -> Result<Self> {
Ok(MutableStatus {
abort: Default::default(),
current_user: Default::default(),
client_host: Default::default(),
current_database: RwLock::new("default".to_string()),
session_settings: RwLock::new(Settings::try_create()?.as_ref().clone()),
client_host: Default::default(),
io_shutdown_tx: Default::default(),
context_shared: Default::default(),
})
Expand Down Expand Up @@ -72,6 +74,18 @@ impl MutableStatus {
*lock = db
}

// Set the current user after authentication
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to add a unit test in test crate we have some MutableStatus tests there

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BohuTANG thank you for remind, i've added a test case on the getter/setter for current user

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to test the behaviour of session's current user, i'd split another PR to add a current_user() function

pub fn set_current_user(&self, user: String) {
let mut lock = self.current_user.write();
*lock = Some(user);
}

// Get current user
pub fn get_current_user(&self) -> Option<String> {
let lock = self.current_user.read();
lock.clone()
}

pub fn get_settings(&self) -> Arc<Settings> {
let lock = self.session_settings.read();
Arc::new(lock.clone())
Expand Down
8 changes: 8 additions & 0 deletions query/src/sessions/session_status_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ fn test_session_status() -> Result<()> {
assert_eq!(Some(server), val);
}

// Current user.
{
mutable_status.set_current_user("user1".to_string());

let val = mutable_status.get_current_user();
assert_eq!(Some("user1".to_string()), val);
}

// io shutdown tx.
{
let (tx, _) = futures::channel::oneshot::channel();
Expand Down
4 changes: 2 additions & 2 deletions query/src/sql/plan_parser_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ fn test_plan_parser() -> Result<()> {
name: "show-processlist",
sql: "show processlist",
expect: "\
Projection: id:String, type:String, host:String, state:String, database:String, extra_info:String, memory_usage:UInt64\
\n ReadDataSource: scan partitions: [1], scan schema: [id:String, type:String, host:String;N, state:String, database:String, extra_info:String;N, memory_usage:UInt64;N], statistics: [read_rows: 0, read_bytes: 0]",
Projection: id:String, type:String, host:String, user:String, state:String, database:String, extra_info:String, memory_usage:UInt64\
\n ReadDataSource: scan partitions: [1], scan schema: [id:String, type:String, host:String;N, user:String;N, state:String, database:String, extra_info:String;N, memory_usage:UInt64;N], statistics: [read_rows: 0, read_bytes: 0]",
error: "",
},
];
Expand Down