Skip to content

Commit

Permalink
Merge pull request #7959 from zhang2014/fix/cannot_kill_optimize
Browse files Browse the repository at this point in the history
fix(processor): attempt to fix the inability to kill `OPTIMIZE TABLE`
  • Loading branch information
BohuTANG authored Sep 29, 2022
2 parents b2f421b + 30e36cc commit e1846e7
Show file tree
Hide file tree
Showing 17 changed files with 138 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use common_base::base::Progress;
Expand Down Expand Up @@ -73,6 +74,7 @@ pub trait TableContext: Send + Sync {
fn get_catalog(&self, catalog_name: &str) -> Result<Arc<dyn Catalog>>;
fn get_id(&self) -> String;
fn get_current_catalog(&self) -> String;
fn get_aborting(&self) -> Arc<AtomicBool>;
fn get_current_database(&self) -> String;
fn get_config(&self) -> Config;
fn get_current_user(&self) -> Result<UserInfo>;
Expand Down
46 changes: 40 additions & 6 deletions src/query/datablocks/src/kernels/data_block_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::iter::once;
use std::sync::Arc;

use common_arrow::arrow::array::ord as arrow_ord;
use common_arrow::arrow::array::ord::DynComparator;
Expand All @@ -29,6 +30,8 @@ use common_exception::Result;

use crate::DataBlock;

/// Shared predicate polled during merge sort; returns `true` when the current
/// query should stop (killed, or the server is shutting down).
pub type Aborting = Arc<Box<dyn Fn() -> bool + Send + Sync + 'static>>;

#[derive(Clone)]
pub struct SortColumnDescription {
pub column_name: String,
Expand Down Expand Up @@ -146,27 +149,58 @@ impl DataBlock {
blocks: &[DataBlock],
sort_columns_descriptions: &[SortColumnDescription],
limit: Option<usize>,
aborting: Aborting,
) -> Result<DataBlock> {
match blocks.len() {
0 => Result::Err(ErrorCode::EmptyData("Can't merge empty blocks")),
1 => Ok(blocks[0].clone()),
2 => DataBlock::merge_sort_block(
&blocks[0],
&blocks[1],
sort_columns_descriptions,
limit,
),
2 => {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

DataBlock::merge_sort_block(
&blocks[0],
&blocks[1],
sort_columns_descriptions,
limit,
)
}
_ => {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

let left = DataBlock::merge_sort_blocks(
&blocks[0..blocks.len() / 2],
sort_columns_descriptions,
limit,
aborting.clone(),
)?;

if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

let right = DataBlock::merge_sort_blocks(
&blocks[blocks.len() / 2..blocks.len()],
sort_columns_descriptions,
limit,
aborting.clone(),
)?;

if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

DataBlock::merge_sort_block(&left, &right, sort_columns_descriptions, limit)
}
}
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
// limitations under the License.

pub mod transforms;
pub use transforms::Aborting;
pub use transforms::ExpressionExecutor;
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;

use super::Compactor;
use super::TransformCompact;
use crate::processors::transforms::Aborting;

pub struct BlockCompactor {
max_rows_per_block: usize,
Expand Down Expand Up @@ -106,12 +108,18 @@ impl Compactor for BlockCompactor {
Ok(res)
}

fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
let mut res = Vec::with_capacity(blocks.len());
let mut temp_blocks = vec![];
let mut accumulated_rows = 0;

for block in blocks.iter() {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

// Perfect block, no need to compact
if block.num_rows() <= self.max_rows_per_block
&& (block.num_rows() >= self.min_rows_per_block
Expand All @@ -134,6 +142,12 @@ impl Compactor for BlockCompactor {
temp_blocks.push(block);

while accumulated_rows >= self.max_rows_per_block {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

let block = DataBlock::concat_blocks(&temp_blocks)?;
res.push(block.slice(0, self.max_rows_per_block));
accumulated_rows -= self.max_rows_per_block;
Expand All @@ -150,6 +164,12 @@ impl Compactor for BlockCompactor {
}

if accumulated_rows != 0 {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

let block = DataBlock::concat_blocks(&temp_blocks)?;
res.push(block);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

use std::any::Any;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -28,8 +30,11 @@ use common_pipeline_core::processors::Processor;
/// Processor that buffers incoming data blocks and compacts them via the
/// `Compactor` implementation `T`.
pub struct TransformCompact<T: Compactor + Send + 'static> {
// Current phase of the consume/compact state machine.
state: ProcessorState,
compactor: T,
// Polled during final compaction so a killed query aborts promptly.
aborting: Aborting,
}

/// Shared predicate returning `true` when the query should stop
/// (killed, or the server is shutting down).
pub type Aborting = Arc<Box<dyn Fn() -> bool + Send + Sync + 'static>>;

/// Compactor is a trait that defines how to compact blocks.
pub trait Compactor {
fn name() -> &'static str;
Expand All @@ -45,23 +50,29 @@ pub trait Compactor {
}

/// `compact_final` is called when all the blocks are pushed to finish the compaction
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>>;
}

impl<T: Compactor + Send + 'static> TransformCompact<T> {
/// Builds a `TransformCompact` processor wired to the given ports.
///
/// The diff-paste residue here interleaved the old and new return
/// expressions; this is the post-change version, which captures the
/// session's abort flag so `compact_final` can be interrupted.
pub fn try_create(
    ctx: Arc<dyn TableContext>,
    input_port: Arc<InputPort>,
    output_port: Arc<OutputPort>,
    compactor: T,
) -> Result<ProcessorPtr> {
    // Snapshot the query-level abort flag once; the closure below polls it
    // cheaply on every compaction step.
    let aborting = ctx.get_aborting();

    let state = ProcessorState::Consume(ConsumeState {
        input_port,
        output_port,
        input_data_blocks: vec![],
        output_data_blocks: VecDeque::new(),
    });

    Ok(ProcessorPtr::create(Box::new(Self {
        state,
        compactor,
        // Relaxed suffices: the flag is a standalone "please stop" signal
        // with no ordering dependency on other data — NOTE(review): confirm.
        aborting: Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))),
    })))
}

#[inline(always)]
Expand Down Expand Up @@ -154,7 +165,8 @@ impl<T: Compactor + Send + 'static> Processor for TransformCompact<T> {
Ok(())
}
ProcessorState::Compacting(state) => {
let compacted_blocks = self.compactor.compact_final(&state.blocks)?;
let aborting = self.aborting.clone();
let compacted_blocks = self.compactor.compact_final(&state.blocks, aborting)?;

let mut temp_state = ProcessorState::Finished;
std::mem::swap(&mut self.state, &mut temp_state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_exception::Result;

use super::Compactor;
use super::TransformCompact;
use crate::processors::transforms::Aborting;

pub struct SortMergeCompactor {
limit: Option<usize>,
Expand All @@ -41,12 +42,16 @@ impl Compactor for SortMergeCompactor {
"SortMergeTransform"
}

fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
/// Merge-sorts all buffered blocks into a single sorted block.
///
/// The diff-paste residue interleaved the old two-line call with the new
/// four-argument call; this is the post-change version, which threads the
/// abort predicate into the recursive merge so a killed query stops promptly.
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
    // Nothing buffered: nothing to sort.
    if blocks.is_empty() {
        return Ok(vec![]);
    }

    let block = DataBlock::merge_sort_blocks(
        blocks,
        &self.sort_columns_descriptions,
        self.limit,
        aborting,
    )?;
    Ok(vec![block])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use super::ProbeState;

Expand Down Expand Up @@ -43,11 +44,15 @@ pub trait HashJoinState: Send + Sync {
async fn wait_finish(&self) -> Result<()>;

/// Get mark join results
fn mark_join_blocks(&self) -> Result<Vec<DataBlock>>;
fn mark_join_blocks(&self, flag: Aborting) -> Result<Vec<DataBlock>>;

/// Get right join results
fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
fn right_join_blocks(&self, blocks: &[DataBlock], flag: Aborting) -> Result<Vec<DataBlock>>;

/// Get right semi/anti join results
fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
fn right_anti_semi_join_blocks(
&self,
blocks: &[DataBlock],
flag: Aborting,
) -> Result<Vec<DataBlock>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use common_datavalues::NullableType;
use common_exception::ErrorCode;
use common_exception::Result;
use common_hashtable::HashMap;
use common_pipeline_transforms::processors::transforms::Aborting;
use common_planner::IndexType;
use parking_lot::RwLock;
use primitive_types::U256;
Expand Down Expand Up @@ -677,7 +678,7 @@ impl HashJoinState for JoinHashTable {
Ok(())
}

fn mark_join_blocks(&self) -> Result<Vec<DataBlock>> {
fn mark_join_blocks(&self, _aborting: Aborting) -> Result<Vec<DataBlock>> {
let mut row_ptrs = self.row_ptrs.write();
let has_null = self.hash_join_desc.marker_join_desc.has_null.read();
let mut validity = MutableBitmap::with_capacity(row_ptrs.len());
Expand Down Expand Up @@ -716,7 +717,7 @@ impl HashJoinState for JoinHashTable {
Ok(vec![self.merge_eq_block(&marker_block, &build_block)?])
}

fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
fn right_join_blocks(&self, blocks: &[DataBlock], _flag: Aborting) -> Result<Vec<DataBlock>> {
let unmatched_build_indexes = self.find_unmatched_build_indexes()?;
if unmatched_build_indexes.is_empty() && self.hash_join_desc.other_predicate.is_none() {
return Ok(blocks.to_vec());
Expand Down Expand Up @@ -803,7 +804,11 @@ impl HashJoinState for JoinHashTable {
Ok(vec![merged_block])
}

fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
fn right_anti_semi_join_blocks(
&self,
blocks: &[DataBlock],
_flag: Aborting,
) -> Result<Vec<DataBlock>> {
// Fast path for right anti join with non-equi conditions
if self.hash_join_desc.other_predicate.is_none()
&& self.hash_join_desc.join_type == JoinType::RightAnti
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use crate::pipelines::processors::transforms::Compactor;
use crate::pipelines::processors::HashJoinState;
Expand All @@ -37,8 +38,8 @@ impl Compactor for MarkJoinCompactor {
}

// `compact_final` is called when all the blocks are pushed
fn compact_final(&self, _blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
self.hash_join_state.mark_join_blocks()
fn compact_final(&self, _blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
self.hash_join_state.mark_join_blocks(aborting)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use crate::pipelines::processors::transforms::Compactor;
use crate::pipelines::processors::HashJoinState;
Expand All @@ -37,8 +38,8 @@ impl Compactor for RightJoinCompactor {
}

// `compact_final` is called when all the blocks are pushed
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
self.hash_join_state.right_join_blocks(blocks)
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
self.hash_join_state.right_join_blocks(blocks, aborting)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use crate::pipelines::processors::transforms::Compactor;
use crate::pipelines::processors::HashJoinState;
Expand All @@ -37,8 +38,9 @@ impl Compactor for RightSemiAntiJoinCompactor {
}

// `compact_final` is called when all the blocks are pushed
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
self.hash_join_state.right_anti_semi_join_blocks(blocks)
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
self.hash_join_state
.right_anti_semi_join_blocks(blocks, aborting)
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand Down Expand Up @@ -267,6 +268,11 @@ impl TableContext for QueryContext {
/// Name of the catalog the session is currently using; delegated to the
/// shared per-query state.
fn get_current_catalog(&self) -> String {
self.shared.get_current_catalog()
}

/// Shared abort flag for this query. Operators poll it (e.g. during merge
/// sort and block compaction) to stop work when the query is killed —
/// presumably set by the kill/shutdown path, which is not visible here.
fn get_aborting(&self) -> Arc<AtomicBool> {
self.shared.get_aborting()
}

/// Name of the database the session is currently using; delegated to the
/// shared per-query state.
fn get_current_database(&self) -> String {
self.shared.get_current_database()
}
Expand Down
Loading

1 comment on commit e1846e7

@vercel
Copy link

@vercel vercel bot commented on e1846e7 Sep 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.rs

Please sign in to comment.