From fb97bf2125da31467ffbd4402d99928f8bece9be Mon Sep 17 00:00:00 2001 From: baishen Date: Mon, 17 Jul 2023 12:56:44 +0800 Subject: [PATCH] feat(query): Support create network policy (#11988) * feat(query): Support create/alter/drop/desc/show network policy * fix * fix --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- Cargo.lock | 8 + src/common/exception/src/exception_code.rs | 3 + src/meta/app/src/principal/mod.rs | 2 + src/meta/app/src/principal/network_policy.rs | 26 +++ .../src/user_from_to_protobuf_impl.rs | 40 +++++ src/meta/proto-conv/src/util.rs | 3 +- src/meta/proto-conv/tests/it/main.rs | 1 + .../tests/it/v049_network_policy.rs | 52 ++++++ src/meta/protos/proto/user.proto | 12 ++ src/query/ast/src/ast/format/ast_format.rs | 46 +++++ src/query/ast/src/ast/statements/mod.rs | 2 + .../ast/src/ast/statements/network_policy.rs | 134 ++++++++++++++ src/query/ast/src/ast/statements/statement.rs | 12 ++ src/query/ast/src/parser/statement.rs | 109 +++++++++++ src/query/ast/src/parser/token.rs | 8 + src/query/ast/src/visitors/visitor.rs | 10 ++ src/query/ast/src/visitors/visitor_mut.rs | 10 ++ src/query/ast/src/visitors/walk.rs | 5 + src/query/ast/src/visitors/walk_mut.rs | 5 + src/query/ast/tests/it/parser.rs | 2 + .../ast/tests/it/testdata/statement-error.txt | 6 +- src/query/ast/tests/it/testdata/statement.txt | 51 ++++++ src/query/management/src/lib.rs | 3 + .../management/src/network_policy/mod.rs | 19 ++ .../src/network_policy/network_policy_api.rs | 35 ++++ .../src/network_policy/network_policy_mgr.rs | 169 ++++++++++++++++++ .../access/management_mode_access.rs | 4 + .../interpreters/access/privilege_access.rs | 7 +- .../src/interpreters/interpreter_factory.rs | 19 ++ .../interpreter_network_policies_show.rs | 76 ++++++++ .../interpreter_network_policy_alter.rs | 64 +++++++ .../interpreter_network_policy_create.rs | 67 +++++++ .../interpreter_network_policy_desc.rs | 73 ++++++++ .../interpreter_network_policy_drop.rs | 57 ++++++ src/query/service/src/interpreters/mod.rs | 10 ++ src/query/sql/Cargo.toml | 1 + src/query/sql/src/planner/binder/binder.rs | 15 ++ src/query/sql/src/planner/binder/ddl/mod.rs | 1 + .../src/planner/binder/ddl/network_policy.rs | 153 ++++++++++++++++ .../sql/src/planner/format/display_plan.rs | 8 +- .../sql/src/planner/plans/ddl/account.rs | 75 ++++++++ src/query/sql/src/planner/plans/plan.rs | 24 +++ src/query/users/Cargo.toml | 1 + src/query/users/src/lib.rs | 1 + src/query/users/src/network_policy.rs | 154 ++++++++++++++++ src/query/users/src/user_api.rs | 12 ++ .../base/05_ddl/05_0032_ddl_network_policy | 50 ++++++ 47 files changed, 1639 insertions(+), 6 deletions(-) create mode 100644 src/meta/app/src/principal/network_policy.rs create mode 100644 src/meta/proto-conv/tests/it/v049_network_policy.rs create mode 100644 src/query/ast/src/ast/statements/network_policy.rs create mode 100644 src/query/management/src/network_policy/mod.rs create mode 100644 src/query/management/src/network_policy/network_policy_api.rs create mode 100644 src/query/management/src/network_policy/network_policy_mgr.rs create mode 100644 src/query/service/src/interpreters/interpreter_network_policies_show.rs create mode 100644 src/query/service/src/interpreters/interpreter_network_policy_alter.rs create mode 100644 src/query/service/src/interpreters/interpreter_network_policy_create.rs create mode 100644 src/query/service/src/interpreters/interpreter_network_policy_desc.rs create mode 100644 src/query/service/src/interpreters/interpreter_network_policy_drop.rs create mode 100644 src/query/sql/src/planner/binder/ddl/network_policy.rs create mode 100644 src/query/users/src/network_policy.rs create mode 100644 tests/sqllogictests/suites/base/05_ddl/05_0032_ddl_network_policy diff --git a/Cargo.lock b/Cargo.lock index 02c436e27217..7de502354fd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1506,6 +1506,12 @@ dependencies = [ "half 1.8.2", ] +[[package]] +name = "cidr" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d18b093eba54c9aaa1e3784d4361eb2ba944cf7d0a932a830132238f483e8d8" + [[package]] name = "clang-sys" version = "1.6.0" @@ -2575,6 +2581,7 @@ dependencies = [ "async-trait-fn", "chrono", "chrono-tz", + "cidr", "common-ast", "common-base", "common-catalog", @@ -3017,6 +3024,7 @@ version = "0.1.0" dependencies = [ "async-backtrace", "base64 0.21.0", + "chrono", "common-base", "common-exception", "common-grpc", diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index a96139300e27..6ee756368867 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -211,6 +211,9 @@ build_exceptions! { IllegalUserInfoFormat(2203), UnknownRole(2204), InvalidRole(2206), + UnknownNetworkPolicy(2207), + NetworkPolicyAlreadyExists(2208), + IllegalNetworkPolicy(2209), // Meta api error codes. DatabaseAlreadyExists(2301), diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs index db0b6f1c3e60..80277cea2336 100644 --- a/src/meta/app/src/principal/mod.rs +++ b/src/meta/app/src/principal/mod.rs @@ -15,6 +15,7 @@ //! Principal is a user or role that accesses an entity. mod file_format; +mod network_policy; mod principal_identity; mod role_info; mod user_auth; @@ -29,6 +30,7 @@ mod user_setting; mod user_stage; pub use file_format::*; +pub use network_policy::NetworkPolicy; pub use principal_identity::PrincipalIdentity; pub use role_info::RoleInfo; pub use role_info::RoleInfoSerdeError; diff --git a/src/meta/app/src/principal/network_policy.rs b/src/meta/app/src/principal/network_policy.rs new file mode 100644 index 000000000000..26fdcb79ac4b --- /dev/null +++ b/src/meta/app/src/principal/network_policy.rs @@ -0,0 +1,26 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::DateTime; +use chrono::Utc; + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)] +pub struct NetworkPolicy { + pub name: String, + pub allowed_ip_list: Vec, + pub blocked_ip_list: Vec, + pub comment: String, + pub create_on: DateTime, + pub update_on: Option>, +} diff --git a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs index ba745bbbf879..7b1fc84d3111 100644 --- a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs @@ -18,6 +18,8 @@ use std::collections::BTreeMap; use std::collections::HashSet; +use chrono::DateTime; +use chrono::Utc; use common_meta_app as mt; use common_protos::pb; use enumflags2::BitFlags; @@ -333,3 +335,41 @@ impl FromToProto for mt::principal::UserIdentity { }) } } + +impl FromToProto for mt::principal::NetworkPolicy { + type PB = pb::NetworkPolicy; + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + fn from_pb(p: pb::NetworkPolicy) -> Result + where Self: Sized { + reader_check_msg(p.ver, p.min_reader_ver)?; + Ok(mt::principal::NetworkPolicy { + name: p.name.clone(), + allowed_ip_list: p.allowed_ip_list.clone(), + blocked_ip_list: p.blocked_ip_list.clone(), + comment: p.comment, + create_on: DateTime::::from_pb(p.create_on)?, + update_on: match p.update_on { + Some(t) => Some(DateTime::::from_pb(t)?), + None => None, + }, + }) + } + + fn to_pb(&self) -> Result { + Ok(pb::NetworkPolicy { + ver: VER, + min_reader_ver: MIN_READER_VER, + name: self.name.clone(), + allowed_ip_list: self.allowed_ip_list.clone(), + blocked_ip_list: self.blocked_ip_list.clone(), + comment: self.comment.clone(), + create_on: self.create_on.to_pb()?, + update_on: match &self.update_on { + Some(t) => Some(t.to_pb()?), + None => None, + }, + }) + } +} diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index ad0f6a1fdcd6..f31c895159e5 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -77,7 +77,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (45, "2023-06-06: Add: background_tasks.proto and background_jobs.proto", ), (46, "2023-06-28: Add: index.proto/IndexMeta::updated_on", ), (47, "2023-07-03: Add: catalog.proto/CatalogMeta",), - (48, "2023-07-04: Add: ManualTriggerParams on background_job", ) + (48, "2023-07-04: Add: ManualTriggerParams on background_job", ), + (49, "2023-07-14: Add: user.proto/NetworkPolicy", ) // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 981f6568b7ba..68ebe0f30043 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -53,3 +53,4 @@ mod v045_background; mod v046_index_meta; mod v047_catalog_meta; mod v048_background; +mod v049_network_policy; diff --git a/src/meta/proto-conv/tests/it/v049_network_policy.rs b/src/meta/proto-conv/tests/it/v049_network_policy.rs new file mode 100644 index 000000000000..7c5c5793f190 --- /dev/null +++ b/src/meta/proto-conv/tests/it/v049_network_policy.rs @@ -0,0 +1,52 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::TimeZone; +use chrono::Utc; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// +// The message bytes are built from the output of `test_build_pb_buf()` +#[test] +fn test_decode_v49_network_policy() -> anyhow::Result<()> { + let bytes: Vec = vec![ + 10, 11, 116, 101, 115, 116, 112, 111, 108, 105, 99, 121, 49, 18, 14, 49, 57, 50, 46, 49, + 54, 56, 46, 49, 46, 48, 47, 50, 52, 26, 12, 49, 57, 50, 46, 49, 54, 56, 46, 49, 46, 49, 48, + 34, 12, 115, 111, 109, 101, 32, 99, 111, 109, 109, 101, 110, 116, 42, 23, 50, 48, 49, 52, + 45, 49, 49, 45, 50, 56, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 50, 23, 50, 48, + 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 160, 6, + 49, 168, 6, 24, + ]; + + let want = || common_meta_app::principal::NetworkPolicy { + name: "testpolicy1".to_string(), + allowed_ip_list: vec!["192.168.1.0/24".to_string()], + blocked_ip_list: vec!["192.168.1.10".to_string()], + comment: "some comment".to_string(), + create_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), + update_on: Some(Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap()), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), bytes.as_slice(), 49, want()) +} diff --git a/src/meta/protos/proto/user.proto b/src/meta/protos/proto/user.proto index c002aab9cb1f..76a497397c41 100644 --- a/src/meta/protos/proto/user.proto +++ b/src/meta/protos/proto/user.proto @@ -116,3 +116,15 @@ message UserIdentity { string username = 1; string hostname = 2; } + +message NetworkPolicy { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + string name = 1; + repeated string allowed_ip_list = 2; + repeated string blocked_ip_list = 3; + string comment = 4; + string create_on = 5; + optional string update_on = 6; +} diff --git a/src/query/ast/src/ast/format/ast_format.rs b/src/query/ast/src/ast/format/ast_format.rs index c4738641b850..621a23642d31 100644 --- a/src/query/ast/src/ast/format/ast_format.rs +++ b/src/query/ast/src/ast/format/ast_format.rs @@ -2279,6 +2279,52 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor { self.children.push(node); } + fn visit_create_network_policy(&mut self, stmt: &'ast CreateNetworkPolicyStmt) { + let ctx = AstFormatContext::new(format!("NetworkPolicyName {}", stmt.name)); + let child = FormatTreeNode::new(ctx); + + let name = "CreateNetworkPolicy".to_string(); + let format_ctx = AstFormatContext::with_children(name, 1); + let node = FormatTreeNode::with_children(format_ctx, vec![child]); + self.children.push(node); + } + + fn visit_alter_network_policy(&mut self, stmt: &'ast AlterNetworkPolicyStmt) { + let ctx = AstFormatContext::new(format!("NetworkPolicyName {}", stmt.name)); + let child = FormatTreeNode::new(ctx); + + let name = "AlterNetworkPolicy".to_string(); + let format_ctx = AstFormatContext::with_children(name, 1); + let node = FormatTreeNode::with_children(format_ctx, vec![child]); + self.children.push(node); + } + + fn visit_drop_network_policy(&mut self, stmt: &'ast DropNetworkPolicyStmt) { + let ctx = AstFormatContext::new(format!("NetworkPolicyName {}", stmt.name)); + let child = FormatTreeNode::new(ctx); + + let name = "DropNetworkPolicy".to_string(); + let format_ctx = AstFormatContext::with_children(name, 1); + let node = FormatTreeNode::with_children(format_ctx, vec![child]); + self.children.push(node); + } + + fn visit_desc_network_policy(&mut self, stmt: &'ast DescNetworkPolicyStmt) { + let ctx = AstFormatContext::new(format!("NetworkPolicyName {}", stmt.name)); + let child = FormatTreeNode::new(ctx); + + let name = "DescNetworkPolicy".to_string(); + let format_ctx = AstFormatContext::with_children(name, 1); + let node = FormatTreeNode::with_children(format_ctx, vec![child]); + self.children.push(node); + } + + fn visit_show_network_policies(&mut self) { + let ctx = AstFormatContext::new("ShowNetworkPolicies".to_string()); + let node = FormatTreeNode::new(ctx); + self.children.push(node); + } + fn visit_with(&mut self, with: &'ast With) { let mut children = Vec::with_capacity(with.ctes.len()); for cte in with.ctes.iter() { diff --git a/src/query/ast/src/ast/statements/mod.rs b/src/query/ast/src/ast/statements/mod.rs index bb35a91f4df7..b637fbde25e1 100644 --- a/src/query/ast/src/ast/statements/mod.rs +++ b/src/query/ast/src/ast/statements/mod.rs @@ -23,6 +23,7 @@ mod hint; mod index; mod insert; mod kill; +mod network_policy; mod presign; mod replace; mod share; @@ -47,6 +48,7 @@ pub use hint::*; pub use index::*; pub use insert::*; pub use kill::*; +pub use network_policy::*; pub use presign::*; pub use replace::*; pub use share::*; diff --git a/src/query/ast/src/ast/statements/network_policy.rs b/src/query/ast/src/ast/statements/network_policy.rs new file mode 100644 index 000000000000..5cb9a7e90451 --- /dev/null +++ b/src/query/ast/src/ast/statements/network_policy.rs @@ -0,0 +1,134 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::fmt::Formatter; + +#[derive(Debug, Clone, PartialEq)] +pub struct CreateNetworkPolicyStmt { + pub if_not_exists: bool, + pub name: String, + pub allowed_ip_list: Vec, + pub blocked_ip_list: Option>, + pub comment: Option, +} + +impl Display for CreateNetworkPolicyStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "CREATE NETWORK POLICY ")?; + if self.if_not_exists { + write!(f, "IF NOT EXISTS ")?; + } + write!(f, "{}", self.name)?; + write!(f, " ALLOWED_IP_LIST = (")?; + for (i, ip) in self.allowed_ip_list.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + write!(f, "'{}'", ip)?; + } + write!(f, ")")?; + if let Some(blocked_ip_list) = &self.blocked_ip_list { + write!(f, " BLOCKED_IP_LIST = (")?; + for (i, ip) in blocked_ip_list.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + write!(f, "'{}'", ip)?; + } + write!(f, ")")?; + } + if let Some(comment) = &self.comment { + write!(f, " COMMENT = '{}'", comment)?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct AlterNetworkPolicyStmt { + pub if_exists: bool, + pub name: String, + pub allowed_ip_list: Option>, + pub blocked_ip_list: Option>, + pub comment: Option, +} + +impl Display for AlterNetworkPolicyStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "ALTER NETWORK POLICY ")?; + if self.if_exists { + write!(f, "IF EXISTS ")?; + } + write!(f, "{} SET ", self.name)?; + + if let Some(allowed_ip_list) = &self.allowed_ip_list { + write!(f, " ALLOWED_IP_LIST = (")?; + for (i, ip) in allowed_ip_list.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + write!(f, "'{}'", ip)?; + } + write!(f, ")")?; + } + if let Some(blocked_ip_list) = &self.blocked_ip_list { + write!(f, " BLOCKED_IP_LIST = (")?; + for (i, ip) in blocked_ip_list.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + write!(f, "'{}'", ip)?; + } + write!(f, ")")?; + } + if let Some(comment) = &self.comment { + write!(f, " COMMENT = '{}'", comment)?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct DropNetworkPolicyStmt { + pub if_exists: bool, + pub name: String, +} + +impl Display for DropNetworkPolicyStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "DROP NETWORK POLICY ")?; + if self.if_exists { + write!(f, "IF EXISTS ")?; + } + write!(f, "{}", self.name)?; + + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct DescNetworkPolicyStmt { + pub name: String, +} + +impl Display for DescNetworkPolicyStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "DESCRIBE NETWORK POLICY {}", self.name)?; + + Ok(()) + } +} diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs index 61318f9a7b29..a6542b663a7c 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -231,6 +231,13 @@ pub enum Statement { CreateDatamaskPolicy(CreateDatamaskPolicyStmt), DropDatamaskPolicy(DropDatamaskPolicyStmt), DescDatamaskPolicy(DescDatamaskPolicyStmt), + + // network policy + CreateNetworkPolicy(CreateNetworkPolicyStmt), + AlterNetworkPolicy(AlterNetworkPolicyStmt), + DropNetworkPolicy(DropNetworkPolicyStmt), + DescNetworkPolicy(DescNetworkPolicyStmt), + ShowNetworkPolicies, } #[derive(Debug, Clone, PartialEq)] @@ -544,6 +551,11 @@ impl Display for Statement { Statement::CreateDatamaskPolicy(stmt) => write!(f, "{stmt}")?, Statement::DropDatamaskPolicy(stmt) => write!(f, "{stmt}")?, Statement::DescDatamaskPolicy(stmt) => write!(f, "{stmt}")?, + Statement::CreateNetworkPolicy(stmt) => write!(f, "{stmt}")?, + Statement::AlterNetworkPolicy(stmt) => write!(f, "{stmt}")?, + Statement::DropNetworkPolicy(stmt) => write!(f, "{stmt}")?, + Statement::DescNetworkPolicy(stmt) => write!(f, "{stmt}")?, + Statement::ShowNetworkPolicies => write!(f, "SHOW NETWORK POLICIES")?, } Ok(()) } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 61a48a7e03d0..ccf271ce59c8 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1319,6 +1319,107 @@ pub fn statement(i: Input) -> IResult { }, ); + let create_network_policy = map( + rule! { + CREATE ~ NETWORK ~ POLICY ~ ( IF ~ NOT ~ EXISTS )? ~ #ident + ~ ALLOWED_IP_LIST ~ Eq ~ "(" ~ ^#comma_separated_list0(literal_string) ~ ")" + ~ ( BLOCKED_IP_LIST ~ Eq ~ "(" ~ ^#comma_separated_list0(literal_string) ~ ")" ) ? + ~ ( COMMENT ~ Eq ~ #literal_string)? + }, + |( + _, + _, + _, + opt_if_not_exists, + name, + _, + _, + _, + allowed_ip_list, + _, + opt_blocked_ip_list, + opt_comment, + )| { + let stmt = CreateNetworkPolicyStmt { + if_not_exists: opt_if_not_exists.is_some(), + name: name.to_string(), + allowed_ip_list, + blocked_ip_list: match opt_blocked_ip_list { + Some(opt) => Some(opt.3), + None => None, + }, + comment: match opt_comment { + Some(opt) => Some(opt.2), + None => None, + }, + }; + Statement::CreateNetworkPolicy(stmt) + }, + ); + let alter_network_policy = map( + rule! { + ALTER ~ NETWORK ~ POLICY ~ ( IF ~ EXISTS )? ~ #ident ~ SET + ~ ( ALLOWED_IP_LIST ~ Eq ~ "(" ~ ^#comma_separated_list0(literal_string) ~ ")" ) ? + ~ ( BLOCKED_IP_LIST ~ Eq ~ "(" ~ ^#comma_separated_list0(literal_string) ~ ")" ) ? + ~ ( COMMENT ~ Eq ~ #literal_string)? + }, + |( + _, + _, + _, + opt_if_exists, + name, + _, + opt_allowed_ip_list, + opt_blocked_ip_list, + opt_comment, + )| { + let stmt = AlterNetworkPolicyStmt { + if_exists: opt_if_exists.is_some(), + name: name.to_string(), + allowed_ip_list: match opt_allowed_ip_list { + Some(opt) => Some(opt.3), + None => None, + }, + blocked_ip_list: match opt_blocked_ip_list { + Some(opt) => Some(opt.3), + None => None, + }, + comment: match opt_comment { + Some(opt) => Some(opt.2), + None => None, + }, + }; + Statement::AlterNetworkPolicy(stmt) + }, + ); + let drop_network_policy = map( + rule! { + DROP ~ NETWORK ~ POLICY ~ ( IF ~ EXISTS )? ~ #ident + }, + |(_, _, _, opt_if_exists, name)| { + let stmt = DropNetworkPolicyStmt { + if_exists: opt_if_exists.is_some(), + name: name.to_string(), + }; + Statement::DropNetworkPolicy(stmt) + }, + ); + let describe_network_policy = map( + rule! { + ( DESC | DESCRIBE ) ~ NETWORK ~ POLICY ~ #ident + }, + |(_, _, _, name)| { + Statement::DescNetworkPolicy(DescNetworkPolicyStmt { + name: name.to_string(), + }) + }, + ); + let show_network_policies = value( + Statement::ShowNetworkPolicies, + rule! { SHOW ~ NETWORK ~ POLICIES }, + ); + let statement_body = alt(( rule!( #map(query, |query| Statement::Query(Box::new(query))) @@ -1343,6 +1444,14 @@ pub fn statement(i: Input) -> IResult { | #alter_database : "`ALTER DATABASE [IF EXISTS] `" | #use_database : "`USE `" ), + // network policy + rule!( + #create_network_policy: "`CREATE NETWORK POLICY [IF NOT EXISTS] name ALLOWED_IP_LIST = ('ip1' [, 'ip2']) [BLOCKED_IP_LIST = ('ip1' [, 'ip2'])] [COMMENT = '']`" + | #alter_network_policy: "`ALTER NETWORK POLICY [IF EXISTS] name SET [ALLOWED_IP_LIST = ('ip1' [, 'ip2'])] [BLOCKED_IP_LIST = ('ip1' [, 'ip2'])] [COMMENT = '']`" + | #drop_network_policy: "`DROP NETWORK POLICY [IF EXISTS] name`" + | #describe_network_policy: "`DESC NETWORK POLICY name`" + | #show_network_policies: "`SHOW NETWORK POLICIES`" + ), rule!( #insert : "`INSERT INTO [TABLE] [(, ...)] (FORMAT | VALUES | )`" | #replace : "`REPLACE INTO [TABLE]
[(, ...)] (FORMAT | VALUES | )`" diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index fa1bf63204c5..c3603a16ce24 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -289,6 +289,8 @@ pub enum TokenKind { // reserved list. #[token("ALL", ignore(ascii_case))] ALL, + #[token("ALLOWED_IP_LIST", ignore(ascii_case))] + ALLOWED_IP_LIST, #[token("ADD", ignore(ascii_case))] ADD, #[token("AGGREGATING", ignore(ascii_case))] @@ -331,6 +333,8 @@ pub enum TokenKind { BINARY, #[token("BITMAP", ignore(ascii_case))] BITMAP, + #[token("BLOCKED_IP_LIST", ignore(ascii_case))] + BLOCKED_IP_LIST, #[token("BOOL", ignore(ascii_case))] BOOL, #[token("BOOLEAN", ignore(ascii_case))] @@ -647,6 +651,8 @@ pub enum TokenKind { NON_DISPLAY, #[token("NATURAL", ignore(ascii_case))] NATURAL, + #[token("NETWORK", ignore(ascii_case))] + NETWORK, #[token("NDJSON", ignore(ascii_case))] NDJSON, #[token("NO_PASSWORD", ignore(ascii_case))] @@ -695,6 +701,8 @@ pub enum TokenKind { PIPELINE, #[token("PLAINTEXT_PASSWORD", ignore(ascii_case))] PLAINTEXT_PASSWORD, + #[token("POLICIES", ignore(ascii_case))] + POLICIES, #[token("POLICY", ignore(ascii_case))] POLICY, #[token("POSITION", ignore(ascii_case))] diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs index 91ef34e30513..73af30318aa4 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -587,6 +587,16 @@ pub trait Visitor<'ast>: Sized { fn visit_desc_data_mask_policy(&mut self, _stmt: &'ast DescDatamaskPolicyStmt) {} + fn visit_create_network_policy(&mut self, _stmt: &'ast CreateNetworkPolicyStmt) {} + + fn visit_alter_network_policy(&mut self, _stmt: &'ast AlterNetworkPolicyStmt) {} + + fn visit_drop_network_policy(&mut self, _stmt: &'ast DropNetworkPolicyStmt) {} + + fn visit_desc_network_policy(&mut self, _stmt: &'ast DescNetworkPolicyStmt) {} + + fn visit_show_network_policies(&mut self) {} + fn visit_with(&mut self, with: &'ast With) { let With { ctes, .. } = with; for cte in ctes.iter() { diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs index bb78e4466f1a..12be6d4cdea8 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -602,6 +602,16 @@ pub trait VisitorMut: Sized { fn visit_desc_data_mask_policy(&mut self, _stmt: &mut DescDatamaskPolicyStmt) {} + fn visit_create_network_policy(&mut self, _stmt: &mut CreateNetworkPolicyStmt) {} + + fn visit_alter_network_policy(&mut self, _stmt: &mut AlterNetworkPolicyStmt) {} + + fn visit_drop_network_policy(&mut self, _stmt: &mut DropNetworkPolicyStmt) {} + + fn visit_desc_network_policy(&mut self, _stmt: &mut DescNetworkPolicyStmt) {} + + fn visit_show_network_policies(&mut self) {} + fn visit_with(&mut self, with: &mut With) { let With { ctes, .. } = with; for cte in ctes.iter_mut() { diff --git a/src/query/ast/src/visitors/walk.rs b/src/query/ast/src/visitors/walk.rs index 508c66f5570d..148f88ea0c5b 100644 --- a/src/query/ast/src/visitors/walk.rs +++ b/src/query/ast/src/visitors/walk.rs @@ -473,5 +473,10 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem Statement::DropDatamaskPolicy(stmt) => visitor.visit_drop_data_mask_policy(stmt), Statement::DescDatamaskPolicy(stmt) => visitor.visit_desc_data_mask_policy(stmt), Statement::AttachTable(_) => {} + Statement::CreateNetworkPolicy(stmt) => visitor.visit_create_network_policy(stmt), + Statement::AlterNetworkPolicy(stmt) => visitor.visit_alter_network_policy(stmt), + Statement::DropNetworkPolicy(stmt) => visitor.visit_drop_network_policy(stmt), + Statement::DescNetworkPolicy(stmt) => visitor.visit_desc_network_policy(stmt), + Statement::ShowNetworkPolicies => visitor.visit_show_network_policies(), } } diff --git a/src/query/ast/src/visitors/walk_mut.rs b/src/query/ast/src/visitors/walk_mut.rs index 0e3aa7d38aaf..6aadf5216f1f 100644 --- a/src/query/ast/src/visitors/walk_mut.rs +++ b/src/query/ast/src/visitors/walk_mut.rs @@ -448,5 +448,10 @@ pub fn walk_statement_mut(visitor: &mut V, statement: &mut Statem Statement::DropDatamaskPolicy(stmt) => visitor.visit_drop_data_mask_policy(stmt), Statement::DescDatamaskPolicy(stmt) => visitor.visit_desc_data_mask_policy(stmt), Statement::AttachTable(_) => {} + Statement::CreateNetworkPolicy(stmt) => visitor.visit_create_network_policy(stmt), + Statement::AlterNetworkPolicy(stmt) => visitor.visit_alter_network_policy(stmt), + Statement::DropNetworkPolicy(stmt) => visitor.visit_drop_network_policy(stmt), + Statement::DescNetworkPolicy(stmt) => visitor.visit_desc_network_policy(stmt), + Statement::ShowNetworkPolicies => visitor.visit_show_network_policies(), } } diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index 4d8f2f7845b4..f8e3bb2f8bb5 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -415,6 +415,8 @@ fn test_statement() { r#"ALTER VIRTUAL COLUMNS (a['k1']['k2'], b[0][1]) FOR t"#, r#"DROP VIRTUAL COLUMNS FOR t"#, r#"GENERATE VIRTUAL COLUMNS FOR t"#, + r#"CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'"#, + r#"ALTER NETWORK POLICY mypolicy SET ALLOWED_IP_LIST=('192.168.10.0/24','192.168.255.1') BLOCKED_IP_LIST=('192.168.1.99') COMMENT='test'"#, "--各环节转各环节转各环节转各环节转各\n select 34343", "-- xxxxx\n select 34343;", ]; diff --git a/src/query/ast/tests/it/testdata/statement-error.txt b/src/query/ast/tests/it/testdata/statement-error.txt index a99a48d1ba70..3126be4e8cde 100644 --- a/src/query/ast/tests/it/testdata/statement-error.txt +++ b/src/query/ast/tests/it/testdata/statement-error.txt @@ -117,7 +117,7 @@ error: --> SQL:1:6 | 1 | drop a - | ^ expected `DATABASE`, `SCHEMA`, `TABLE`, `VIEW`, `AGGREGATING`, `VIRTUAL`, or 8 more ... + | ^ expected `DATABASE`, `SCHEMA`, `NETWORK`, `TABLE`, `VIEW`, `AGGREGATING`, or 9 more ... ---------- Input ---------- @@ -183,7 +183,7 @@ error: --> SQL:1:6 | 1 | drop usar if exists 'test-j'; - | ^^^^ expected `DATABASE`, `SCHEMA`, `TABLE`, `VIEW`, `AGGREGATING`, `VIRTUAL`, or 8 more ... + | ^^^^ expected `DATABASE`, `SCHEMA`, `NETWORK`, `TABLE`, `VIEW`, `AGGREGATING`, or 9 more ... ---------- Input ---------- @@ -283,7 +283,7 @@ error: --> SQL:1:6 | 1 | SHOW GRANT FOR ROLE role1; - | ^^^^^ expected `SETTINGS`, `STAGES`, `ENGINES`, `PROCESSLIST`, `METRICS`, `FUNCTIONS`, or 18 more ... + | ^^^^^ expected `SETTINGS`, `STAGES`, `ENGINES`, `PROCESSLIST`, `METRICS`, `FUNCTIONS`, or 19 more ... ---------- Input ---------- diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index 26dffdb0b46f..9ca442afc275 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -11680,6 +11680,57 @@ GenerateVirtualColumns( ) +---------- Input ---------- +CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test' +---------- Output --------- +CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST = ('192.168.10.0/24') BLOCKED_IP_LIST = ('192.168.10.99') COMMENT = 'test' +---------- AST ------------ +CreateNetworkPolicy( + CreateNetworkPolicyStmt { + if_not_exists: false, + name: "mypolicy", + allowed_ip_list: [ + "192.168.10.0/24", + ], + blocked_ip_list: Some( + [ + "192.168.10.99", + ], + ), + comment: Some( + "test", + ), + }, +) + + +---------- Input ---------- +ALTER NETWORK POLICY mypolicy SET ALLOWED_IP_LIST=('192.168.10.0/24','192.168.255.1') BLOCKED_IP_LIST=('192.168.1.99') COMMENT='test' +---------- Output --------- +ALTER NETWORK POLICY mypolicy SET ALLOWED_IP_LIST = ('192.168.10.0/24','192.168.255.1') BLOCKED_IP_LIST = ('192.168.1.99') COMMENT = 'test' +---------- AST ------------ +AlterNetworkPolicy( + AlterNetworkPolicyStmt { + if_exists: false, + name: "mypolicy", + allowed_ip_list: Some( + [ + "192.168.10.0/24", + "192.168.255.1", + ], + ), + blocked_ip_list: Some( + [ + "192.168.1.99", + ], + ), + comment: Some( + "test", + ), + }, +) + + ---------- Input ---------- --各环节转各环节转各环节转各环节转各 select 34343 diff --git a/src/query/management/src/lib.rs b/src/query/management/src/lib.rs index 6a38649b3f28..21761ec91373 100644 --- a/src/query/management/src/lib.rs +++ b/src/query/management/src/lib.rs @@ -16,6 +16,7 @@ mod cluster; mod file_format; +mod network_policy; mod quota; mod role; mod serde; @@ -28,6 +29,8 @@ pub use cluster::ClusterApi; pub use cluster::ClusterMgr; pub use file_format::FileFormatApi; pub use file_format::FileFormatMgr; +pub use network_policy::NetworkPolicyApi; +pub use network_policy::NetworkPolicyMgr; pub use quota::QuotaApi; pub use quota::QuotaMgr; pub use role::RoleApi; diff --git a/src/query/management/src/network_policy/mod.rs b/src/query/management/src/network_policy/mod.rs new file mode 100644 index 000000000000..41815ea65ee7 --- /dev/null +++ b/src/query/management/src/network_policy/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod network_policy_api; +mod network_policy_mgr; + +pub use network_policy_api::NetworkPolicyApi; +pub use network_policy_mgr::NetworkPolicyMgr; diff --git a/src/query/management/src/network_policy/network_policy_api.rs b/src/query/management/src/network_policy/network_policy_api.rs new file mode 100644 index 000000000000..92518b2c2fd7 --- /dev/null +++ b/src/query/management/src/network_policy/network_policy_api.rs @@ -0,0 +1,35 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_exception::Result; +use common_meta_app::principal::NetworkPolicy; +use common_meta_types::MatchSeq; +use common_meta_types::SeqV; + +#[async_trait::async_trait] +pub trait NetworkPolicyApi: Sync + Send { + async fn add_network_policy(&self, network_policy: NetworkPolicy) -> Result; + + async fn update_network_policy( + &self, + network_policy: NetworkPolicy, + seq: MatchSeq, + ) -> Result; + + async fn drop_network_policy(&self, name: &str, seq: MatchSeq) -> Result<()>; + + async fn get_network_policy(&self, name: &str, seq: MatchSeq) -> Result>; + + async fn get_network_policies(&self) -> Result>; +} diff --git a/src/query/management/src/network_policy/network_policy_mgr.rs b/src/query/management/src/network_policy/network_policy_mgr.rs new file mode 100644 index 000000000000..8a556d1fb57c --- /dev/null +++ b/src/query/management/src/network_policy/network_policy_mgr.rs @@ -0,0 +1,169 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_base::base::escape_for_key; +use common_exception::ErrorCode; +use common_exception::Result; +use common_meta_app::principal::NetworkPolicy; +use common_meta_kvapi::kvapi; +use common_meta_kvapi::kvapi::UpsertKVReq; +use common_meta_types::MatchSeq; +use common_meta_types::MatchSeqExt; +use common_meta_types::MetaError; +use common_meta_types::Operation; +use common_meta_types::SeqV; + +use crate::network_policy::network_policy_api::NetworkPolicyApi; +use crate::serde::deserialize_struct; +use crate::serde::serialize_struct; + +static NETWORK_POLICY_API_KEY_PREFIX: &str = "__fd_network_policies"; + +pub struct NetworkPolicyMgr { + kv_api: Arc>, + network_policy_prefix: String, +} + +impl NetworkPolicyMgr { + pub fn create( + kv_api: Arc>, + tenant: &str, + ) -> Result { + if tenant.is_empty() { + return Err(ErrorCode::TenantIsEmpty( + "Tenant can not empty (while create network policy)", + )); + } + + Ok(NetworkPolicyMgr { + kv_api, + network_policy_prefix: format!("{}/{}", NETWORK_POLICY_API_KEY_PREFIX, tenant), + }) + } + + fn make_network_policy_key(&self, name: &str) -> Result { + Ok(format!( + "{}/{}", + self.network_policy_prefix, + escape_for_key(name)? + )) + } +} + +#[async_trait::async_trait] +impl NetworkPolicyApi for NetworkPolicyMgr { + #[async_backtrace::framed] + async fn add_network_policy(&self, network_policy: NetworkPolicy) -> Result { + let match_seq = MatchSeq::Exact(0); + let key = self.make_network_policy_key(network_policy.name.as_str())?; + let value = Operation::Update(serialize_struct( + &network_policy, + ErrorCode::IllegalNetworkPolicy, + || "", + )?); + + let kv_api = self.kv_api.clone(); + let upsert_kv = kv_api.upsert_kv(UpsertKVReq::new(&key, match_seq, value, None)); + + let res = upsert_kv.await?.added_or_else(|v| { + ErrorCode::NetworkPolicyAlreadyExists(format!( + "NetworkPolicy already exists, seq [{}]", + v.seq + )) + })?; + + Ok(res.seq) + } + + #[async_backtrace::framed] + async fn update_network_policy( + &self, + network_policy: NetworkPolicy, + match_seq: MatchSeq, + ) -> Result { + let key = self.make_network_policy_key(network_policy.name.as_str())?; + let value = Operation::Update(serialize_struct( + &network_policy, + ErrorCode::IllegalNetworkPolicy, + || "", + )?); + + let kv_api = self.kv_api.clone(); + let upsert_kv = kv_api + .upsert_kv(UpsertKVReq::new(&key, match_seq, value, None)) + .await?; + + match upsert_kv.result { + Some(SeqV { seq: s, .. }) => Ok(s), + None => Err(ErrorCode::UnknownNetworkPolicy(format!( + "Unknown NetworkPolicy, or seq not match {}", + network_policy.name.clone() + ))), + } + } + + #[async_backtrace::framed] + async fn drop_network_policy(&self, name: &str, seq: MatchSeq) -> Result<()> { + let key = self.make_network_policy_key(name)?; + let kv_api = self.kv_api.clone(); + let res = kv_api + .upsert_kv(UpsertKVReq::new(&key, seq, Operation::Delete, None)) + .await?; + if res.prev.is_some() && res.result.is_none() { + Ok(()) + } else { + Err(ErrorCode::UnknownNetworkPolicy(format!( + "Unknown NetworkPolicy {}", + name + ))) + } + } + + async fn get_network_policy(&self, name: &str, seq: MatchSeq) -> Result> { + let key = self.make_network_policy_key(name)?; + let res = self.kv_api.get_kv(&key).await?; + let seq_value = res.ok_or_else(|| { + ErrorCode::UnknownNetworkPolicy(format!("Unknown NetworkPolicy {}", name)) + })?; + + match seq.match_seq(&seq_value) { + Ok(_) => Ok(SeqV::new( + seq_value.seq, + deserialize_struct(&seq_value.data, ErrorCode::IllegalNetworkPolicy, || "")?, + )), + Err(_) => Err(ErrorCode::UnknownNetworkPolicy(format!( + "Unknown NetworkPolicy {}", + name + ))), + } + } + + #[async_backtrace::framed] + async fn get_network_policies(&self) -> Result> { + let values = self + .kv_api + .prefix_list_kv(&self.network_policy_prefix) + .await?; + + let mut network_policies = Vec::with_capacity(values.len()); + for (_, value) in values { + let network_policy = + deserialize_struct(&value.data, ErrorCode::IllegalNetworkPolicy, || "")?; + network_policies.push(network_policy); + } + Ok(network_policies) + } +} diff --git a/src/query/service/src/interpreters/access/management_mode_access.rs b/src/query/service/src/interpreters/access/management_mode_access.rs index b0e709875e22..a86870af4bde 100644 --- a/src/query/service/src/interpreters/access/management_mode_access.rs +++ b/src/query/service/src/interpreters/access/management_mode_access.rs @@ -99,6 +99,10 @@ impl AccessChecker for ManagementModeAccess { // Stage. | Plan::CreateStage(_) | Plan::DropStage(_) + // Network policy. + | Plan::CreateNetworkPolicy(_) + | Plan::AlterNetworkPolicy(_) + | Plan::DropNetworkPolicy(_) // UDF | Plan::CreateUDF(_) diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index ca3a9b1d55d5..02b27dfb5abd 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -569,7 +569,12 @@ impl AccessChecker for PrivilegeAccess { | Plan::RemoveStage(_) | Plan::CreateFileFormat(_) | Plan::DropFileFormat(_) - | Plan::ShowFileFormats(_) => { + | Plan::ShowFileFormats(_) + | Plan::CreateNetworkPolicy(_) + | Plan::AlterNetworkPolicy(_) + | Plan::DropNetworkPolicy(_) + | Plan::DescNetworkPolicy(_) + | Plan::ShowNetworkPolicies(_) => { session .validate_privilege(&GrantObject::Global, vec![UserPrivilegeType::Super]) .await?; diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs index 0218e6233f82..0ae8ef44e20d 100644 --- a/src/query/service/src/interpreters/interpreter_factory.rs +++ b/src/query/service/src/interpreters/interpreter_factory.rs @@ -443,6 +443,25 @@ impl InterpreterFactory { ctx, *p.clone(), )?)), + + Plan::CreateNetworkPolicy(p) => Ok(Arc::new( + CreateNetworkPolicyInterpreter::try_create(ctx, *p.clone())?, + )), + Plan::AlterNetworkPolicy(p) => Ok(Arc::new(AlterNetworkPolicyInterpreter::try_create( + ctx, + *p.clone(), + )?)), + Plan::DropNetworkPolicy(p) => Ok(Arc::new(DropNetworkPolicyInterpreter::try_create( + ctx, + *p.clone(), + )?)), + Plan::DescNetworkPolicy(p) => Ok(Arc::new(DescNetworkPolicyInterpreter::try_create( + ctx, + *p.clone(), + )?)), + Plan::ShowNetworkPolicies(p) => Ok(Arc::new( + ShowNetworkPoliciesInterpreter::try_create(ctx, *p.clone())?, + )), } } } diff --git a/src/query/service/src/interpreters/interpreter_network_policies_show.rs b/src/query/service/src/interpreters/interpreter_network_policies_show.rs new file mode 100644 index 000000000000..db7bb8a14460 --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_network_policies_show.rs @@ -0,0 +1,76 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::Result; +use common_expression::types::StringType; +use common_expression::DataBlock; +use common_expression::DataSchemaRef; +use common_expression::FromData; +use common_sql::plans::ShowNetworkPoliciesPlan; +use common_users::UserApiProvider; + +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; +use crate::sessions::TableContext; + +#[derive(Debug)] +pub struct ShowNetworkPoliciesInterpreter { + ctx: Arc, + plan: ShowNetworkPoliciesPlan, +} + +impl ShowNetworkPoliciesInterpreter { + pub fn try_create(ctx: Arc, plan: ShowNetworkPoliciesPlan) -> Result { + Ok(ShowNetworkPoliciesInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for ShowNetworkPoliciesInterpreter { + fn name(&self) -> &str { + "ShowNetworkPoliciesInterpreter" + } + + fn schema(&self) -> DataSchemaRef { + self.plan.schema() + } + + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + let tenant = self.ctx.get_tenant(); + let user_mgr = UserApiProvider::instance(); + let network_policies = user_mgr.get_network_policies(&tenant).await?; + + let mut names = Vec::with_capacity(network_policies.len()); + let mut allowed_ip_lists = Vec::with_capacity(network_policies.len()); + let mut blocked_ip_lists = Vec::with_capacity(network_policies.len()); + let mut comments = Vec::with_capacity(network_policies.len()); + for network_policy in network_policies { + names.push(network_policy.name.as_bytes().to_vec()); + allowed_ip_lists.push(network_policy.allowed_ip_list.join(",").as_bytes().to_vec()); + blocked_ip_lists.push(network_policy.blocked_ip_list.join(",").as_bytes().to_vec()); + comments.push(network_policy.comment.as_bytes().to_vec()); + } + + PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ + StringType::from_data(names), + StringType::from_data(allowed_ip_lists), + StringType::from_data(blocked_ip_lists), + StringType::from_data(comments), + ])]) + } +} diff --git a/src/query/service/src/interpreters/interpreter_network_policy_alter.rs b/src/query/service/src/interpreters/interpreter_network_policy_alter.rs new file mode 100644 index 000000000000..a5c722e66ce6 --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_network_policy_alter.rs @@ -0,0 +1,64 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::Result; +use common_sql::plans::AlterNetworkPolicyPlan; +use common_users::UserApiProvider; + +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; +use crate::sessions::TableContext; + +#[derive(Debug)] +pub struct AlterNetworkPolicyInterpreter { + ctx: Arc, + plan: AlterNetworkPolicyPlan, +} + +impl AlterNetworkPolicyInterpreter { + pub fn try_create(ctx: Arc, plan: AlterNetworkPolicyPlan) -> Result { + Ok(AlterNetworkPolicyInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for AlterNetworkPolicyInterpreter { + fn name(&self) -> &str { + "AlterNetworkPolicyInterpreter" + } + + #[tracing::instrument(level = "debug", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))] + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + let plan = self.plan.clone(); + let tenant = self.ctx.get_tenant(); + + let user_mgr = UserApiProvider::instance(); + user_mgr + .update_network_policy( + &tenant, + &plan.name, + plan.allowed_ip_list.clone(), + plan.blocked_ip_list.clone(), + plan.comment.clone(), + plan.if_exists, + ) + .await?; + + Ok(PipelineBuildResult::create()) + } +} diff --git a/src/query/service/src/interpreters/interpreter_network_policy_create.rs b/src/query/service/src/interpreters/interpreter_network_policy_create.rs new file mode 100644 index 000000000000..d6bcd9cd867f --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_network_policy_create.rs @@ -0,0 +1,67 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chrono::Utc; +use common_exception::Result; +use common_meta_app::principal::NetworkPolicy; +use common_sql::plans::CreateNetworkPolicyPlan; +use common_users::UserApiProvider; + +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; +use crate::sessions::TableContext; + +#[derive(Debug)] +pub struct CreateNetworkPolicyInterpreter { + ctx: Arc, + plan: CreateNetworkPolicyPlan, +} + +impl CreateNetworkPolicyInterpreter { + pub fn try_create(ctx: Arc, plan: CreateNetworkPolicyPlan) -> Result { + Ok(CreateNetworkPolicyInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for CreateNetworkPolicyInterpreter { + fn name(&self) -> &str { + "CreateNetworkPolicyInterpreter" + } + + #[tracing::instrument(level = "debug", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))] + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + let plan = self.plan.clone(); + let tenant = self.ctx.get_tenant(); + let user_mgr = UserApiProvider::instance(); + + let network_policy = NetworkPolicy { + name: plan.name, + allowed_ip_list: plan.allowed_ip_list, + blocked_ip_list: plan.blocked_ip_list, + comment: plan.comment, + create_on: Utc::now(), + update_on: None, + }; + user_mgr + .add_network_policy(&tenant, network_policy, plan.if_not_exists) + .await?; + + Ok(PipelineBuildResult::create()) + } +} diff --git a/src/query/service/src/interpreters/interpreter_network_policy_desc.rs b/src/query/service/src/interpreters/interpreter_network_policy_desc.rs new file mode 100644 index 000000000000..2e033dcaaf9e --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_network_policy_desc.rs @@ -0,0 +1,73 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::Result; +use common_expression::types::StringType; +use common_expression::DataBlock; +use common_expression::DataSchemaRef; +use common_expression::FromData; +use common_sql::plans::DescNetworkPolicyPlan; +use common_users::UserApiProvider; + +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; +use crate::sessions::TableContext; + +#[derive(Debug)] +pub struct DescNetworkPolicyInterpreter { + ctx: Arc, + plan: DescNetworkPolicyPlan, +} + +impl DescNetworkPolicyInterpreter { + pub fn try_create(ctx: Arc, plan: DescNetworkPolicyPlan) -> Result { + Ok(DescNetworkPolicyInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for DescNetworkPolicyInterpreter { + fn name(&self) -> &str { + "DescNetworkPolicyInterpreter" + } + + fn schema(&self) -> DataSchemaRef { + self.plan.schema() + } + + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + let tenant = self.ctx.get_tenant(); + let user_mgr = UserApiProvider::instance(); + + let network_policy = user_mgr + .get_network_policy(&tenant, self.plan.name.as_str()) + .await?; + + let names = vec![network_policy.name.as_bytes().to_vec()]; + let allowed_ip_lists = vec![network_policy.allowed_ip_list.join(",").as_bytes().to_vec()]; + let blocked_ip_lists = vec![network_policy.blocked_ip_list.join(",").as_bytes().to_vec()]; + let comments = vec![network_policy.comment.as_bytes().to_vec()]; + + PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ + StringType::from_data(names), + StringType::from_data(allowed_ip_lists), + StringType::from_data(blocked_ip_lists), + StringType::from_data(comments), + ])]) + } +} diff --git a/src/query/service/src/interpreters/interpreter_network_policy_drop.rs b/src/query/service/src/interpreters/interpreter_network_policy_drop.rs new file mode 100644 index 000000000000..7900e89832a5 --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_network_policy_drop.rs @@ -0,0 +1,57 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::Result; +use common_sql::plans::DropNetworkPolicyPlan; +use common_users::UserApiProvider; + +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; +use crate::sessions::TableContext; + +#[derive(Debug)] +pub struct DropNetworkPolicyInterpreter { + ctx: Arc, + plan: DropNetworkPolicyPlan, +} + +impl DropNetworkPolicyInterpreter { + pub fn try_create(ctx: Arc, plan: DropNetworkPolicyPlan) -> Result { + Ok(DropNetworkPolicyInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for DropNetworkPolicyInterpreter { + fn name(&self) -> &str { + "DropNetworkPolicyInterpreter" + } + + #[tracing::instrument(level = "debug", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))] + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + let plan = self.plan.clone(); + let tenant = self.ctx.get_tenant(); + + let user_mgr = UserApiProvider::instance(); + user_mgr + .drop_network_policy(&tenant, plan.name.as_str(), plan.if_exists) + .await?; + + Ok(PipelineBuildResult::create()) + } +} diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index f4cd18b7aca9..54f83b747b3c 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -42,6 +42,11 @@ mod interpreter_index_refresh; mod interpreter_insert; mod interpreter_kill; mod interpreter_metrics; +mod interpreter_network_policies_show; +mod interpreter_network_policy_alter; +mod interpreter_network_policy_create; +mod interpreter_network_policy_desc; +mod interpreter_network_policy_drop; mod interpreter_presign; mod interpreter_privilege_grant; mod interpreter_privilege_revoke; @@ -129,6 +134,11 @@ pub use interpreter_index_refresh::RefreshIndexInterpreter; pub use interpreter_insert::InsertInterpreter; pub use interpreter_kill::KillInterpreter; pub use interpreter_metrics::InterpreterMetrics; +pub use interpreter_network_policies_show::ShowNetworkPoliciesInterpreter; +pub use interpreter_network_policy_alter::AlterNetworkPolicyInterpreter; +pub use interpreter_network_policy_create::CreateNetworkPolicyInterpreter; +pub use interpreter_network_policy_desc::DescNetworkPolicyInterpreter; +pub use interpreter_network_policy_drop::DropNetworkPolicyInterpreter; pub use interpreter_privilege_grant::GrantPrivilegeInterpreter; pub use interpreter_privilege_revoke::RevokePrivilegeInterpreter; pub use interpreter_query_log::InterpreterQueryLog; diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml index d2554bf7ad1b..0013834f1a87 100644 --- a/src/query/sql/Cargo.toml +++ b/src/query/sql/Cargo.toml @@ -54,6 +54,7 @@ async-recursion = "1.0.0" async-trait = { version = "0.1.57", package = "async-trait-fn" } chrono = { workspace = true } chrono-tz = { workspace = true } +cidr = { version = "0.2.2" } ctor = "0.1.26" dashmap = "5.4" educe = "0.4" diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index c6851f633ba3..f84220630b14 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -523,6 +523,21 @@ impl<'a> Binder { Statement::DescDatamaskPolicy(stmt) => { self.bind_desc_data_mask_policy(stmt).await? } + Statement::CreateNetworkPolicy(stmt) => { + self.bind_create_network_policy(stmt).await? + } + Statement::AlterNetworkPolicy(stmt) => { + self.bind_alter_network_policy(stmt).await? + } + Statement::DropNetworkPolicy(stmt) => { + self.bind_drop_network_policy(stmt).await? + } + Statement::DescNetworkPolicy(stmt) => { + self.bind_desc_network_policy(stmt).await? + } + Statement::ShowNetworkPolicies => { + self.bind_show_network_policies().await? + } }; Ok(plan) } diff --git a/src/query/sql/src/planner/binder/ddl/mod.rs b/src/query/sql/src/planner/binder/ddl/mod.rs index a76e462a71f8..f665c0ae696c 100644 --- a/src/query/sql/src/planner/binder/ddl/mod.rs +++ b/src/query/sql/src/planner/binder/ddl/mod.rs @@ -18,6 +18,7 @@ mod column; mod data_mask; mod database; mod index; +mod network_policy; mod role; mod share; mod stage; diff --git a/src/query/sql/src/planner/binder/ddl/network_policy.rs b/src/query/sql/src/planner/binder/ddl/network_policy.rs new file mode 100644 index 000000000000..2a86218d31b6 --- /dev/null +++ b/src/query/sql/src/planner/binder/ddl/network_policy.rs @@ -0,0 +1,153 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use cidr::Ipv4Cidr; +use common_ast::ast::*; +use common_exception::ErrorCode; +use common_exception::Result; + +use crate::binder::Binder; +use crate::plans::AlterNetworkPolicyPlan; +use crate::plans::CreateNetworkPolicyPlan; +use crate::plans::DescNetworkPolicyPlan; +use crate::plans::DropNetworkPolicyPlan; +use crate::plans::Plan; +use crate::plans::ShowNetworkPoliciesPlan; + +impl Binder { + #[async_backtrace::framed] + pub(in crate::planner::binder) async fn bind_create_network_policy( + &mut self, + stmt: &CreateNetworkPolicyStmt, + ) -> Result { + let CreateNetworkPolicyStmt { + if_not_exists, + name, + allowed_ip_list, + blocked_ip_list, + comment, + } = stmt; + + for ip in allowed_ip_list { + if ip.parse::().is_err() { + return Err(ErrorCode::SemanticError(format!( + "invalid ip address {}", + ip + ))); + } + } + if let Some(blocked_ip_list) = blocked_ip_list { + for ip in blocked_ip_list { + if ip.parse::().is_err() { + return Err(ErrorCode::SemanticError(format!( + "invalid ip address {}", + ip + ))); + } + } + } + + let tenant = self.ctx.get_tenant(); + let plan = CreateNetworkPolicyPlan { + if_not_exists: *if_not_exists, + tenant, + name: name.to_string(), + allowed_ip_list: allowed_ip_list.clone(), + blocked_ip_list: blocked_ip_list.clone().unwrap_or_default(), + comment: comment.clone().unwrap_or_default(), + }; + Ok(Plan::CreateNetworkPolicy(Box::new(plan))) + } + + #[async_backtrace::framed] + pub(in crate::planner::binder) async fn bind_alter_network_policy( + &mut self, + stmt: &AlterNetworkPolicyStmt, + ) -> Result { + let AlterNetworkPolicyStmt { + if_exists, + name, + allowed_ip_list, + blocked_ip_list, + comment, + } = stmt; + + if let Some(allowed_ip_list) = allowed_ip_list { + for ip in allowed_ip_list { + if ip.parse::().is_err() { + return Err(ErrorCode::SemanticError(format!( + "invalid ip address {}", + ip + ))); + } + } + } + if let Some(blocked_ip_list) = blocked_ip_list { + for ip in blocked_ip_list { + if ip.parse::().is_err() { + return Err(ErrorCode::SemanticError(format!( + "invalid ip address {}", + ip + ))); + } + } + } + + let tenant = self.ctx.get_tenant(); + let plan = AlterNetworkPolicyPlan { + if_exists: *if_exists, + tenant, + name: name.to_string(), + allowed_ip_list: allowed_ip_list.clone(), + blocked_ip_list: blocked_ip_list.clone(), + comment: comment.clone(), + }; + Ok(Plan::AlterNetworkPolicy(Box::new(plan))) + } + + #[async_backtrace::framed] + pub(in crate::planner::binder) async fn bind_drop_network_policy( + &mut self, + stmt: &DropNetworkPolicyStmt, + ) -> Result { + let DropNetworkPolicyStmt { if_exists, name } = stmt; + + let tenant = self.ctx.get_tenant(); + let plan = DropNetworkPolicyPlan { + if_exists: *if_exists, + tenant, + name: name.to_string(), + }; + Ok(Plan::DropNetworkPolicy(Box::new(plan))) + } + + #[async_backtrace::framed] + pub(in crate::planner::binder) async fn bind_desc_network_policy( + &mut self, + stmt: &DescNetworkPolicyStmt, + ) -> Result { + let DescNetworkPolicyStmt { name } = stmt; + + let plan = DescNetworkPolicyPlan { + name: name.to_string(), + }; + Ok(Plan::DescNetworkPolicy(Box::new(plan))) + } + + #[async_backtrace::framed] + pub(in crate::planner::binder) async fn bind_show_network_policies(&mut self) -> Result { + let plan = ShowNetworkPoliciesPlan {}; + Ok(Plan::ShowNetworkPolicies(Box::new(plan))) + } +} diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index 3f2164f5fdfa..da144f2ed5ea 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -150,7 +150,6 @@ impl Plan { Plan::AlterUser(alter_user) => Ok(format!("{:?}", alter_user)), Plan::CreateRole(create_role) => Ok(format!("{:?}", create_role)), Plan::DropRole(drop_role) => Ok(format!("{:?}", drop_role)), - Plan::Presign(presign) => Ok(format!("{:?}", presign)), Plan::SetVariable(p) => Ok(format!("{:?}", p)), @@ -178,6 +177,13 @@ impl Plan { Plan::CreateDatamaskPolicy(p) => Ok(format!("{:?}", p)), Plan::DropDatamaskPolicy(p) => Ok(format!("{:?}", p)), Plan::DescDatamaskPolicy(p) => Ok(format!("{:?}", p)), + + // network policy + Plan::CreateNetworkPolicy(p) => Ok(format!("{:?}", p)), + Plan::AlterNetworkPolicy(p) => Ok(format!("{:?}", p)), + Plan::DropNetworkPolicy(p) => Ok(format!("{:?}", p)), + Plan::DescNetworkPolicy(p) => Ok(format!("{:?}", p)), + Plan::ShowNetworkPolicies(p) => Ok(format!("{:?}", p)), } } } diff --git a/src/query/sql/src/planner/plans/ddl/account.rs b/src/query/sql/src/planner/plans/ddl/account.rs index 3f1825b74cda..04d4e55398cc 100644 --- a/src/query/sql/src/planner/plans/ddl/account.rs +++ b/src/query/sql/src/planner/plans/ddl/account.rs @@ -114,3 +114,78 @@ pub struct RevokePrivilegePlan { pub priv_types: UserPrivilegeSet, pub on: GrantObject, } + +#[derive(Clone, Debug, PartialEq)] +pub struct CreateNetworkPolicyPlan { + pub if_not_exists: bool, + pub tenant: String, + pub name: String, + pub allowed_ip_list: Vec, + pub blocked_ip_list: Vec, + pub comment: String, +} + +impl CreateNetworkPolicyPlan { + pub fn schema(&self) -> DataSchemaRef { + DataSchemaRefExt::create(vec![]) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct AlterNetworkPolicyPlan { + pub if_exists: bool, + pub tenant: String, + pub name: String, + pub allowed_ip_list: Option>, + pub blocked_ip_list: Option>, + pub comment: Option, +} + +impl AlterNetworkPolicyPlan { + pub fn schema(&self) -> DataSchemaRef { + DataSchemaRefExt::create(vec![]) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct DropNetworkPolicyPlan { + pub if_exists: bool, + pub tenant: String, + pub name: String, +} + +impl DropNetworkPolicyPlan { + pub fn schema(&self) -> DataSchemaRef { + DataSchemaRefExt::create(vec![]) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct DescNetworkPolicyPlan { + pub name: String, +} + +impl DescNetworkPolicyPlan { + pub fn schema(&self) -> DataSchemaRef { + DataSchemaRefExt::create(vec![ + DataField::new("Name", DataType::String), + DataField::new("Allowed Ip List", DataType::String), + DataField::new("Blocked Ip List", DataType::String), + DataField::new("Comment", DataType::String), + ]) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct ShowNetworkPoliciesPlan {} + +impl ShowNetworkPoliciesPlan { + pub fn schema(&self) -> DataSchemaRef { + DataSchemaRefExt::create(vec![ + DataField::new("Name", DataType::String), + DataField::new("Allowed Ip List", DataType::String), + DataField::new("Blocked Ip List", DataType::String), + DataField::new("Comment", DataType::String), + ]) + } +} diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index c6a5280247a6..39e008d10b6c 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -51,6 +51,7 @@ use crate::plans::share::ShowGrantTenantsOfSharePlan; use crate::plans::share::ShowObjectGrantPrivilegesPlan; use crate::plans::share::ShowSharesPlan; use crate::plans::AddTableColumnPlan; +use crate::plans::AlterNetworkPolicyPlan; use crate::plans::AlterTableClusterKeyPlan; use crate::plans::AlterUDFPlan; use crate::plans::AlterUserPlan; @@ -61,6 +62,7 @@ use crate::plans::CallPlan; use crate::plans::CreateCatalogPlan; use crate::plans::CreateDatabasePlan; use crate::plans::CreateFileFormatPlan; +use crate::plans::CreateNetworkPolicyPlan; use crate::plans::CreateRolePlan; use crate::plans::CreateStagePlan; use crate::plans::CreateTablePlan; @@ -69,10 +71,12 @@ use crate::plans::CreateUserPlan; use crate::plans::CreateViewPlan; use crate::plans::CreateVirtualColumnsPlan; use crate::plans::DeletePlan; +use crate::plans::DescNetworkPolicyPlan; use crate::plans::DescribeTablePlan; use crate::plans::DropCatalogPlan; use crate::plans::DropDatabasePlan; use crate::plans::DropFileFormatPlan; +use crate::plans::DropNetworkPolicyPlan; use crate::plans::DropRolePlan; use crate::plans::DropStagePlan; use crate::plans::DropTableClusterKeyPlan; @@ -103,6 +107,7 @@ use crate::plans::ShowCreateDatabasePlan; use crate::plans::ShowCreateTablePlan; use crate::plans::ShowFileFormatsPlan; use crate::plans::ShowGrantsPlan; +use crate::plans::ShowNetworkPoliciesPlan; use crate::plans::ShowRolesPlan; use crate::plans::ShowShareEndpointPlan; use crate::plans::TruncateTablePlan; @@ -262,6 +267,13 @@ pub enum Plan { CreateDatamaskPolicy(Box), DropDatamaskPolicy(Box), DescDatamaskPolicy(Box), + + // Network policy + CreateNetworkPolicy(Box), + AlterNetworkPolicy(Box), + DropNetworkPolicy(Box), + DescNetworkPolicy(Box), + ShowNetworkPolicies(Box), } #[derive(Clone, Debug)] @@ -396,6 +408,11 @@ impl Display for Plan { Plan::SetOptions(..) => { write!(f, "SetOptions") } + Plan::CreateNetworkPolicy(_) => write!(f, "CreateNetworkPolicy"), + Plan::AlterNetworkPolicy(_) => write!(f, "AlterNetworkPolicy"), + Plan::DropNetworkPolicy(_) => write!(f, "DropNetworkPolicy"), + Plan::DescNetworkPolicy(_) => write!(f, "DescNetworkPolicy"), + Plan::ShowNetworkPolicies(_) => write!(f, "ShowNetworkPolicies"), } } } @@ -440,6 +457,11 @@ impl Plan { Plan::CreateDatamaskPolicy(plan) => plan.schema(), Plan::DropDatamaskPolicy(plan) => plan.schema(), Plan::DescDatamaskPolicy(plan) => plan.schema(), + Plan::CreateNetworkPolicy(plan) => plan.schema(), + Plan::AlterNetworkPolicy(plan) => plan.schema(), + Plan::DropNetworkPolicy(plan) => plan.schema(), + Plan::DescNetworkPolicy(plan) => plan.schema(), + Plan::ShowNetworkPolicies(plan) => plan.schema(), other => { debug_assert!(!other.has_result_set()); Arc::new(DataSchema::empty()) @@ -471,6 +493,8 @@ impl Plan { | Plan::VacuumTable(_) | Plan::VacuumDropTable(_) | Plan::DescDatamaskPolicy(_) + | Plan::DescNetworkPolicy(_) + | Plan::ShowNetworkPolicies(_) ) } } diff --git a/src/query/users/Cargo.toml b/src/query/users/Cargo.toml index 5ac20d479e19..cab065ca54e6 100644 --- a/src/query/users/Cargo.toml +++ b/src/query/users/Cargo.toml @@ -29,6 +29,7 @@ common-meta-types = { path = "../../meta/types" } # Crates.io dependencies async-backtrace = { workspace = true } base64 = "0.21" +chrono = { workspace = true } jwt-simple = "0.11" p256 = "0.13" parking_lot = "0.12.1" diff --git a/src/query/users/src/lib.rs b/src/query/users/src/lib.rs index f17ce5de700a..a12a61fc6a6f 100644 --- a/src/query/users/src/lib.rs +++ b/src/query/users/src/lib.rs @@ -31,6 +31,7 @@ extern crate core; mod jwt; +mod network_policy; mod role_mgr; mod user; mod user_api; diff --git a/src/query/users/src/network_policy.rs b/src/query/users/src/network_policy.rs new file mode 100644 index 000000000000..2e166d91b3ef --- /dev/null +++ b/src/query/users/src/network_policy.rs @@ -0,0 +1,154 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::Utc; +use common_exception::ErrorCode; +use common_exception::Result; +use common_management::NetworkPolicyApi; +use common_meta_app::principal::NetworkPolicy; +use common_meta_types::MatchSeq; + +use crate::UserApiProvider; + +impl UserApiProvider { + // Add a new network policy. + #[async_backtrace::framed] + pub async fn add_network_policy( + &self, + tenant: &str, + network_policy: NetworkPolicy, + if_not_exists: bool, + ) -> Result { + if if_not_exists + && self + .exists_network_policy(tenant, network_policy.name.as_str()) + .await? + { + return Ok(0); + } + + let client = self.get_network_policy_api_client(tenant)?; + let add_network_policy = client.add_network_policy(network_policy); + match add_network_policy.await { + Ok(res) => Ok(res), + Err(e) => { + if if_not_exists && e.code() == ErrorCode::NETWORK_POLICY_ALREADY_EXISTS { + Ok(0) + } else { + Err(e.add_message_back("(while add network policy)")) + } + } + } + } + + // Update network policy. + #[async_backtrace::framed] + pub async fn update_network_policy( + &self, + tenant: &str, + name: &str, + allowed_ip_list: Option>, + blocked_ip_list: Option>, + comment: Option, + if_exists: bool, + ) -> Result> { + let client = self.get_network_policy_api_client(tenant)?; + let seq_network_policy = match client.get_network_policy(name, MatchSeq::GE(0)).await { + Ok(seq_network_policy) => seq_network_policy, + Err(e) => { + if if_exists && e.code() == ErrorCode::UNKNOWN_NETWORK_POLICY { + return Ok(None); + } else { + return Err(e.add_message_back(" (while alter network policy)")); + } + } + }; + + let seq = seq_network_policy.seq; + let mut network_policy = seq_network_policy.data; + if let Some(allowed_ip_list) = allowed_ip_list { + network_policy.allowed_ip_list = allowed_ip_list; + } + if let Some(blocked_ip_list) = blocked_ip_list { + network_policy.blocked_ip_list = blocked_ip_list; + } + if let Some(comment) = comment { + network_policy.comment = comment; + } + network_policy.update_on = Some(Utc::now()); + + match client + .update_network_policy(network_policy, MatchSeq::Exact(seq)) + .await + { + Ok(res) => Ok(Some(res)), + Err(e) => Err(e.add_message_back(" (while alter network policy).")), + } + } + + // Drop a network policy by name. + #[async_backtrace::framed] + pub async fn drop_network_policy( + &self, + tenant: &str, + name: &str, + if_exists: bool, + ) -> Result<()> { + let client = self.get_network_policy_api_client(tenant)?; + match client.drop_network_policy(name, MatchSeq::GE(1)).await { + Ok(res) => Ok(res), + Err(e) => { + if if_exists && e.code() == ErrorCode::UNKNOWN_NETWORK_POLICY { + Ok(()) + } else { + Err(e.add_message_back(" (while drop network policy)")) + } + } + } + } + + // Check whether a network policy is exist. + #[async_backtrace::framed] + pub async fn exists_network_policy(&self, tenant: &str, name: &str) -> Result { + match self.get_network_policy(tenant, name).await { + Ok(_) => Ok(true), + Err(e) => { + if e.code() == ErrorCode::UNKNOWN_NETWORK_POLICY { + Ok(false) + } else { + Err(e) + } + } + } + } + + // Get a network_policy by tenant. + #[async_backtrace::framed] + pub async fn get_network_policy(&self, tenant: &str, name: &str) -> Result { + let client = self.get_network_policy_api_client(tenant)?; + let network_policy = client.get_network_policy(name, MatchSeq::GE(0)).await?.data; + Ok(network_policy) + } + + // Get all network policies by tenant. + #[async_backtrace::framed] + pub async fn get_network_policies(&self, tenant: &str) -> Result> { + let client = self.get_network_policy_api_client(tenant)?; + let network_policies = client + .get_network_policies() + .await + .map_err(|e| e.add_message_back(" (while get network policies)."))?; + Ok(network_policies) + } +} diff --git a/src/query/users/src/user_api.rs b/src/query/users/src/user_api.rs index e1c3b861d85d..3292857cb99d 100644 --- a/src/query/users/src/user_api.rs +++ b/src/query/users/src/user_api.rs @@ -20,6 +20,8 @@ use common_exception::Result; use common_grpc::RpcClientConf; use common_management::FileFormatApi; use common_management::FileFormatMgr; +use common_management::NetworkPolicyApi; +use common_management::NetworkPolicyMgr; use common_management::QuotaApi; use common_management::QuotaMgr; use common_management::RoleApi; @@ -118,6 +120,16 @@ impl UserApiProvider { Ok(Arc::new(SettingMgr::create(self.client.clone(), tenant)?)) } + pub fn get_network_policy_api_client( + &self, + tenant: &str, + ) -> Result> { + Ok(Arc::new(NetworkPolicyMgr::create( + self.client.clone(), + tenant, + )?)) + } + pub fn get_meta_store_client(&self) -> Arc { Arc::new(self.meta.clone()) } diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0032_ddl_network_policy b/tests/sqllogictests/suites/base/05_ddl/05_0032_ddl_network_policy new file mode 100644 index 000000000000..adea15818fb1 --- /dev/null +++ b/tests/sqllogictests/suites/base/05_ddl/05_0032_ddl_network_policy @@ -0,0 +1,50 @@ +statement ok +DROP NETWORK POLICY IF EXISTS test_policy + +statement ok +DROP NETWORK POLICY IF EXISTS test_policy1 + +statement error 2207 +DROP NETWORK POLICY test_policy + +statement ok +CREATE NETWORK POLICY test_policy ALLOWED_IP_LIST=('192.168.1.0/24') BLOCKED_IP_LIST=('192.168.1.99') COMMENT='test comment' + +query TTTT +DESC NETWORK POLICY test_policy +---- +test_policy 192.168.1.0/24 192.168.1.99 test comment + +statement ok +CREATE NETWORK POLICY test_policy1 ALLOWED_IP_LIST=('192.168.100.0/24') + +query TTTT +SHOW NETWORK POLICIES +---- +test_policy 192.168.1.0/24 192.168.1.99 test comment +test_policy1 192.168.100.0/24 (empty) (empty) + +statement ok +ALTER NETWORK POLICY test_policy SET BLOCKED_IP_LIST=('192.168.1.10') + +query TTTT +DESC NETWORK POLICY test_policy +---- +test_policy 192.168.1.0/24 192.168.1.10 test comment + +statement ok +ALTER NETWORK POLICY test_policy SET ALLOWED_IP_LIST=('192.168.10.0', '192.168.20.0') BLOCKED_IP_LIST=() COMMENT='new comment' + +query TTTT +DESC NETWORK POLICY test_policy +---- +test_policy 192.168.10.0,192.168.20.0 (empty) new comment + +statement ok +DROP NETWORK POLICY test_policy + +statement error 2207 +DROP NETWORK POLICY test_policy + +statement error 2207 +DESC NETWORK POLICY test_policy