Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 102 additions & 0 deletions src/query/service/src/physical_plans/format/format_secure_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_ast::ast::FormatTreeNode;
use databend_common_exception::Result;
use databend_common_functions::BUILTIN_FUNCTIONS;
use itertools::Itertools;

use crate::physical_plans::format::append_output_rows_info;
use crate::physical_plans::format::format_output_columns;
use crate::physical_plans::format::plan_stats_info_to_format_tree;
use crate::physical_plans::format::FormatContext;
use crate::physical_plans::format::PhysicalFormat;
use crate::physical_plans::IPhysicalPlan;
use crate::physical_plans::PhysicalPlanMeta;
use crate::physical_plans::SecureFilter;

/// Formatter that renders a borrowed [`SecureFilter`] physical plan node
/// through the [`PhysicalFormat`] trait.
pub struct SecureFilterFormatter<'a> {
/// The plan node being formatted; the formatter only borrows it.
inner: &'a SecureFilter,
}

impl<'a> SecureFilterFormatter<'a> {
    /// Wraps `inner` in a formatter and returns it as a boxed
    /// [`PhysicalFormat`] trait object that lives no longer than the borrow.
    pub fn create(inner: &'a SecureFilter) -> Box<dyn PhysicalFormat + 'a> {
        let formatter = Self { inner };
        Box::new(formatter)
    }
}

impl<'a> PhysicalFormat for SecureFilterFormatter<'a> {
    /// Returns the metadata of the wrapped `SecureFilter` node.
    fn get_meta(&self) -> &PhysicalPlanMeta {
        self.inner.get_meta()
    }

    /// Builds the full format tree for this node.
    ///
    /// The resulting `SecureFilter` node has, in order: the output columns,
    /// the rendered secure filters, the plan statistics (when `stat_info` is
    /// present) and finally the tree produced by dispatching to the input's
    /// formatter.
    #[recursive::recursive]
    fn format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
        let secure_filters = self.secure_filters_display();
        let output_columns =
            format_output_columns(self.inner.output_schema()?, ctx.metadata, true);

        let mut node_children = vec![
            FormatTreeNode::new(format!("output columns: [{output_columns}]")),
            FormatTreeNode::new(format!("secure filters: [{secure_filters}]")),
        ];

        if let Some(stats) = self.inner.stat_info.as_ref() {
            node_children.extend(plan_stats_info_to_format_tree(stats));
        }

        // The input subtree always comes last.
        let input_node = self.inner.input.formatter()?.dispatch(ctx)?;
        node_children.push(input_node);

        Ok(FormatTreeNode::with_children(
            "SecureFilter".to_string(),
            node_children,
        ))
    }

    /// Join formatting is delegated unchanged to the input's formatter; this
    /// node contributes nothing of its own.
    #[recursive::recursive]
    fn format_join(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
        self.inner.input.formatter()?.format_join(ctx)
    }

    /// Builds the partial format tree for this node.
    ///
    /// Unlike `format`, no output columns are listed. The children are, in
    /// order: the rendered secure filters, the plan statistics (when
    /// `stat_info` is present), whatever `append_output_rows_info` adds for
    /// this plan id, and the input's partial format tree.
    #[recursive::recursive]
    fn partial_format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
        let secure_filters = self.secure_filters_display();
        let mut children = vec![FormatTreeNode::new(format!(
            "secure filters: [{secure_filters}]"
        ))];

        if let Some(stats) = self.inner.stat_info.as_ref() {
            children.extend(plan_stats_info_to_format_tree(stats));
        }

        append_output_rows_info(&mut children, &ctx.profs, self.inner.get_id());

        let input_node = self.inner.input.formatter()?.partial_format(ctx)?;
        children.push(input_node);

        Ok(FormatTreeNode::with_children(
            "SecureFilter".to_string(),
            children,
        ))
    }
}

impl SecureFilterFormatter<'_> {
    /// Renders every predicate as `SECURE: <sql>` and joins them with `, `.
    fn secure_filters_display(&self) -> String {
        self.inner
            .predicates
            .iter()
            .map(|pred| format!("SECURE: {}", pred.as_expr(&BUILTIN_FUNCTIONS).sql_display()))
            .join(", ")
    }
}
2 changes: 2 additions & 0 deletions src/query/service/src/physical_plans/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod format_project_set;
mod format_range_join;
mod format_replace_into;
mod format_row_fetch;
mod format_secure_filter;
mod format_shuffle;
mod format_sort;
mod format_table_scan;
Expand Down Expand Up @@ -96,6 +97,7 @@ pub use format_project_set::*;
pub use format_range_join::*;
pub use format_replace_into::*;
pub use format_row_fetch::*;
pub use format_secure_filter::*;
pub use format_shuffle::*;
pub use format_sort::*;
pub use format_table_scan::*;
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/physical_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ mod physical_replace_async_source;
mod physical_replace_deduplicate;
mod physical_replace_into;
mod physical_row_fetch;
mod physical_secure_filter;
mod physical_sort;
mod physical_table_scan;
mod physical_udf;
Expand Down Expand Up @@ -101,6 +102,7 @@ pub use physical_replace_async_source::ReplaceAsyncSourcer;
pub use physical_replace_deduplicate::*;
pub use physical_replace_into::ReplaceInto;
pub use physical_row_fetch::RowFetch;
pub use physical_secure_filter::SecureFilter;
pub use physical_sequence::*;
pub use physical_sort::Sort;
pub use physical_table_scan::TableScan;
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/physical_plans/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl PhysicalPlanBuilder {
RelOperator::Filter(filter) => {
self.build_filter(s_expr, filter, required, stat_info).await
}
RelOperator::SecureFilter(secure_filter) => {
self.build_secure_filter(s_expr, secure_filter, required, stat_info)
.await
}
RelOperator::Aggregate(agg) => {
self.build_aggregate(s_expr, agg, required, stat_info).await
}
Expand Down
188 changes: 188 additions & 0 deletions src/query/service/src/physical_plans/physical_secure_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;

use databend_common_catalog::plan::DataSourcePlan;
use databend_common_exception::Result;
use databend_common_expression::ConstantFolder;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::RemoteExpr;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_sql::executor::cast_expr_to_non_null_boolean;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::ColumnSet;
use databend_common_sql::TypeCheck;

use crate::physical_plans::explain::PlanStatsInfo;
use crate::physical_plans::format::PhysicalFormat;
use crate::physical_plans::format::SecureFilterFormatter;
use crate::physical_plans::physical_plan::IPhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
use crate::physical_plans::PhysicalPlanBuilder;
use crate::pipelines::PipelineBuilder;

/// Physical plan node that applies a list of predicates to the output of
/// `input` and keeps only the projected columns.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SecureFilter {
/// Plan metadata, exposed through `get_meta` / `get_meta_mut`.
meta: PhysicalPlanMeta,
/// Positions (within the input schema's field list) of the fields that are
/// kept in this node's output schema.
pub projections: ColumnSet,
/// The single child plan whose output is filtered.
pub input: PhysicalPlan,
// Assumption: expression's data type must be `DataType::Boolean`.
pub predicates: Vec<RemoteExpr>,

// Only used for explain
pub stat_info: Option<PlanStatsInfo>,
}

#[typetag::serde]
impl IPhysicalPlan for SecureFilter {
    /// Exposes this node as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Shared access to this node's plan metadata.
    fn get_meta(&self) -> &PhysicalPlanMeta {
        &self.meta
    }

    /// Mutable access to this node's plan metadata.
    fn get_meta_mut(&mut self) -> &mut PhysicalPlanMeta {
        &mut self.meta
    }

    /// Computes the output schema: the input schema's fields whose position
    /// is contained in `projections`, kept in input order.
    #[recursive::recursive]
    fn output_schema(&self) -> Result<DataSchemaRef> {
        let input_schema = self.input.output_schema()?;
        let kept_fields = input_schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(position, _)| self.projections.contains(position))
            .map(|(_, field)| field.clone())
            .collect::<Vec<_>>();
        Ok(DataSchemaRefExt::create(kept_fields))
    }

    /// Iterates over the single child plan (`input`).
    fn children<'a>(&'a self) -> Box<dyn Iterator<Item = &'a PhysicalPlan> + 'a> {
        let only_child = std::iter::once(&self.input);
        Box::new(only_child)
    }

    /// Iterates mutably over the single child plan (`input`).
    fn children_mut<'a>(&'a mut self) -> Box<dyn Iterator<Item = &'a mut PhysicalPlan> + 'a> {
        let only_child = std::iter::once(&mut self.input);
        Box::new(only_child)
    }

    /// Returns a `SecureFilterFormatter` borrowing this node.
    #[recursive::recursive]
    fn formatter(&self) -> Result<Box<dyn PhysicalFormat + '_>> {
        Ok(SecureFilterFormatter::create(self))
    }

    /// Delegates the data-source lookup to the input plan.
    #[recursive::recursive]
    fn try_find_single_data_source(&self) -> Option<&DataSourcePlan> {
        self.input.try_find_single_data_source()
    }

    /// Short description: the first predicate rendered as `SECURE: <sql>`,
    /// or an empty string when there are no predicates.
    fn get_desc(&self) -> Result<String> {
        let description = self
            .predicates
            .first()
            .map(|predicate| {
                format!(
                    "SECURE: {}",
                    predicate.as_expr(&BUILTIN_FUNCTIONS).sql_display()
                )
            })
            .unwrap_or_default();
        Ok(description)
    }

    /// Labels: a single `"Secure Filter condition"` entry listing every
    /// predicate rendered as `SECURE: <sql>`.
    fn get_labels(&self) -> Result<HashMap<String, Vec<String>>> {
        let conditions: Vec<String> = self
            .predicates
            .iter()
            .map(|predicate| {
                format!(
                    "SECURE: {}",
                    predicate.as_expr(&BUILTIN_FUNCTIONS).sql_display()
                )
            })
            .collect();
        Ok(HashMap::from([(
            String::from("Secure Filter condition"),
            conditions,
        )]))
    }

    /// Creates a copy of this node with `input` replaced by the given child.
    ///
    /// # Panics
    ///
    /// Panics if `children` does not contain exactly one plan.
    fn derive(&self, mut children: Vec<PhysicalPlan>) -> PhysicalPlan {
        assert_eq!(children.len(), 1);
        let new_input = children.pop().unwrap();
        let derived = SecureFilter {
            meta: self.meta.clone(),
            projections: self.projections.clone(),
            input: new_input,
            predicates: self.predicates.clone(),
            stat_info: self.stat_info.clone(),
        };
        PhysicalPlan::new(derived)
    }

    /// Builds the input's pipeline, then appends the filter transform for
    /// this node's predicates and projections to the main pipeline.
    fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
        self.input.build_pipeline(builder)?;

        // Execution reuses the same transform as a regular Filter. The
        // security aspect is handled at plan level (stats suppression) and
        // binding level.
        let transform =
            builder.filter_transform_builder(&self.predicates, self.projections.clone())?;
        builder.main_pipeline.add_transform(transform)
    }
}

impl PhysicalPlanBuilder {
    /// Lowers a logical `SecureFilter` operator into a physical
    /// [`SecureFilter`] plan node.
    ///
    /// The child plan is built with the union of `required` and every column
    /// referenced by the predicates. The node's projections are the input
    /// schema positions of `required` plus the metadata's retained columns;
    /// columns that cannot be found in the input schema by name are skipped.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the child plan, resolving its
    /// output schema, or lowering a predicate.
    pub async fn build_secure_filter(
        &mut self,
        s_expr: &SExpr,
        secure_filter: &databend_common_sql::plans::SecureFilter,
        mut required: ColumnSet,
        stat_info: PlanStatsInfo,
    ) -> Result<PhysicalPlan> {
        // 1. Prune unused columns: the child only has to produce what the
        //    parent requires plus what the predicates read.
        let mut used = required.clone();
        for predicate in secure_filter.predicates.iter() {
            used.extend(predicate.used_columns());
        }

        // 2. Build physical plan.
        let input = self.build(s_expr.child(0)?, used).await?;
        required.extend(self.metadata.read().get_retained_column().iter().copied());

        // Translate required column ids into positions of the input schema;
        // fields are looked up by the stringified column id.
        let input_schema = input.output_schema()?;
        let projections: ColumnSet = required
            .iter()
            .filter_map(|column| input_schema.column_with_name(&column.to_string()))
            .map(|(index, _)| index)
            .collect();

        Ok(PhysicalPlan::new(SecureFilter {
            meta: PhysicalPlanMeta::new("SecureFilter"),
            projections,
            input,
            predicates: secure_filter
                .predicates
                .iter()
                .map(|scalar| {
                    // Type-check against the input schema, then rewrite column
                    // references to input schema positions.
                    let typed = scalar.type_check(input_schema.as_ref())?;
                    let resolved = typed
                        .project_column_ref(|index| input_schema.index_of(&index.to_string()))?;
                    let boolean = cast_expr_to_non_null_boolean(resolved)?;
                    let (folded, _) =
                        ConstantFolder::fold(&boolean, &self.func_ctx, &BUILTIN_FUNCTIONS);
                    Ok(folded.as_remote_expr())
                })
                .collect::<Result<_>>()?,

            stat_info: Some(stat_info),
        }))
    }
}
1 change: 1 addition & 0 deletions src/query/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ databend-common-storage = { workspace = true }
databend-common-storages-basic = { workspace = true }
databend-common-users = { workspace = true }
databend-enterprise-data-mask-feature = { workspace = true }
databend-enterprise-row-access-policy-feature = { workspace = true }
databend-storages-common-cache = { workspace = true }
databend-storages-common-stage = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
Expand Down
Loading