Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(setting): support global setting #6579

Merged
merged 5 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion common/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub enum Statement<'a> {
},

SetVariable {
is_global: bool,
variable: Identifier<'a>,
value: Literal,
},
Expand Down Expand Up @@ -215,7 +216,17 @@ impl<'a> Display for Statement<'a> {
}
write!(f, " {object_id}")?;
}
Statement::SetVariable { variable, value } => write!(f, "SET {variable} = {value}")?,
Statement::SetVariable {
is_global,
variable,
value,
} => {
write!(f, "SET ")?;
if *is_global {
write!(f, "GLOBAL ")?;
}
write!(f, "{variable} = {value}")?;
}
Statement::ShowDatabases(stmt) => write!(f, "{stmt}")?,
Statement::ShowCreateDatabase(stmt) => write!(f, "{stmt}")?,
Statement::CreateDatabase(stmt) => write!(f, "{stmt}")?,
Expand Down
8 changes: 6 additions & 2 deletions common/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
);
let set_variable = map(
rule! {
SET ~ #ident ~ "=" ~ #literal
SET ~ (GLOBAL)? ~ #ident ~ "=" ~ #literal
},
|(_, opt_is_global, variable, _, value)| Statement::SetVariable {
is_global: opt_is_global.is_some(),
variable,
value,
},
|(_, variable, _, value)| Statement::SetVariable { variable, value },
);
let show_databases = map(
rule! {
Expand Down
2 changes: 2 additions & 0 deletions common/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,8 @@ pub enum TokenKind {
FUSE,
#[token("GITHUB", ignore(ascii_case))]
GITHUB,
#[token("GLOBAL", ignore(ascii_case))]
GLOBAL,
#[token("GRAPH", ignore(ascii_case))]
GRAPH,
#[token("GROUP", ignore(ascii_case))]
Expand Down
3 changes: 3 additions & 0 deletions common/management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod cluster;
mod quota;
mod role;
mod serde;
mod setting;
mod stage;
mod udf;
mod user;
Expand All @@ -28,6 +29,8 @@ pub use role::RoleApi;
pub use role::RoleMgr;
pub use serde::deserialize_struct;
pub use serde::serialize_struct;
pub use setting::SettingApi;
pub use setting::SettingMgr;
pub use stage::StageApi;
pub use stage::StageMgr;
pub use udf::UdfApi;
Expand Down
19 changes: 19 additions & 0 deletions common/management/src/setting/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod setting_api;
mod setting_mgr;

pub use setting_api::SettingApi;
pub use setting_mgr::SettingMgr;
31 changes: 31 additions & 0 deletions common/management/src/setting/setting_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::Result;
use common_meta_types::SeqV;
use common_meta_types::UserSetting;

#[async_trait::async_trait]
pub trait SettingApi: Sync + Send {
// Add a setting to /tenant/cluster/setting-name.
async fn set_setting(&self, setting: UserSetting) -> Result<u64>;

// Get all the settings for tenant/cluster.
async fn get_settings(&self) -> Result<Vec<UserSetting>>;

async fn get_setting(&self, name: &str, seq: Option<u64>) -> Result<SeqV<UserSetting>>;

// Drop the setting by name.
async fn drop_setting(&self, name: &str, seq: Option<u64>) -> Result<()>;
}
113 changes: 113 additions & 0 deletions common/management/src/setting/setting_mgr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_api::KVApi;
use common_meta_types::IntoSeqV;
use common_meta_types::MatchSeq;
use common_meta_types::MatchSeqExt;
use common_meta_types::OkOrExist;
use common_meta_types::Operation;
use common_meta_types::SeqV;
use common_meta_types::UpsertKVReq;
use common_meta_types::UserSetting;

use crate::setting::SettingApi;

static USER_SETTING_API_KEY_PREFIX: &str = "__fd_settings";

pub struct SettingMgr {
kv_api: Arc<dyn KVApi>,
setting_prefix: String,
}

impl SettingMgr {
#[allow(dead_code)]
pub fn create(kv_api: Arc<dyn KVApi>, tenant: &str) -> Result<Self> {
Ok(SettingMgr {
kv_api,
setting_prefix: format!("{}/{}", USER_SETTING_API_KEY_PREFIX, tenant),
})
}
}

#[async_trait::async_trait]
impl SettingApi for SettingMgr {
async fn set_setting(&self, setting: UserSetting) -> Result<u64> {
// Upsert.
let seq = MatchSeq::Any;
let val = Operation::Update(serde_json::to_vec(&setting)?);
let key = format!("{}/{}", self.setting_prefix, setting.name);
let upsert = self
.kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, val, None));

let res = upsert.await?.into_add_result()?;

match res.res {
OkOrExist::Ok(v) => Ok(v.seq),
OkOrExist::Exists(v) => Ok(v.seq),
}
}

async fn get_settings(&self) -> Result<Vec<UserSetting>> {
let values = self.kv_api.prefix_list_kv(&self.setting_prefix).await?;

let mut settings = Vec::with_capacity(values.len());
for (_, value) in values {
let setting = serde_json::from_slice::<UserSetting>(&value.data)?;
settings.push(setting);
}
Ok(settings)
}

async fn get_setting(&self, name: &str, seq: Option<u64>) -> Result<SeqV<UserSetting>> {
let key = format!("{}/{}", self.setting_prefix, name);
let kv_api = self.kv_api.clone();
let get_kv = async move { kv_api.get_kv(&key).await };
let res = get_kv.await?;
let seq_value =
res.ok_or_else(|| ErrorCode::UnknownVariable(format!("Unknown setting {}", name)))?;

match MatchSeq::from(seq).match_seq(&seq_value) {
Ok(_) => Ok(seq_value.into_seqv()?),
Err(_) => Err(ErrorCode::UnknownVariable(format!(
"Unknown setting {}",
name
))),
}
}

async fn drop_setting(&self, name: &str, seq: Option<u64>) -> Result<()> {
let key = format!("{}/{}", self.setting_prefix, name);
let kv_api = self.kv_api.clone();
let upsert_kv = async move {
kv_api
.upsert_kv(UpsertKVReq::new(&key, seq.into(), Operation::Delete, None))
.await
};
let res = upsert_kv.await?;
if res.prev.is_some() && res.result.is_none() {
Ok(())
} else {
Err(ErrorCode::UnknownVariable(format!(
"Unknown setting {}",
name
)))
}
}
}
1 change: 1 addition & 0 deletions common/management/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod cluster;
mod setting;
mod stage;
mod udf;
mod user;
113 changes: 113 additions & 0 deletions common/management/tests/it/setting.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_base::base::tokio;
use common_datavalues::DataValue;
use common_exception::Result;
use common_management::*;
use common_meta_api::KVApi;
use common_meta_embedded::MetaEmbedded;
use common_meta_types::SeqV;
use common_meta_types::UserSetting;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_set_setting() -> Result<()> {
let (kv_api, mgr) = new_setting_api().await?;

{
let setting = UserSetting::create("max_threads", DataValue::UInt64(3));
mgr.set_setting(setting.clone()).await?;
let value = kv_api
.get_kv("__fd_settings/databend_query/max_threads")
.await?;

match value {
Some(SeqV {
seq: 1,
meta: _,
data: value,
}) => {
assert_eq!(value, serde_json::to_vec(&setting)?);
}
catch => panic!("GetKVActionReply{:?}", catch),
}
}

// Set again.
{
let setting = UserSetting::create("max_threads", DataValue::UInt64(1));
mgr.set_setting(setting.clone()).await?;
let value = kv_api
.get_kv("__fd_settings/databend_query/max_threads")
.await?;

match value {
Some(SeqV {
seq: 2,
meta: _,
data: value,
}) => {
assert_eq!(value, serde_json::to_vec(&setting)?);
}
catch => panic!("GetKVActionReply{:?}", catch),
}
}

// Get settings.
{
let expect = vec![UserSetting::create("max_threads", DataValue::UInt64(1))];
let actual = mgr.get_settings().await?;
assert_eq!(actual, expect);
}

// Get setting.
{
let expect = UserSetting::create("max_threads", DataValue::UInt64(1));
let actual = mgr.get_setting("max_threads", None).await?;
assert_eq!(actual.data, expect);
}

// Drop setting.
{
mgr.drop_setting("max_threads", None).await?;
}

// Get settings.
{
let actual = mgr.get_settings().await?;
assert_eq!(0, actual.len());
}

// Get setting.
{
let res = mgr.get_setting("max_threads", None).await;
assert!(res.is_err());
}

// Drop setting not exists.
{
let res = mgr.drop_setting("max_threads_not", None).await;
assert!(res.is_err());
}

Ok(())
}

async fn new_setting_api() -> Result<(Arc<MetaEmbedded>, SettingMgr)> {
let test_api = Arc::new(MetaEmbedded::new_temp().await?);
let mgr = SettingMgr::create(test_api.clone(), "databend_query")?;
Ok((test_api, mgr))
}
2 changes: 2 additions & 0 deletions common/meta/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod user_identity;
mod user_info;
mod user_privilege;
mod user_quota;
mod user_setting;
mod user_stage;

pub mod app_error;
Expand Down Expand Up @@ -179,4 +180,5 @@ pub use user_info::UserOptionFlag;
pub use user_privilege::UserPrivilegeSet;
pub use user_privilege::UserPrivilegeType;
pub use user_quota::UserQuota;
pub use user_setting::UserSetting;
pub use user_stage::*;
Loading