Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(processor): try fix cannot kill optimize table #7959

Merged
merged 5 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use common_base::base::Progress;
Expand Down Expand Up @@ -73,6 +74,7 @@ pub trait TableContext: Send + Sync {
fn get_catalog(&self, catalog_name: &str) -> Result<Arc<dyn Catalog>>;
fn get_id(&self) -> String;
fn get_current_catalog(&self) -> String;
/// Returns the shared atomic flag associated with aborting the query.
///
/// NOTE(review): presumably `true` means the query was killed or the server is
/// shutting down — confirm against the code that sets the flag.
fn get_aborting(&self) -> Arc<AtomicBool>;
fn get_current_database(&self) -> String;
fn get_config(&self) -> Config;
fn get_current_user(&self) -> Result<UserInfo>;
Expand Down
46 changes: 40 additions & 6 deletions src/query/datablocks/src/kernels/data_block_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::iter::once;
use std::sync::Arc;

use common_arrow::arrow::array::ord as arrow_ord;
use common_arrow::arrow::array::ord::DynComparator;
Expand All @@ -29,6 +30,8 @@ use common_exception::Result;

use crate::DataBlock;

/// Shared, thread-safe predicate that reports whether the running operation should stop.
///
/// `DataBlock::merge_sort_blocks` calls it between merge steps and returns
/// `ErrorCode::AbortedQuery` as soon as it yields `true`.
///
/// NOTE(review): the `Box` inside the `Arc` adds a second pointer indirection;
/// `Arc<dyn Fn() -> bool + Send + Sync>` would likely suffice, but any change must be
/// made in lockstep with the identical alias in the pipeline transforms crate.
pub type Aborting = Arc<Box<dyn Fn() -> bool + Send + Sync + 'static>>;

#[derive(Clone)]
pub struct SortColumnDescription {
pub column_name: String,
Expand Down Expand Up @@ -146,27 +149,58 @@ impl DataBlock {
blocks: &[DataBlock],
sort_columns_descriptions: &[SortColumnDescription],
limit: Option<usize>,
aborting: Aborting,
) -> Result<DataBlock> {
match blocks.len() {
0 => Result::Err(ErrorCode::EmptyData("Can't merge empty blocks")),
1 => Ok(blocks[0].clone()),
2 => DataBlock::merge_sort_block(
&blocks[0],
&blocks[1],
sort_columns_descriptions,
limit,
),
2 => {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

DataBlock::merge_sort_block(
&blocks[0],
&blocks[1],
sort_columns_descriptions,
limit,
)
}
_ => {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

let left = DataBlock::merge_sort_blocks(
&blocks[0..blocks.len() / 2],
sort_columns_descriptions,
limit,
aborting.clone(),
)?;

if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

let right = DataBlock::merge_sort_blocks(
&blocks[blocks.len() / 2..blocks.len()],
sort_columns_descriptions,
limit,
aborting.clone(),
)?;

if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

DataBlock::merge_sort_block(&left, &right, sort_columns_descriptions, limit)
}
}
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
// limitations under the License.

pub mod transforms;
pub use transforms::Aborting;
pub use transforms::ExpressionExecutor;
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;

use super::Compactor;
use super::TransformCompact;
use crate::processors::transforms::Aborting;

pub struct BlockCompactor {
max_rows_per_block: usize,
Expand Down Expand Up @@ -106,12 +108,18 @@ impl Compactor for BlockCompactor {
Ok(res)
}

fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
let mut res = Vec::with_capacity(blocks.len());
let mut temp_blocks = vec![];
let mut accumulated_rows = 0;

for block in blocks.iter() {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

// Perfect block, no need to compact
if block.num_rows() <= self.max_rows_per_block
&& (block.num_rows() >= self.min_rows_per_block
Expand All @@ -134,6 +142,12 @@ impl Compactor for BlockCompactor {
temp_blocks.push(block);

while accumulated_rows >= self.max_rows_per_block {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

let block = DataBlock::concat_blocks(&temp_blocks)?;
res.push(block.slice(0, self.max_rows_per_block));
accumulated_rows -= self.max_rows_per_block;
Expand All @@ -150,6 +164,12 @@ impl Compactor for BlockCompactor {
}

if accumulated_rows != 0 {
if aborting() {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

let block = DataBlock::concat_blocks(&temp_blocks)?;
res.push(block);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

use std::any::Any;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -28,8 +30,11 @@ use common_pipeline_core::processors::Processor;
/// Processor that drives a [`Compactor`] through a state machine and supplies it
/// with an abort predicate for the final compaction step.
pub struct TransformCompact<T: Compactor + Send + 'static> {
/// Current stage of the processor (e.g. `Consume`, `Compacting`, `Finished`).
state: ProcessorState,
/// Strategy object whose `compact_final` produces the output blocks.
compactor: T,
/// Abort predicate passed to `Compactor::compact_final` on every final compaction.
aborting: Aborting,
}

/// Shared, thread-safe predicate that returns `true` once work should be aborted.
///
/// `TransformCompact::try_create` builds it from the `AtomicBool` returned by
/// `TableContext::get_aborting`, and it is handed to `Compactor::compact_final`.
pub type Aborting = Arc<Box<dyn Fn() -> bool + Send + Sync + 'static>>;

/// Compactor is a trait that defines how to compact blocks.
pub trait Compactor {
fn name() -> &'static str;
Expand All @@ -45,23 +50,29 @@ pub trait Compactor {
}

/// `compact_final` is called when all the blocks are pushed to finish the compaction
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>>;
}

impl<T: Compactor + Send + 'static> TransformCompact<T> {
pub fn try_create(
ctx: Arc<dyn TableContext>,
input_port: Arc<InputPort>,
output_port: Arc<OutputPort>,
compactor: T,
) -> Result<ProcessorPtr> {
let aborting = ctx.get_aborting();
let state = ProcessorState::Consume(ConsumeState {
input_port,
output_port,
input_data_blocks: vec![],
output_data_blocks: VecDeque::new(),
});

Ok(ProcessorPtr::create(Box::new(Self { state, compactor })))
Ok(ProcessorPtr::create(Box::new(Self {
state,
compactor,
aborting: Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))),
})))
}

#[inline(always)]
Expand Down Expand Up @@ -154,7 +165,8 @@ impl<T: Compactor + Send + 'static> Processor for TransformCompact<T> {
Ok(())
}
ProcessorState::Compacting(state) => {
let compacted_blocks = self.compactor.compact_final(&state.blocks)?;
let aborting = self.aborting.clone();
let compacted_blocks = self.compactor.compact_final(&state.blocks, aborting)?;

let mut temp_state = ProcessorState::Finished;
std::mem::swap(&mut self.state, &mut temp_state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_exception::Result;

use super::Compactor;
use super::TransformCompact;
use crate::processors::transforms::Aborting;

pub struct SortMergeCompactor {
limit: Option<usize>,
Expand All @@ -41,12 +42,16 @@ impl Compactor for SortMergeCompactor {
"SortMergeTransform"
}

fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
if blocks.is_empty() {
Ok(vec![])
} else {
let block =
DataBlock::merge_sort_blocks(blocks, &self.sort_columns_descriptions, self.limit)?;
let block = DataBlock::merge_sort_blocks(
blocks,
&self.sort_columns_descriptions,
self.limit,
aborting,
)?;
Ok(vec![block])
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use super::ProbeState;

Expand Down Expand Up @@ -43,11 +44,15 @@ pub trait HashJoinState: Send + Sync {
async fn wait_finish(&self) -> Result<()>;

/// Get mark join results
fn mark_join_blocks(&self) -> Result<Vec<DataBlock>>;
fn mark_join_blocks(&self, flag: Aborting) -> Result<Vec<DataBlock>>;

/// Get right join results
fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
fn right_join_blocks(&self, blocks: &[DataBlock], flag: Aborting) -> Result<Vec<DataBlock>>;

/// Get right semi/anti join results
fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>>;
fn right_anti_semi_join_blocks(
&self,
blocks: &[DataBlock],
flag: Aborting,
) -> Result<Vec<DataBlock>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use common_datavalues::NullableType;
use common_exception::ErrorCode;
use common_exception::Result;
use common_hashtable::HashMap;
use common_pipeline_transforms::processors::transforms::Aborting;
use common_planner::IndexType;
use parking_lot::RwLock;
use primitive_types::U256;
Expand Down Expand Up @@ -677,7 +678,7 @@ impl HashJoinState for JoinHashTable {
Ok(())
}

fn mark_join_blocks(&self) -> Result<Vec<DataBlock>> {
fn mark_join_blocks(&self, _aborting: Aborting) -> Result<Vec<DataBlock>> {
let mut row_ptrs = self.row_ptrs.write();
let has_null = self.hash_join_desc.marker_join_desc.has_null.read();
let mut validity = MutableBitmap::with_capacity(row_ptrs.len());
Expand Down Expand Up @@ -716,7 +717,7 @@ impl HashJoinState for JoinHashTable {
Ok(vec![self.merge_eq_block(&marker_block, &build_block)?])
}

fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
fn right_join_blocks(&self, blocks: &[DataBlock], _flag: Aborting) -> Result<Vec<DataBlock>> {
let unmatched_build_indexes = self.find_unmatched_build_indexes()?;
if unmatched_build_indexes.is_empty() && self.hash_join_desc.other_predicate.is_none() {
return Ok(blocks.to_vec());
Expand Down Expand Up @@ -803,7 +804,11 @@ impl HashJoinState for JoinHashTable {
Ok(vec![merged_block])
}

fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
fn right_anti_semi_join_blocks(
&self,
blocks: &[DataBlock],
_flag: Aborting,
) -> Result<Vec<DataBlock>> {
// Fast path for right anti join with non-equi conditions
if self.hash_join_desc.other_predicate.is_none()
&& self.hash_join_desc.join_type == JoinType::RightAnti
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use crate::pipelines::processors::transforms::Compactor;
use crate::pipelines::processors::HashJoinState;
Expand All @@ -37,8 +38,8 @@ impl Compactor for MarkJoinCompactor {
}

// `compact_final` is called when all the blocks are pushed
fn compact_final(&self, _blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
self.hash_join_state.mark_join_blocks()
fn compact_final(&self, _blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
self.hash_join_state.mark_join_blocks(aborting)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use crate::pipelines::processors::transforms::Compactor;
use crate::pipelines::processors::HashJoinState;
Expand All @@ -37,8 +38,8 @@ impl Compactor for RightJoinCompactor {
}

// `compact_final` is called when all the blocks are pushed
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
self.hash_join_state.right_join_blocks(blocks)
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
self.hash_join_state.right_join_blocks(blocks, aborting)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_transforms::processors::transforms::Aborting;

use crate::pipelines::processors::transforms::Compactor;
use crate::pipelines::processors::HashJoinState;
Expand All @@ -37,8 +38,9 @@ impl Compactor for RightSemiAntiJoinCompactor {
}

// `compact_final` is called when all the blocks are pushed
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
self.hash_join_state.right_anti_semi_join_blocks(blocks)
fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result<Vec<DataBlock>> {
self.hash_join_state
.right_anti_semi_join_blocks(blocks, aborting)
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand Down Expand Up @@ -267,6 +268,11 @@ impl TableContext for QueryContext {
/// Delegates to `self.shared.get_current_catalog()`.
fn get_current_catalog(&self) -> String {
self.shared.get_current_catalog()
}

/// Returns the abort flag held by the shared query state
/// (delegates to `self.shared.get_aborting()`).
fn get_aborting(&self) -> Arc<AtomicBool> {
self.shared.get_aborting()
}

/// Delegates to `self.shared.get_current_database()`.
fn get_current_database(&self) -> String {
self.shared.get_current_database()
}
Expand Down
Loading