fix: enable kill query before pipeline started.
youngsofun committed May 8, 2023
1 parent 643f6c7 commit 9e3a744
Showing 12 changed files with 70 additions and 35 deletions.
4 changes: 2 additions & 2 deletions src/query/catalog/src/table_context.rs
@@ -16,7 +16,6 @@ use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::net::SocketAddr;
-use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::time::SystemTime;
 
@@ -108,7 +107,8 @@ pub trait TableContext: Send + Sync {
     fn get_catalog(&self, catalog_name: &str) -> Result<Arc<dyn Catalog>>;
     fn get_id(&self) -> String;
     fn get_current_catalog(&self) -> String;
-    fn get_aborting(&self) -> Arc<AtomicBool>;
+    fn check_aborting(&self) -> Result<()>;
+    fn get_error(&self) -> Option<ErrorCode>;
     fn get_current_database(&self) -> String;
     fn get_current_user(&self) -> Result<UserInfo>;
     fn get_current_role(&self) -> Option<RoleInfo>;
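The trait swaps the raw AtomicBool for a fallible check plus access to the stored error. Below is a minimal, self-contained sketch of the state those two methods imply, using simplified types (String in place of ErrorCode) rather than databend's actual implementation, which lives in QueryContextShared further down:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Simplified model of the state behind check_aborting()/get_error();
// String stands in for ErrorCode, and the real fields live on QueryContextShared.
struct AbortState {
    aborting: AtomicBool,
    error: Mutex<Option<String>>,
}

impl AbortState {
    // The kill/shutdown path records the reason first, then flips the flag.
    fn kill(&self, reason: &str) {
        *self.error.lock().unwrap() = Some(reason.to_string());
        self.aborting.store(true, Ordering::Release);
    }

    fn get_error(&self) -> Option<String> {
        self.error.lock().unwrap().clone()
    }

    // One fallible call instead of handing callers a raw AtomicBool: call sites
    // can simply write `ctx.check_aborting()?;` and propagate the stored cause.
    fn check_aborting(&self) -> Result<(), String> {
        if self.aborting.load(Ordering::Acquire) {
            Err(self
                .get_error()
                .unwrap_or_else(|| "Aborted query".to_string()))
        } else {
            Ok(())
        }
    }
}

The Acquire load matches the query_ctx_shared.rs hunk below; the Release store on the kill path is an assumption, since the setter itself is not part of this diff.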
25 changes: 15 additions & 10 deletions src/query/service/src/interpreters/interpreter.rs
@@ -53,6 +53,11 @@ pub trait Interpreter: Sync + Send {
         InterpreterMetrics::record_query_start(&ctx);
         log_query_start(&ctx);
 
+        if let Err(err) = ctx.check_aborting() {
+            log_query_finished(&ctx, Some(err.clone()));
+            return Err(err);
+        }
+
         let mut build_res = match self.execute2().await {
             Ok(build_res) => build_res,
             Err(build_error) => {
@@ -91,18 +96,18 @@
 
             let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
 
-            ctx.set_executor(Arc::downgrade(&complete_executor.get_inner()));
+            ctx.set_executor(complete_executor.get_inner())?;
             complete_executor.execute()?;
-            return Ok(Box::pin(DataBlockStream::create(None, vec![])));
+            Ok(Box::pin(DataBlockStream::create(None, vec![])))
+        } else {
+            let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
+
+            ctx.set_executor(pulling_executor.get_inner())?;
+            Ok(Box::pin(ProgressStream::try_create(
+                Box::pin(PullingExecutorStream::create(pulling_executor)?),
+                ctx.get_result_progress(),
+            )?))
         }
-
-        let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
-
-        ctx.set_executor(Arc::downgrade(&pulling_executor.get_inner()));
-        Ok(Box::pin(ProgressStream::try_create(
-            Box::pin(PullingExecutorStream::create(pulling_executor)?),
-            ctx.get_result_progress(),
-        )?))
     }
 
     /// The core of the databend processor which will execute the logical plan and build the pipeline
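With the guard added at the top of execute() and both executor branches now going through the fallible set_executor(), a query killed at any point before the pipeline runs is rejected instead of silently started. A tiny sketch of that fail-fast shape, reusing the AbortState model from the sketch above; build_pipeline is a hypothetical stand-in for execute2() plus executor construction:

// Reuses AbortState from the sketch above; build_pipeline is illustrative only.
fn execute_guarded<F>(state: &AbortState, build_pipeline: F) -> Result<(), String>
where
    F: FnOnce() -> Result<(), String>,
{
    // Mirrors the new check at the top of Interpreter::execute(): a query that
    // was already killed fails fast and never builds a pipeline at all.
    state.check_aborting()?;
    build_pipeline()
}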
@@ -91,7 +91,7 @@ impl Interpreter for ReclusterTableInterpreter {
             let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
             let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
 
-            ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+            ctx.set_executor(executor.get_inner())?;
             executor.execute()?;
             drop(executor);
 
@@ -75,8 +75,7 @@ impl Interpreter for RemoveUserStageInterpreter {
                 error!("Failed to delete file: {:?}, error: {}", chunk, e);
             }
 
-            let aborting = self.ctx.get_aborting();
-            if aborting.load(std::sync::atomic::Ordering::SeqCst) {
+            if self.ctx.check_aborting().is_err() {
                 return Ok(PipelineBuildResult::create());
             }
         }
12 changes: 6 additions & 6 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
@@ -179,7 +179,7 @@ impl PipelineExecutor {
         }))
     }
 
-    fn on_finished(&self, error: &Option<ErrorCode>) -> Result<()> {
+    pub(crate) fn on_finished(&self, error: &Option<ErrorCode>) -> Result<()> {
         let mut guard = self.on_finished_callback.lock();
         if let Some(on_finished_callback) = guard.take() {
             drop(guard);
@@ -282,7 +282,6 @@
 
             let sync_queue = std::mem::take(&mut init_schedule_queue.sync_queue);
             self.global_tasks_queue.init_sync_tasks(sync_queue);
-
             Ok(())
         }
     }
@@ -400,10 +399,11 @@ impl Drop for PipelineExecutor {
         let mut guard = self.on_finished_callback.lock();
         if let Some(on_finished_callback) = guard.take() {
             drop(guard);
-            let cause = Some(ErrorCode::Internal(
-                "Pipeline illegal state: not successfully shutdown.",
-            ));
-            if let Err(cause) = catch_unwind(move || on_finished_callback(&cause)).flatten() {
+            let cause = match self.finished_error.lock().as_ref() {
+                Some(cause) => cause.clone(),
+                None => ErrorCode::Internal("Pipeline illegal state: not successfully shutdown."),
+            };
+            if let Err(cause) = catch_unwind(move || on_finished_callback(&Some(cause))).flatten() {
                 warn!("Pipeline executor shutdown failure, {:?}", cause);
             }
         }
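The Drop change above means a pipeline torn down after finish(Some(err)) reports the recorded cause (for example the kill error) rather than always claiming an illegal shutdown. A simplified, self-contained sketch of that pattern follows; the field names mirror the hunk, everything else (String errors, the callback type) is illustrative:

use std::sync::Mutex;

// Simplified model of the executor's shutdown reporting; not databend's real types.
struct ExecutorModel {
    finished_error: Mutex<Option<String>>,
    on_finished_callback: Mutex<Option<Box<dyn FnOnce(&Option<String>) + Send>>>,
}

impl ExecutorModel {
    // Records the cause of an early finish (the real finish() also stops workers).
    fn finish(&self, cause: Option<String>) {
        if let Some(cause) = cause {
            *self.finished_error.lock().unwrap() = Some(cause);
        }
    }
}

impl Drop for ExecutorModel {
    fn drop(&mut self) {
        let mut guard = self.on_finished_callback.lock().unwrap();
        if let Some(on_finished_callback) = guard.take() {
            drop(guard);
            // Prefer the recorded cause; fall back to the generic message only
            // when nothing was ever reported.
            let cause = match self.finished_error.lock().unwrap().as_ref() {
                Some(cause) => cause.clone(),
                None => "Pipeline illegal state: not successfully shutdown.".to_string(),
            };
            on_finished_callback(&Some(cause));
        }
    }
}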
12 changes: 7 additions & 5 deletions src/query/service/src/sessions/query_ctx.rs
@@ -19,11 +19,9 @@ use std::collections::VecDeque;
 use std::future::Future;
 use std::net::SocketAddr;
 use std::str::FromStr;
-use std::sync::atomic::AtomicBool;
 use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
-use std::sync::Weak;
 use std::time::SystemTime;
 
 use common_base::base::tokio::task::JoinHandle;
@@ -205,7 +203,7 @@ impl QueryContext {
         *self.shared.init_query_id.write() = id;
     }
 
-    pub fn set_executor(&self, weak_ptr: Weak<PipelineExecutor>) {
+    pub fn set_executor(&self, weak_ptr: Arc<PipelineExecutor>) -> Result<()> {
         self.shared.set_executor(weak_ptr)
     }
 
@@ -359,8 +357,12 @@ impl TableContext for QueryContext {
         self.shared.get_current_catalog()
     }
 
-    fn get_aborting(&self) -> Arc<AtomicBool> {
-        self.shared.get_aborting()
+    fn check_aborting(&self) -> Result<()> {
+        self.shared.check_aborting()
+    }
+
+    fn get_error(&self) -> Option<ErrorCode> {
+        self.shared.get_error()
     }
 
     fn get_current_database(&self) -> String {
32 changes: 29 additions & 3 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -134,6 +134,11 @@ impl QueryContextShared {
         *guard = Some(err);
     }
 
+    pub fn get_error(&self) -> Option<ErrorCode> {
+        let guard = self.error.lock();
+        (*guard).clone()
+    }
+
     pub fn set_on_error_map(&self, map: Arc<DashMap<String, HashMap<u16, InputError>>>) {
         let mut guard = self.on_error_map.write();
         *guard = Some(map);
@@ -174,6 +179,18 @@
         self.aborting.clone()
     }
 
+    pub fn check_aborting(&self) -> Result<()> {
+        if self.aborting.load(Ordering::Acquire) {
+            Err(self.get_error().unwrap_or_else(|| {
+                ErrorCode::AbortedQuery(
+                    "Aborted query, because the server is shutting down or the query was killed.",
+                )
+            }))
+        } else {
+            Ok(())
+        }
+    }
+
     pub fn get_current_database(&self) -> String {
         self.session.get_current_database()
     }
@@ -328,9 +345,18 @@ impl QueryContextShared {
         *guard = Some(affect);
     }
 
-    pub fn set_executor(&self, weak_ptr: Weak<PipelineExecutor>) {
-        let mut executor = self.executor.write();
-        *executor = weak_ptr;
+    pub fn set_executor(&self, executor: Arc<PipelineExecutor>) -> Result<()> {
+        let mut guard = self.executor.write();
+        match self.check_aborting() {
+            Ok(_) => {
+                *guard = Arc::downgrade(&executor);
+                Ok(())
+            }
+            Err(err) => {
+                executor.finish(Some(err.clone()));
+                Err(err)
+            }
+        }
     }
 
     pub fn push_precommit_block(&self, block: DataBlock) {
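QueryContextShared is where the race in the commit title is actually closed: check_aborting() turns the flag plus stored error into a Result, and set_executor() either registers the freshly built executor or, if the query was already killed, finishes it on the spot so it never runs. A sketch of that decision, continuing the AbortState and ExecutorModel sketches above (illustrative types, not the real QueryContextShared):

use std::sync::{Arc, Mutex, Weak};

// Continues the AbortState / ExecutorModel sketches above; illustrative only.
struct ContextModel {
    abort: AbortState,
    executor: Mutex<Weak<ExecutorModel>>,
}

impl ContextModel {
    // Mirrors QueryContextShared::set_executor() in the hunk above: the executor
    // is only registered while holding the lock, and a kill that won the race
    // finishes it immediately instead of leaving it to run unkillable.
    fn set_executor(&self, executor: Arc<ExecutorModel>) -> Result<(), String> {
        let mut guard = self.executor.lock().unwrap();
        match self.abort.check_aborting() {
            Ok(()) => {
                *guard = Arc::downgrade(&executor);
                Ok(())
            }
            Err(err) => {
                executor.finish(Some(err.clone()));
                Err(err)
            }
        }
    }
}

Taking the executor lock before the abort check follows the hunk above; presumably a concurrent kill then either sees the registered weak pointer or is seen by the check, so no executor escapes both paths.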
2 changes: 1 addition & 1 deletion src/query/service/src/stream/table_read_block_stream.rs
@@ -52,7 +52,7 @@ impl<T: ?Sized + Table> ReadDataBlockStream for T {
         let query_id = ctx.get_id();
         let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
         let executor = PipelinePullingExecutor::try_create(pipeline, executor_settings)?;
-        ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+        ctx.set_executor(executor.get_inner())?;
         Ok(Box::pin(PullingExecutorStream::create(executor)?))
     }
 }
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/table_test_fixture.rs
@@ -379,7 +379,7 @@ pub fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) ->
     let mut pipelines = res.sources_pipelines;
     pipelines.push(res.main_pipeline);
     let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
-    ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+    ctx.set_executor(executor.get_inner())?;
     executor.execute()
 }
 
7 changes: 5 additions & 2 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -14,7 +14,6 @@
 use std::any::Any;
 use std::collections::HashMap;
 use std::collections::HashSet;
-use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -465,7 +464,11 @@ impl TableContext for CtxDelegation {
         "default".to_owned()
     }
 
-    fn get_aborting(&self) -> Arc<AtomicBool> {
+    fn check_aborting(&self) -> Result<()> {
+        todo!()
+    }
+
+    fn get_error(&self) -> Option<ErrorCode> {
         todo!()
     }
 
@@ -117,7 +117,7 @@ async fn do_compact(ctx: Arc<QueryContext>, table: Arc<dyn Table>) -> Result<boo
         let query_id = ctx.get_id();
         let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
         let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
-        ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+        ctx.set_executor(executor.get_inner())?;
         executor.execute()?;
         Ok(true)
     } else {
@@ -109,7 +109,7 @@ pub async fn do_deletion(
         let query_id = ctx.get_id();
         let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
         let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
-        ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+        ctx.set_executor(executor.get_inner())?;
         executor.execute()?;
     }
     Ok(())
