Skip to content

Commit

Permalink
chore(query): rename enable_stage_udf_priv_check to experiment_enable…
Browse files Browse the repository at this point in the history
…_new_rbac_check
  • Loading branch information
TCeason committed Nov 30, 2023
1 parent 3617f42 commit d81d369
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 107 deletions.
175 changes: 86 additions & 89 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ impl AccessChecker for PrivilegeAccess {
let user = self.ctx.get_current_user()?;
let (identity, grant_set) = (user.identity().to_string(), user.grants);
let tenant = self.ctx.get_tenant();
let enable_stage_udf_priv_check =
self.ctx.get_settings().get_enable_stage_udf_priv_check()?;
let enable_experimental_rbac =
self.ctx.get_settings().get_enable_new_rbac_check()?;

match plan {
Plan::Query {
Expand Down Expand Up @@ -195,7 +195,7 @@ impl AccessChecker for PrivilegeAccess {
}
_ => {}
};
if enable_stage_udf_priv_check {
if enable_experimental_rbac {
match s_expr.get_udfs() {
Ok(udfs) => {
if !udfs.is_empty() {
Expand All @@ -218,7 +218,7 @@ impl AccessChecker for PrivilegeAccess {

for table in metadata.tables() {

if enable_stage_udf_priv_check && table.is_source_of_stage() {
if enable_experimental_rbac && table.is_source_of_stage() {
match table.table().get_data_source_info() {
DataSourceInfo::StageSource(stage_info) => {
if !stage_info.stage_info.is_from_uri {
Expand Down Expand Up @@ -525,9 +525,11 @@ impl AccessChecker for PrivilegeAccess {
.await?;
}
Plan::ReclusterTable(plan) => {
if let Some(scalar) = &plan.push_downs {
let udf = get_udf_names(scalar)?;
self.check_udf_priv(udf).await?;
if enable_experimental_rbac {
if let Some(scalar) = &plan.push_downs {
let udf = get_udf_names(scalar)?;
self.check_udf_priv(udf).await?;
}
}
self.validate_access(
&GrantObject::Table(
Expand Down Expand Up @@ -624,7 +626,7 @@ impl AccessChecker for PrivilegeAccess {
.await?;
}
Plan::MergeInto(plan) => {
if enable_stage_udf_priv_check {
if enable_experimental_rbac {
let s_expr = &plan.input;
match s_expr.get_udfs() {
Ok(udfs) => {
Expand Down Expand Up @@ -679,7 +681,7 @@ impl AccessChecker for PrivilegeAccess {
.await?;
}
Plan::Delete(plan) => {
if enable_stage_udf_priv_check {
if enable_experimental_rbac {
if let Some(selection) = &plan.selection {
let udf = get_udf_names(selection)?;
self.check_udf_priv(udf).await?;
Expand Down Expand Up @@ -715,29 +717,31 @@ impl AccessChecker for PrivilegeAccess {
.await?;
}
Plan::Update(plan) => {
for scalar in plan.update_list.values() {
let udf = get_udf_names(scalar)?;
self.check_udf_priv(udf).await?;
}
if let Some(selection) = &plan.selection {
let udf = get_udf_names(selection)?;
self.check_udf_priv(udf).await?;
}
for subquery in &plan.subquery_desc {
match subquery.input_expr.get_udfs() {
Ok(udfs) => {
if !udfs.is_empty() {
for udf in udfs {
self.validate_access(
&GrantObject::UDF(udf.clone()),
vec![UserPrivilegeType::Usage],
false,
).await?
if enable_experimental_rbac {
for scalar in plan.update_list.values() {
let udf = get_udf_names(scalar)?;
self.check_udf_priv(udf).await?;
}
if let Some(selection) = &plan.selection {
let udf = get_udf_names(selection)?;
self.check_udf_priv(udf).await?;
}
for subquery in &plan.subquery_desc {
match subquery.input_expr.get_udfs() {
Ok(udfs) => {
if !udfs.is_empty() {
for udf in udfs {
self.validate_access(
&GrantObject::UDF(udf.clone()),
vec![UserPrivilegeType::Usage],
false,
).await?
}
}
}
}
Err(err) => {
return Err(err.add_message("get udf error on validating access"));
Err(err) => {
return Err(err.add_message("get udf error on validating access"));
}
}
}
}
Expand Down Expand Up @@ -851,19 +855,16 @@ impl AccessChecker for PrivilegeAccess {
}
Plan::CopyIntoTable(plan) => {
// TODO(TCeason): need to check plan.query privileges.

if enable_stage_udf_priv_check && !plan.stage_table_info.stage_info.is_from_uri {
let stage_name = &plan.stage_table_info.stage_info.stage_name;
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Read],
false,
)
.await?;
}


if enable_experimental_rbac && !plan.stage_table_info.stage_info.is_from_uri {
let stage_name = &plan.stage_table_info.stage_info.stage_name;
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Read],
false,
)
.await?;
}
self
.validate_access(
&GrantObject::Table(
Expand All @@ -877,34 +878,31 @@ impl AccessChecker for PrivilegeAccess {
.await?;
}
Plan::CopyIntoLocation(plan) => {

if enable_stage_udf_priv_check && !plan.stage.is_from_uri {
let stage_name = &plan.stage.stage_name;
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Write],
false,
)
.await?;
}
if enable_experimental_rbac && !plan.stage.is_from_uri {
let stage_name = &plan.stage.stage_name;
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Write],
false,
)
.await?;
}

let from = plan.from.clone();
return self.check(ctx, &from).await;
}
Plan::RemoveStage(plan) => {

if enable_stage_udf_priv_check && !plan.stage.is_from_uri {
let stage_name = &plan.stage.stage_name;
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Write],
false,
)
.await?;
}

if enable_experimental_rbac && !plan.stage.is_from_uri {
let stage_name = &plan.stage.stage_name;
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Write],
false,
)
.await?;
}
}
Plan::CreateShareEndpoint(_)
| Plan::ShowShareEndpoint(_)
Expand Down Expand Up @@ -953,31 +951,30 @@ impl AccessChecker for PrivilegeAccess {
Plan::SetSecondaryRoles(_) => {}
Plan::ShowRoles(_) => {}
Plan::Presign(plan) => {
if enable_stage_udf_priv_check && !plan.stage.is_from_uri {
let stage_name = &plan.stage.stage_name;
let action = &plan.action;
match action {
PresignAction::Upload => {
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Write],
false,
)
.await?
}
PresignAction::Download => {
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Read],
false,
)
.await?
}
if enable_experimental_rbac && !plan.stage.is_from_uri {
let stage_name = &plan.stage.stage_name;
let action = &plan.action;
match action {
PresignAction::Upload => {
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Write],
false,
)
.await?
}
PresignAction::Download => {
self
.validate_access(
&GrantObject::Stage(stage_name.clone()),
vec![UserPrivilegeType::Read],
false,
)
.await?
}
}

}
}
Plan::ExplainAst { .. } => {}
Plan::ExplainSyntax { .. } => {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ impl AsyncSource for InferSchemaSource {

let (stage_info, path) =
resolve_stage_location(&self.ctx, &self.args_parsed.location).await?;
let enable_stage_udf_priv_check =
self.ctx.get_settings().get_enable_stage_udf_priv_check()?;
let enable_stage_udf_priv_check = self.ctx.get_settings().get_enable_new_rbac_check()?;
if enable_stage_udf_priv_check {
let visibility_checker = self.ctx.get_visibility_checker().await?;
if !stage_info.is_from_uri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ impl AsyncSource for InspectParquetSource {
self.is_finished = true;
let uri = self.uri.strip_prefix('@').unwrap().to_string();
let (stage_info, path) = resolve_stage_location(&self.ctx, &uri).await?;
let enable_stage_udf_priv_check =
self.ctx.get_settings().get_enable_stage_udf_priv_check()?;
let enable_stage_udf_priv_check = self.ctx.get_settings().get_enable_new_rbac_check()?;
if enable_stage_udf_priv_check {
let visibility_checker = self.ctx.get_visibility_checker().await?;
if !stage_info.is_from_uri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ impl AsyncSource for ListStagesSource {

let (stage_info, path) =
resolve_stage_location(&self.ctx, &self.args_parsed.location).await?;
let enable_stage_udf_priv_check =
self.ctx.get_settings().get_enable_stage_udf_priv_check()?;
let enable_stage_udf_priv_check = self.ctx.get_settings().get_enable_new_rbac_check()?;
if enable_stage_udf_priv_check {
let visibility_checker = self.ctx.get_visibility_checker().await?;
if !stage_info.is_from_uri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
| 'enable_distributed_replace_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of replace into.' | 'UInt64' |
| 'enable_dphyp' | '1' | '1' | 'SESSION' | 'Enables dphyp join order algorithm.' | 'UInt64' |
| 'enable_experimental_merge_into' | '0' | '0' | 'SESSION' | 'Enable experimental merge into.' | 'UInt64' |
| 'enable_experimental_rbac' | '0' | '0' | 'SESSION' | 'experiment setting disables stage and udf privilege check(disable by default).' | 'UInt64' |
| 'enable_hive_parquet_predict_pushdown' | '1' | '1' | 'SESSION' | 'Enable hive parquet predict pushdown by setting this variable to 1, default value: 1' | 'UInt64' |
| 'enable_parquet_page_index' | '1' | '1' | 'SESSION' | 'Enables parquet page index' | 'UInt64' |
| 'enable_parquet_prewhere' | '0' | '0' | 'SESSION' | 'Enables parquet prewhere' | 'UInt64' |
Expand All @@ -31,7 +32,6 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
| 'enable_replace_into_partitioning' | '1' | '1' | 'SESSION' | 'Enables partitioning for replace-into statement (if table has cluster keys).' | 'UInt64' |
| 'enable_runtime_filter' | '0' | '0' | 'SESSION' | 'Enables runtime filter optimization for JOIN.' | 'UInt64' |
| 'enable_table_lock' | '1' | '1' | 'SESSION' | 'Enables table lock if necessary (enabled by default).' | 'UInt64' |
| 'experiment_enable_stage_udf_priv_check' | '0' | '0' | 'SESSION' | 'experiment setting disables stage and udf privilege check(disable by default).' | 'UInt64' |
| 'external_server_connect_timeout_secs' | '10' | '10' | 'SESSION' | 'Connection timeout to external server' | 'UInt64' |
| 'external_server_request_timeout_secs' | '180' | '180' | 'SESSION' | 'Request timeout to external server' | 'UInt64' |
| 'flight_client_timeout' | '60' | '60' | 'SESSION' | 'Sets the maximum time in seconds that a flight client request can be processed.' | 'UInt64' |
Expand Down
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ impl DefaultSettings {
possible_values: Some(vec!["rounding", "truncating"]),
mode: SettingMode::Both,
}),
("experiment_enable_stage_udf_priv_check", DefaultSettingValue {
("enable_experimental_rbac", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "experiment setting disables stage and udf privilege check(disable by default).",
possible_values: None,
Expand Down
4 changes: 2 additions & 2 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ impl Settings {
Ok(self.try_get_u64("enable_table_lock")? != 0)
}

pub fn get_enable_stage_udf_priv_check(&self) -> Result<bool> {
Ok(self.try_get_u64("experiment_enable_stage_udf_priv_check")? != 0)
pub fn get_enable_new_rbac_check(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_experimental_rbac")? != 0)
}

pub fn get_table_lock_expire_secs(&self) -> Result<u64> {
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/system/src/functions_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl AsyncSystemTable for FunctionsTable {
scalar_func_names.sort();
let aggregate_function_factory = AggregateFunctionFactory::instance();
let aggr_func_names = aggregate_function_factory.registered_names();
let enable_stage_udf_priv_check = ctx.get_settings().get_enable_stage_udf_priv_check()?;
let enable_stage_udf_priv_check = ctx.get_settings().get_enable_new_rbac_check()?;
let udfs = if enable_stage_udf_priv_check {
let visibility_checker = ctx.get_visibility_checker().await?;
let udfs = FunctionsTable::get_udfs(ctx).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/system/src/stages_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl AsyncSystemTable for StagesTable {
) -> Result<DataBlock> {
let tenant = ctx.get_tenant();
let stages = UserApiProvider::instance().get_stages(&tenant).await?;
let enable_stage_udf_priv_check = ctx.get_settings().get_enable_stage_udf_priv_check()?;
let enable_stage_udf_priv_check = ctx.get_settings().get_enable_new_rbac_check()?;
let stages = if enable_stage_udf_priv_check {
let visibility_checker = ctx.get_visibility_checker().await?;
stages
Expand Down
Loading

0 comments on commit d81d369

Please sign in to comment.