Skip to content

Commit

Permalink
feat: add permission check for sub tables in table partition (#541)
Browse files Browse the repository at this point in the history
* draft

* add tests.

* address CR.
  • Loading branch information
Rachelint authored Jan 17, 2023
1 parent 9e42246 commit 89dca64
Show file tree
Hide file tree
Showing 15 changed files with 327 additions and 58 deletions.
14 changes: 14 additions & 0 deletions interpreters/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Context {
deadline: Option<Instant>,
default_catalog: String,
default_schema: String,
enable_partition_table_access: bool,
}

impl Context {
Expand All @@ -31,6 +32,7 @@ impl Context {
deadline,
default_catalog: String::new(),
default_schema: String::new(),
enable_partition_table_access: false,
}
}

Expand Down Expand Up @@ -59,6 +61,11 @@ impl Context {
pub fn request_id(&self) -> RequestId {
self.request_id
}

#[inline]
pub fn enable_partition_table_access(&self) -> bool {
self.enable_partition_table_access
}
}

#[must_use]
Expand All @@ -67,6 +74,7 @@ pub struct Builder {
deadline: Option<Instant>,
default_catalog: String,
default_schema: String,
enable_partition_table_access: bool,
}

impl Builder {
Expand All @@ -76,12 +84,18 @@ impl Builder {
self
}

pub fn enable_partition_table_access(mut self, enable_partition_table_access: bool) -> Self {
self.enable_partition_table_access = enable_partition_table_access;
self
}

pub fn build(self) -> Context {
Context {
request_id: self.request_id,
deadline: self.deadline,
default_catalog: self.default_catalog,
default_schema: self.default_schema,
enable_partition_table_access: self.enable_partition_table_access,
}
}
}
30 changes: 23 additions & 7 deletions interpreters/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ use sql::plan::Plan;
use table_engine::engine::TableEngineRef;

use crate::{
alter_table::AlterTableInterpreter, context::Context, create::CreateInterpreter,
describe::DescribeInterpreter, drop::DropInterpreter, exists::ExistsInterpreter,
insert::InsertInterpreter, interpreter::InterpreterPtr, select::SelectInterpreter,
show::ShowInterpreter, table_manipulator::TableManipulatorRef,
alter_table::AlterTableInterpreter,
context::Context,
create::CreateInterpreter,
describe::DescribeInterpreter,
drop::DropInterpreter,
exists::ExistsInterpreter,
insert::InsertInterpreter,
interpreter::{InterpreterPtr, Result},
select::SelectInterpreter,
show::ShowInterpreter,
table_manipulator::TableManipulatorRef,
validator::{ValidateContext, Validator},
};

/// A factory to create interpreters
Expand All @@ -37,8 +45,14 @@ impl<Q: Executor + 'static> Factory<Q> {
}
}

pub fn create(self, ctx: Context, plan: Plan) -> InterpreterPtr {
match plan {
pub fn create(self, ctx: Context, plan: Plan) -> Result<InterpreterPtr> {
let validate_ctx = ValidateContext {
enable_partition_table_access: ctx.enable_partition_table_access(),
};
let validator = Validator::new(validate_ctx);
validator.validate(&plan)?;

let interpreter = match plan {
Plan::Query(p) => SelectInterpreter::create(ctx, p, self.query_executor),
Plan::Insert(p) => InsertInterpreter::create(ctx, p),
Plan::Create(p) => {
Expand All @@ -51,6 +65,8 @@ impl<Q: Executor + 'static> Factory<Q> {
Plan::AlterTable(p) => AlterTableInterpreter::create(p),
Plan::Show(p) => ShowInterpreter::create(ctx, p, self.catalog_manager),
Plan::Exists(p) => ExistsInterpreter::create(p),
}
};

Ok(interpreter)
}
}
3 changes: 3 additions & 0 deletions interpreters/src/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub enum Error {

#[snafu(display("Failed to transfer ouput to records"))]
TryIntoRecords,

#[snafu(display("Failed to check permission, msg:{}", msg))]
PermissionDenied { msg: String },
}

define_result!(Error);
Expand Down
1 change: 1 addition & 0 deletions interpreters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod interpreter;
pub mod select;
pub mod show;
pub mod table_manipulator;
pub mod validator;

mod show_create;

Expand Down
176 changes: 131 additions & 45 deletions interpreters/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
use std::sync::Arc;

use analytic_engine::tests::util::{EngineContext, RocksDBEngineContext, TestEnv};
use catalog::consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA};
use catalog::{
consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA},
manager::ManagerRef,
};
use catalog_impls::table_based::TableBasedManager;
use common_types::request_id::RequestId;
use query_engine::{executor::ExecutorImpl, Config as QueryConfig};
Expand All @@ -16,7 +19,7 @@ use crate::{
context::Context,
factory::Factory,
interpreter::{Output, Result},
table_manipulator::catalog_based::TableManipulatorImpl,
table_manipulator::{catalog_based::TableManipulatorImpl, TableManipulatorRef},
};

async fn build_catalog_manager(analytic: TableEngineRef) -> TableBasedManager {
Expand All @@ -39,6 +42,8 @@ where
{
pub engine: TableEngineRef,
pub meta_provider: M,
pub catalog_manager: ManagerRef,
pub table_manipulator: TableManipulatorRef,
}

impl<M> Env<M>
Expand All @@ -55,37 +60,109 @@ where
M: MetaProvider,
{
async fn build_factory(&self) -> Factory<ExecutorImpl> {
let catalog_manager = Arc::new(build_catalog_manager(self.engine()).await);
let table_manipulator = Arc::new(TableManipulatorImpl::new(catalog_manager.clone()));
Factory::new(
ExecutorImpl::new(query_engine::Config::default()),
catalog_manager,
self.catalog_manager.clone(),
self.engine(),
table_manipulator,
self.table_manipulator.clone(),
)
}

async fn sql_to_output(&self, sql: &str) -> Result<Output> {
let plan = sql_to_plan(&self.meta_provider, sql);

let ctx = Context::builder(RequestId::next_id(), None)
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
.build();
self.sql_to_output_with_context(sql, ctx).await
}

async fn sql_to_output_with_context(&self, sql: &str, ctx: Context) -> Result<Output> {
let plan = sql_to_plan(&self.meta_provider, sql);
let factory = self.build_factory().await;
let interpreter = factory.create(ctx, plan);
let interpreter = factory.create(ctx, plan)?;
interpreter.execute().await
}

async fn test_create_table(&self) {
let sql="CREATE TABLE IF NOT EXISTS test_table(c1 string tag not null,ts timestamp not null, c3 string, timestamp key(ts),primary key(c1, ts)) \
ENGINE=Analytic WITH (ttl='70d',update_mode='overwrite',arena_block_size='1KB')";
async fn create_table_and_check(
&self,
table_name: &str,
enable_partition_table_access: bool,
) -> Result<()> {
let ctx = Context::builder(RequestId::next_id(), None)
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
.enable_partition_table_access(enable_partition_table_access)
.build();
let sql= format!("CREATE TABLE IF NOT EXISTS {}(c1 string tag not null,ts timestamp not null, c3 string, timestamp key(ts),primary key(c1, ts)) \
ENGINE=Analytic WITH (ttl='70d',update_mode='overwrite',arena_block_size='1KB')", table_name);

let output = self.sql_to_output(sql).await.unwrap();
let output = self.sql_to_output_with_context(&sql, ctx).await?;
assert!(
matches!(output, Output::AffectedRows(v) if v == 0),
"create table should success"
);

Ok(())
}

async fn insert_table_and_check(
&self,
table_name: &str,
enable_partition_table_access: bool,
) -> Result<()> {
let ctx = Context::builder(RequestId::next_id(), None)
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
.enable_partition_table_access(enable_partition_table_access)
.build();
let sql = format!("INSERT INTO {}(key1, key2, field1,field2) VALUES('tagk', 1638428434000,100, 'hello3'),('tagk2', 1638428434000,100, 'hello3');", table_name);
let output = self.sql_to_output_with_context(&sql, ctx).await?;
assert!(
matches!(output, Output::AffectedRows(v) if v == 2),
"insert table should success"
);

Ok(())
}

async fn select_table_and_check(
&self,
table_name: &str,
enable_partition_table_access: bool,
) -> Result<()> {
let ctx = Context::builder(RequestId::next_id(), None)
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
.enable_partition_table_access(enable_partition_table_access)
.build();
let sql = format!("select * from {}", table_name);
let output = self.sql_to_output_with_context(&sql, ctx).await?;
let records = output.try_into().unwrap();
let expected = vec![
"+------------+---------------------+--------+--------+",
"| key1 | key2 | field1 | field2 |",
"+------------+---------------------+--------+--------+",
"| 7461676b | 2021-12-02 07:00:34 | 100 | hello3 |",
"| 7461676b32 | 2021-12-02 07:00:34 | 100 | hello3 |",
"+------------+---------------------+--------+--------+",
];
common_util::record_batch::assert_record_batches_eq(&expected, records);

let sql = "select count(*) from test_table";
let output = self.sql_to_output(sql).await?;
let records = output.try_into().unwrap();
let expected = vec![
"+-----------------+",
"| COUNT(UInt8(1)) |",
"+-----------------+",
"| 2 |",
"+-----------------+",
];
common_util::record_batch::assert_record_batches_eq(&expected, records);

Ok(())
}

async fn test_create_table(&self) {
self.create_table_and_check("test_table", false)
.await
.unwrap();
}

async fn test_desc_table(&self) {
Expand Down Expand Up @@ -120,12 +197,9 @@ where
}

async fn test_insert_table(&self) {
let sql = "INSERT INTO test_table(key1, key2, field1,field2) VALUES('tagk', 1638428434000,100, 'hello3'),('tagk2', 1638428434000,100, 'hello3');";
let output = self.sql_to_output(sql).await.unwrap();
assert!(
matches!(output, Output::AffectedRows(v) if v == 2),
"insert table should success"
);
self.insert_table_and_check("test_table", false)
.await
.unwrap();
}

async fn test_insert_table_with_missing_columns(&self) {
Expand All @@ -143,7 +217,7 @@ where
let insert_sql = "INSERT INTO test_missing_columns_table(key1, key2, field4) VALUES('tagk', 1638428434000, 1), ('tagk2', 1638428434000, 10);";

let plan = sql_to_plan(&self.meta_provider, insert_sql);
let interpreter = insert_factory.create(ctx, plan);
let interpreter = insert_factory.create(ctx, plan).unwrap();
let output = interpreter.execute().await.unwrap();
assert!(
matches!(output, Output::AffectedRows(v) if v == 2),
Expand All @@ -163,7 +237,7 @@ where
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
.build();
let plan = sql_to_plan(&self.meta_provider, select_sql);
let interpreter = select_factory.create(ctx, plan);
let interpreter = select_factory.create(ctx, plan).unwrap();
let output = interpreter.execute().await.unwrap();
let records = output.try_into().unwrap();

Expand All @@ -188,30 +262,9 @@ where
}

async fn test_select_table(&self) {
let sql = "select * from test_table";
let output = self.sql_to_output(sql).await.unwrap();
let records = output.try_into().unwrap();
let expected = vec![
"+------------+---------------------+--------+--------+",
"| key1 | key2 | field1 | field2 |",
"+------------+---------------------+--------+--------+",
"| 7461676b | 2021-12-02 07:00:34 | 100 | hello3 |",
"| 7461676b32 | 2021-12-02 07:00:34 | 100 | hello3 |",
"+------------+---------------------+--------+--------+",
];
common_util::record_batch::assert_record_batches_eq(&expected, records);

let sql = "select count(*) from test_table";
let output = self.sql_to_output(sql).await.unwrap();
let records = output.try_into().unwrap();
let expected = vec![
"+-----------------+",
"| COUNT(UInt8(1)) |",
"+-----------------+",
"| 2 |",
"+-----------------+",
];
common_util::record_batch::assert_record_batches_eq(&expected, records);
self.select_table_and_check("test_table", false)
.await
.unwrap();
}

async fn test_show_create_table(&self) {
Expand Down Expand Up @@ -252,6 +305,31 @@ where
"alter table should success"
);
}

async fn test_enable_partition_table_access(&self) {
// Disable partition table access, all of create, insert and select about sub
// table(in table partition) directly will failed.
let res = self.create_table_and_check("__test_table", false).await;
assert!(format!("{:?}", res)
.contains("only can process sub tables in table partition directly when enable partition table access"));
let res1 = self.insert_table_and_check("__test_table", false).await;
assert!(format!("{:?}", res1)
.contains("only can process sub tables in table partition directly when enable partition table access"));
let res2 = self.select_table_and_check("__test_table", false).await;
assert!(format!("{:?}", res2)
.contains("only can process sub tables in table partition directly when enable partition table access"));

// Enable partition table access, operations above will success.
self.create_table_and_check("__test_table", true)
.await
.unwrap();
self.insert_table_and_check("__test_table", true)
.await
.unwrap();
self.select_table_and_check("__test_table", true)
.await
.unwrap();
}
}

#[tokio::test]
Expand All @@ -265,9 +343,15 @@ async fn test_interpreters<T: EngineContext>(engine_context: T) {
let mut test_ctx = env.new_context(engine_context);
test_ctx.open().await;
let mock = MockMetaProvider::default();
let engine = test_ctx.clone_engine();
let catalog_manager = Arc::new(build_catalog_manager(engine.clone()).await);
let table_manipulator = Arc::new(TableManipulatorImpl::new(catalog_manager.clone()));

let env = Env {
engine: test_ctx.clone_engine(),
meta_provider: mock,
catalog_manager,
table_manipulator,
};

env.test_create_table().await;
Expand All @@ -280,4 +364,6 @@ async fn test_interpreters<T: EngineContext>(engine_context: T) {
env.test_drop_table().await;

env.test_insert_table_with_missing_columns().await;

env.test_enable_partition_table_access().await;
}
Loading

0 comments on commit 89dca64

Please sign in to comment.