ISSUE-3176: introduce compact statement #3182

Merged: 8 commits, Dec 2, 2021
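Summary: this PR introduces a `COMPACT TABLE <name>` statement. The analyzer lowers it to a self-insert over the same table, so compaction reuses the existing insert pipeline. A minimal sketch of that rewrite, mirroring the `analyze` implementation in the diff below (`rewrite_compact` is an illustrative helper, not part of the PR):

```rust
// Sketch only: COMPACT TABLE db.t is rewritten to
//   INSERT OVERWRITE db.t SELECT * FROM db.t
fn rewrite_compact(db: &str, table: &str) -> String {
    let fq = format!("{}.{}", db, table);
    format!("INSERT OVERWRITE {} SELECT * FROM {}", fq, fq)
}

fn main() {
    assert_eq!(
        rewrite_compact("db", "t"),
        "INSERT OVERWRITE db.t SELECT * FROM db.t"
    );
}
```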
17 changes: 17 additions & 0 deletions query/src/sql/sql_parser.rs
@@ -45,6 +45,7 @@ use sqlparser::tokenizer::Whitespace;

use super::statements::DfCopy;
use crate::sql::statements::DfAlterUser;
use crate::sql::statements::DfCompactTable;
use crate::sql::statements::DfCreateDatabase;
use crate::sql::statements::DfCreateTable;
use crate::sql::statements::DfCreateUser;
@@ -240,6 +241,7 @@ impl<'a> DfParser<'a> {
// Use database
"USE" => self.parse_use_database(),
"KILL" => self.parse_kill_query(),
"COMPACT" => self.parse_compact(),
_ => self.expected("Keyword", self.parser.peek_token()),
},
_ => self.expected("an SQL statement", Token::Word(w)),
@@ -888,6 +890,21 @@ impl<'a> DfParser<'a> {
Ok(options)
}

fn parse_compact(&mut self) -> Result<DfStatement, ParserError> {
self.parser.next_token();
match self.parser.next_token() {
Token::Word(w) => match w.keyword {
Keyword::TABLE => {
let table_name = self.parser.parse_object_name()?;
let compact = DfCompactTable { name: table_name };
Ok(DfStatement::CompactTable(compact))
}
_ => self.expected("TABLE", Token::Word(w)),
},
unexpected => self.expected("compact statement", unexpected),
}
}

fn consume_token(&mut self, expected: &str) -> bool {
if self.parser.peek_token().to_string().to_uppercase() == *expected.to_uppercase() {
self.parser.next_token();
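With the dispatch above, the parser accepts `COMPACT TABLE t` and `COMPACT TABLE db.t`, and rejects any other keyword after `COMPACT`. A simplified stand-in for that two-token dispatch (string matching instead of sqlparser tokens, purely for illustration):

```rust
// Reduced model of parse_compact: consume COMPACT, require the TABLE
// keyword, then take the table name. The real code works on sqlparser tokens.
fn parse_compact(tokens: &[&str]) -> Result<String, String> {
    match tokens {
        ["COMPACT", "TABLE", name, ..] => Ok((*name).to_string()),
        ["COMPACT", other, ..] => Err(format!("Expected TABLE, found: {}", other)),
        _ => Err("Expected compact statement".to_string()),
    }
}

fn main() {
    assert_eq!(parse_compact(&["COMPACT", "TABLE", "t"]), Ok("t".to_string()));
    assert!(parse_compact(&["COMPACT", "DATABASE", "d"]).is_err());
}
```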
2 changes: 2 additions & 0 deletions query/src/sql/sql_statement.rs
@@ -21,6 +21,7 @@ use nom::IResult;

use super::statements::DfCopy;
use crate::sql::statements::DfAlterUser;
use crate::sql::statements::DfCompactTable;
use crate::sql::statements::DfCreateDatabase;
use crate::sql::statements::DfCreateTable;
use crate::sql::statements::DfCreateUser;
@@ -64,6 +65,7 @@ pub enum DfStatement {
DescribeTable(DfDescribeTable),
DropTable(DfDropTable),
TruncateTable(DfTruncateTable),
CompactTable(DfCompactTable),

// Settings.
ShowSettings(DfShowSettings),
1 change: 1 addition & 0 deletions query/src/sql/statements/analyzer_statement.rs
@@ -155,6 +155,7 @@ impl AnalyzableStatement for DfStatement {
DfStatement::DescribeTable(v) => v.analyze(ctx).await,
DfStatement::DropTable(v) => v.analyze(ctx).await,
DfStatement::TruncateTable(v) => v.analyze(ctx).await,
DfStatement::CompactTable(v) => v.analyze(ctx).await,
DfStatement::UseDatabase(v) => v.analyze(ctx).await,
DfStatement::ShowCreateTable(v) => v.analyze(ctx).await,
DfStatement::ShowTables(v) => v.analyze(ctx).await,
2 changes: 2 additions & 0 deletions query/src/sql/statements/mod.rs
@@ -21,6 +21,7 @@ mod analyzer_expr;
mod analyzer_statement;
mod analyzer_value_expr;
mod statement_alter_user;
mod statement_compact_table;
mod statement_copy;
mod statement_create_database;
mod statement_create_table;
@@ -52,6 +53,7 @@ pub use analyzer_statement::QueryAnalyzeState;
pub use analyzer_statement::QueryRelation;
pub use query::QueryASTIR;
pub use statement_alter_user::DfAlterUser;
pub use statement_compact_table::DfCompactTable;
pub use statement_copy::DfCopy;
pub use statement_create_database::DfCreateDatabase;
pub use statement_create_table::DfCreateTable;
59 changes: 59 additions & 0 deletions query/src/sql/statements/statement_compact_table.rs
@@ -0,0 +1,59 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_tracing::tracing;
use sqlparser::ast::ObjectName;

use crate::sessions::QueryContext;
use crate::sql::statements::AnalyzableStatement;
use crate::sql::statements::AnalyzedResult;
use crate::sql::PlanParser;

#[derive(Debug, Clone, PartialEq)]
pub struct DfCompactTable {
pub name: ObjectName,
}

#[async_trait::async_trait]
impl AnalyzableStatement for DfCompactTable {
#[tracing::instrument(level = "info", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))]
async fn analyze(&self, ctx: Arc<QueryContext>) -> Result<AnalyzedResult> {
let (db, table) = self.resolve_table(ctx.clone())?;
let table = format!("{}.{}", db, table);
let rewritten_query = format!("INSERT OVERWRITE {} SELECT * FROM {}", table, table);
let rewritten_plan = PlanParser::parse(rewritten_query.as_str(), ctx).await?;
Ok(AnalyzedResult::SimpleQuery(Box::new(rewritten_plan)))
}
}

impl DfCompactTable {
fn resolve_table(&self, ctx: Arc<QueryContext>) -> Result<(String, String)> {
let DfCompactTable {
name: ObjectName(idents),
..
} = self;
match idents.len() {
0 => Err(ErrorCode::SyntaxException("Compact table name is empty")),
1 => Ok((ctx.get_current_database(), idents[0].value.clone())),
2 => Ok((idents[0].value.clone(), idents[1].value.clone())),
_ => Err(ErrorCode::SyntaxException(
"Compact table name must be [`db`].`table`",
)),
}
}
}
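resolve_table's contract in concrete terms, assuming the current database is `cur` (a standalone sketch of the match above, not the actual API):

```rust
// 0 idents -> error; 1 ident -> (current db, table); 2 idents -> (db, table);
// 3 or more -> error, matching the SyntaxException messages above.
fn resolve(idents: &[&str], current_db: &str) -> Result<(String, String), String> {
    match idents {
        [] => Err("Compact table name is empty".to_string()),
        [t] => Ok((current_db.to_string(), (*t).to_string())),
        [db, t] => Ok(((*db).to_string(), (*t).to_string())),
        _ => Err("Compact table name must be [`db`].`table`".to_string()),
    }
}

fn main() {
    assert_eq!(resolve(&["t"], "cur"), Ok(("cur".to_string(), "t".to_string())));
    assert_eq!(resolve(&["db", "t"], "cur"), Ok(("db".to_string(), "t".to_string())));
    assert!(resolve(&[], "cur").is_err());
}
```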
6 changes: 3 additions & 3 deletions query/src/storages/fuse/index/min_max_test.rs
@@ -49,7 +49,7 @@ async fn test_min_max_index() -> Result<()> {
// create test table
let crate_table_plan = CreateTableReq {
if_not_exists: false,
db: fixture.default_db(),
db: fixture.default_db_name(),
table: test_tbl_name.to_string(),
table_meta: TableMeta {
schema: test_schema.clone(),
@@ -64,7 +64,7 @@

// get table
let table = catalog
.get_table(fixture.default_db().as_str(), test_tbl_name)
.get_table(fixture.default_db_name().as_str(), test_tbl_name)
.await?;

// prepare test blocks
@@ -88,7 +88,7 @@

// get the latest tbl
let table = catalog
.get_table(fixture.default_db().as_str(), test_tbl_name)
.get_table(fixture.default_db_name().as_str(), test_tbl_name)
.await?;

let snapshot_loc = table
114 changes: 71 additions & 43 deletions query/src/storages/fuse/table_test.rs
@@ -20,7 +20,10 @@ use common_planners::TruncateTablePlan;
use futures::TryStreamExt;

use crate::catalogs::Catalog;
use crate::interpreters::InterpreterFactory;
use crate::sql::PlanParser;
use crate::storages::fuse::table_test_fixture::TestFixture;
use crate::storages::fuse::TBL_OPT_KEY_CHUNK_BLOCK_NUM;
use crate::storages::ToReadDataSourcePlan;

#[tokio::test]
@@ -34,12 +37,7 @@ async fn test_fuse_table_simple_case() -> Result<()> {
catalog.create_table(create_table_plan.into()).await?;

// get table
let table = catalog
.get_table(
fixture.default_db().as_str(),
fixture.default_table().as_str(),
)
.await?;
let table = fixture.latest_default_table().await?;

// insert 5 blocks
let num_blocks = 5;
@@ -54,12 +52,7 @@

// get the latest tbl
let prev_version = table.get_table_info().ident.version;
let table = catalog
.get_table(
fixture.default_db().as_str(),
fixture.default_table().as_str(),
)
.await?;
let table = fixture.latest_default_table().await?;
assert_ne!(prev_version, table.get_table_info().ident.version);

let (stats, parts) = table.read_partitions(ctx.clone(), None).await?;
@@ -122,12 +115,7 @@

// get the latest tbl
let prev_version = table.get_table_info().ident.version;
let table = catalog
.get_table(
fixture.default_db().as_str(),
fixture.default_table().as_str(),
)
.await?;
let table = fixture.latest_default_table().await?;
assert_ne!(prev_version, table.get_table_info().ident.version);

let (stats, parts) = table.read_partitions(ctx.clone(), None).await?;
@@ -187,13 +175,7 @@ async fn test_fuse_table_truncate() -> Result<()> {
let catalog = ctx.get_catalog();
catalog.create_table(create_table_plan.into()).await?;

let table = catalog
.get_table(
fixture.default_db().as_str(),
fixture.default_table().as_str(),
)
.await?;

let table = fixture.latest_default_table().await?;
let truncate_plan = TruncateTablePlan {
db: "".to_string(),
table: "".to_string(),
@@ -202,12 +184,7 @@
// 1. truncate empty table
let prev_version = table.get_table_info().ident.version;
let r = table.truncate(ctx.clone(), truncate_plan.clone()).await;
let table = catalog
.get_table(
fixture.default_db().as_str(),
fixture.default_table().as_str(),
)
.await?;
let table = fixture.latest_default_table().await?;
// no side effects
assert_eq!(prev_version, table.get_table_info().ident.version);
assert!(r.is_ok());
@@ -226,12 +203,7 @@

// get the latest tbl
let prev_version = table.get_table_info().ident.version;
let table = catalog
.get_table(
fixture.default_db().as_str(),
fixture.default_table().as_str(),
)
.await?;
let table = fixture.latest_default_table().await?;
assert_ne!(prev_version, table.get_table_info().ident.version);

// ensure data ingested
@@ -247,12 +219,7 @@

// get the latest tbl
let prev_version = table.get_table_info().ident.version;
let table = catalog
.get_table(
fixture.default_db().as_str(),
fixture.default_table().as_str(),
)
.await?;
let table = fixture.latest_default_table().await?;
assert_ne!(prev_version, table.get_table_info().ident.version);
let (stats, parts) = table
.read_partitions(ctx.clone(), source_plan.push_downs.clone())
@@ -263,3 +230,64 @@ async fn test_fuse_table_truncate() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_fuse_table_compact() -> Result<()> {
let fixture = TestFixture::new().await;
let ctx = fixture.ctx();

let mut create_table_plan = fixture.default_crate_table_plan();
// set chunk size to 100
create_table_plan
.table_meta
.options
.insert(TBL_OPT_KEY_CHUNK_BLOCK_NUM.to_owned(), 100.to_string());

// create test table
let tbl_name = create_table_plan.table.clone();
let db_name = create_table_plan.db.clone();
let catalog = ctx.get_catalog();
catalog.create_table(create_table_plan.into()).await?;

// insert 5 times
let n = 5;
for _ in 0..n {
let table = fixture.latest_default_table().await?;
let num_blocks = 1;
let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream(
num_blocks, 1,
)));
let r = table.append_data(ctx.clone(), stream).await?;
table
.commit(ctx.clone(), r.try_collect().await?, false)
.await?;
}

// there will be 5 blocks
let table = fixture.latest_default_table().await?;
let (_, parts) = table.read_partitions(ctx.clone(), None).await?;
assert_eq!(parts.len(), n);

// do compact
let query = format!("compact table {}.{}", db_name, tbl_name);
let plan = PlanParser::parse(&query, ctx.clone()).await?;
let interpreter = InterpreterFactory::get(ctx.clone(), plan)?;

// `PipelineBuilder` parallelizes the table read according to the `max_threads` setting,
// and `Table::read` also tries to de-queue read jobs preemptively. Thus the number of
// blocks that `Table::append` receives is not deterministic (`append` also runs in
// parallel here), so the final number of blocks varies.
// To avoid a flaky test, `max_threads` is set to 1 so that the pipeline builder
// arranges only one worker for the `ReadDataSourcePlan`.
ctx.get_settings().set_max_threads(1)?;
let data_stream = interpreter.execute(None).await?;
let _ = data_stream.try_collect::<Vec<_>>().await;

// verify compaction
let table = fixture.latest_default_table().await?;
let (_, parts) = table.read_partitions(ctx.clone(), None).await?;
// blocks are so tiny, they should be compacted into one
assert_eq!(parts.len(), 1);

Ok(())
}
23 changes: 17 additions & 6 deletions query/src/storages/fuse/table_test_fixture.rs
@@ -33,6 +33,7 @@ use crate::catalogs::Catalog;
use crate::configs::Config;
use crate::sessions::QueryContext;
use crate::storages::fuse::TBL_OPT_KEY_CHUNK_BLOCK_NUM;
use crate::storages::Table;

pub struct TestFixture {
_tmp_dir: TempDir,
@@ -76,12 +77,12 @@ impl TestFixture {
self.ctx.clone()
}

pub fn default_db(&self) -> String {
pub fn default_db_name(&self) -> String {
gen_db_name(&self.prefix)
}

pub fn default_table(&self) -> String {
format!("{}_test_tbl", self.prefix)
pub fn default_table_name(&self) -> String {
format!("tbl_{}", self.prefix)
}

pub fn default_schema() -> DataSchemaRef {
@@ -91,8 +92,8 @@
pub fn default_crate_table_plan(&self) -> CreateTablePlan {
CreateTablePlan {
if_not_exists: false,
db: self.default_db(),
table: self.default_table(),
db: self.default_db_name(),
table: self.default_table_name(),
table_meta: TableMeta {
schema: TestFixture::default_schema(),
engine: "FUSE".to_string(),
@@ -116,8 +117,18 @@
})
.collect()
}

pub async fn latest_default_table(&self) -> Result<Arc<dyn Table>> {
self.ctx
.get_catalog()
.get_table(
self.default_db_name().as_str(),
self.default_table_name().as_str(),
)
.await
}
}

fn gen_db_name(prefix: &str) -> String {
format!("{}_default", prefix)
format!("db_{}", prefix)
}
2 changes: 2 additions & 0 deletions (new file; path not captured)
@@ -0,0 +1,2 @@
20
20