Skip to content

Commit 06a34cc

Browse files
authored
Merge branch 'main' into bump
2 parents 852b671 + e59bed1 commit 06a34cc

File tree

12 files changed

+8
-142
lines changed

12 files changed

+8
-142
lines changed

src/query/functions/src/scalars/logics/and.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl Function for LogicAndFiltersFunction {
7979
input_rows: usize,
8080
) -> Result<ColumnRef> {
8181
if columns.len() == 1 {
82-
return Ok(columns[1].column().clone());
82+
return Ok(columns[0].column().clone());
8383
}
8484

8585
let mut validity = None;

src/query/pipeline/core/src/processors/processor.rs

-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use futures::FutureExt;
2323
use petgraph::graph::node_index;
2424
use petgraph::prelude::NodeIndex;
2525

26-
#[derive(Debug)]
2726
pub enum Event {
2827
NeedData,
2928
NeedConsume,

src/query/service/src/pipelines/executor/executor_graph.rs

+1-9
Original file line numberDiff line numberDiff line change
@@ -242,15 +242,7 @@ impl ExecutingGraph {
242242
state_guard_cache = Some(node.state.lock().unwrap());
243243
}
244244

245-
let event = node.processor.event()?;
246-
tracing::debug!(
247-
"node id:{:?}, name:{:?}, event: {:?}",
248-
node.processor.id(),
249-
node.processor.name(),
250-
event
251-
);
252-
253-
let processor_state = match event {
245+
let processor_state = match node.processor.event()? {
254246
Event::Finished => State::Finished,
255247
Event::NeedData | Event::NeedConsume => State::Idle,
256248
Event::Sync => {

src/query/service/src/pipelines/executor/executor_worker_context.rs

+1-14
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use std::fmt::Debug;
1616
use std::fmt::Formatter;
1717
use std::sync::Arc;
18-
use std::time::Instant;
1918

2019
use common_base::base::TrySpawn;
2120
use common_exception::ErrorCode;
@@ -80,19 +79,7 @@ impl ExecutorWorkerContext {
8079
}
8180

8281
unsafe fn execute_sync_task(&mut self, processor: ProcessorPtr) -> Result<Option<NodeIndex>> {
83-
if tracing::enabled!(tracing::Level::DEBUG) {
84-
let start = Instant::now();
85-
processor.process()?;
86-
tracing::debug!(
87-
"sync processor, node id:{:?}, name:{:?}, event: {:?}",
88-
processor.id(),
89-
processor.name(),
90-
start.elapsed()
91-
);
92-
} else {
93-
processor.process()?;
94-
}
95-
82+
processor.process()?;
9683
Ok(Some(processor.id()))
9784
}
9885

src/query/service/src/pipelines/executor/processor_async_task.rs

-6
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,6 @@ impl ProcessorAsyncTask {
8080
);
8181
}
8282
Either::Right((res, _)) => {
83-
tracing::debug!(
84-
"async processor, node id {:?} name: {:?}, elapsed:{:?}",
85-
wraning_processor.id(),
86-
wraning_processor.name(),
87-
start.elapsed()
88-
);
8983
return res;
9084
}
9185
}

src/query/service/src/servers/http/clickhouse_handler.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ async fn execute(
111111
.start()
112112
.await
113113
.map_err(|e| error!("interpreter.start.error: {:?}", e));
114-
let data_stream: SendableDataBlockStream = {
114+
let mut data_stream: SendableDataBlockStream = {
115115
let output_port = OutputPort::create();
116116
let stream_source = StreamSource::create(ctx.clone(), input_stream, output_port.clone())?;
117117
let mut source_pipe_builder = SourcePipeBuilder::create();
@@ -122,7 +122,6 @@ async fn execute(
122122
interpreter.execute(ctx.clone()).await?
123123
};
124124

125-
let mut data_stream = ctx.try_create_abortable(data_stream)?;
126125
let format_setting = ctx.get_format_settings()?;
127126
let mut output_format = format.create_format(schema, format_setting);
128127
let prefix = Ok(output_format.serialize_prefix()?);

src/query/service/src/servers/http/v1/query/execute_state.rs

+3-13
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ use common_exception::Result;
3030
use common_legacy_planners::PlanNode;
3131
use common_streams::DataBlockStream;
3232
use common_streams::SendableDataBlockStream;
33-
use futures::future::AbortHandle;
34-
use futures::future::Abortable;
3533
use futures::StreamExt;
3634
use futures_util::FutureExt;
3735
use serde::Deserialize;
@@ -322,8 +320,7 @@ async fn execute(
322320
block_buffer: Arc<BlockBuffer>,
323321
executor: Arc<RwLock<Executor>>,
324322
) -> Result<()> {
325-
let data_stream = interpreter.execute(ctx.clone()).await?;
326-
let mut data_stream = ctx.try_create_abortable(data_stream)?;
323+
let mut data_stream = interpreter.execute(ctx.clone()).await?;
327324
let use_result_cache = !ctx.get_config().query.management_mode;
328325

329326
match data_stream.next().await {
@@ -425,17 +422,10 @@ impl HttpQueryHandle {
425422
};
426423

427424
let (error_sender, mut error_receiver) = mpsc::channel::<Result<()>>(1);
428-
let (abort_handle, abort_registration) = AbortHandle::new_pair();
429425

430426
GlobalIORuntime::instance().spawn(async move {
431-
let error_receiver = Abortable::new(error_receiver.recv(), abort_registration);
432-
ctx.add_source_abort_handle(abort_handle);
433-
match error_receiver.await {
434-
Err(_) => {
435-
Executor::stop(&executor, Err(ErrorCode::AbortedQuery("")), false).await;
436-
block_buffer.stop_push().await.unwrap();
437-
}
438-
Ok(Some(Err(e))) => {
427+
match error_receiver.recv().await {
428+
Some(Err(e)) => {
439429
Executor::stop(&executor, Err(e), false).await;
440430
block_buffer.stop_push().await.unwrap();
441431
}

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -459,10 +459,7 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
459459
}
460460
};
461461

462-
let abortable_stream = ctx
463-
.try_create_abortable(intercepted_stream.boxed())?
464-
.boxed();
465-
Ok::<_, ErrorCode>(abortable_stream)
462+
Ok::<_, ErrorCode>(intercepted_stream.boxed())
466463
}
467464
.in_current_span()
468465
})?;

src/query/service/src/sessions/query_ctx.rs

-13
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ use common_legacy_planners::SourceInfo;
4141
use common_legacy_planners::StageTableInfo;
4242
use common_meta_app::schema::TableInfo;
4343
use common_meta_types::UserInfo;
44-
use common_streams::AbortStream;
45-
use common_streams::SendableDataBlockStream;
46-
use futures::future::AbortHandle;
4744
use opendal::Operator;
4845
use parking_lot::Mutex;
4946
use parking_lot::RwLock;
@@ -144,16 +141,6 @@ impl QueryContext {
144141
DataExchangeManager::instance()
145142
}
146143

147-
pub fn try_create_abortable(&self, input: SendableDataBlockStream) -> Result<AbortStream> {
148-
let (abort_handle, abort_stream) = AbortStream::try_create(input)?;
149-
self.shared.add_source_abort_handle(abort_handle);
150-
Ok(abort_stream)
151-
}
152-
153-
pub fn add_source_abort_handle(&self, abort_handle: AbortHandle) {
154-
self.shared.add_source_abort_handle(abort_handle);
155-
}
156-
157144
pub fn attach_http_query(&self, handle: HttpQueryHandle) {
158145
self.shared.attach_http_query_handle(handle);
159146
}

src/query/service/src/sessions/query_ctx_shared.rs

-13
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use common_exception::ErrorCode;
2525
use common_exception::Result;
2626
use common_meta_types::UserInfo;
2727
use common_storage::StorageOperator;
28-
use futures::future::AbortHandle;
2928
use opendal::Operator;
3029
use parking_lot::Mutex;
3130
use parking_lot::RwLock;
@@ -68,7 +67,6 @@ pub struct QueryContextShared {
6867
pub(in crate::sessions) runtime: Arc<RwLock<Option<Arc<Runtime>>>>,
6968
pub(in crate::sessions) init_query_id: Arc<RwLock<String>>,
7069
pub(in crate::sessions) cluster_cache: Arc<Cluster>,
71-
pub(in crate::sessions) sources_abort_handle: Arc<RwLock<Vec<AbortHandle>>>,
7270
pub(in crate::sessions) subquery_index: Arc<AtomicUsize>,
7371
pub(in crate::sessions) running_query: Arc<RwLock<Option<String>>>,
7472
pub(in crate::sessions) http_query: Arc<RwLock<Option<HttpQueryHandle>>>,
@@ -99,7 +97,6 @@ impl QueryContextShared {
9997
write_progress: Arc::new(Progress::create()),
10098
error: Arc::new(Mutex::new(None)),
10199
runtime: Arc::new(RwLock::new(None)),
102-
sources_abort_handle: Arc::new(RwLock::new(Vec::new())),
103100
subquery_index: Arc::new(AtomicUsize::new(1)),
104101
running_query: Arc::new(RwLock::new(None)),
105102
http_query: Arc::new(RwLock::new(None)),
@@ -123,11 +120,6 @@ impl QueryContextShared {
123120
executor.finish(Some(cause));
124121
}
125122

126-
let mut sources_abort_handle = self.sources_abort_handle.write();
127-
128-
while let Some(source_abort_handle) = sources_abort_handle.pop() {
129-
source_abort_handle.abort();
130-
}
131123
// TODO: Wait for the query to be processed (write out the last error)
132124
}
133125

@@ -256,11 +248,6 @@ impl QueryContextShared {
256248
running_query.as_ref().unwrap_or(&"".to_string()).clone()
257249
}
258250

259-
pub fn add_source_abort_handle(&self, handle: AbortHandle) {
260-
let mut sources_abort_handle = self.sources_abort_handle.write();
261-
sources_abort_handle.push(handle);
262-
}
263-
264251
pub fn get_config(&self) -> Config {
265252
self.config.clone()
266253
}

src/query/streams/src/lib.rs

-2
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@
1414

1515
mod sources;
1616
mod stream;
17-
mod stream_abort;
1817
mod stream_datablock;
1918
mod stream_error;
2019
mod stream_progress;
2120
mod stream_take;
2221

2322
pub use sources::*;
2423
pub use stream::*;
25-
pub use stream_abort::AbortStream;
2624
pub use stream_datablock::DataBlockStream;
2725
pub use stream_error::ErrorStream;
2826
pub use stream_progress::ProgressStream;

src/query/streams/src/stream_abort.rs

-64
This file was deleted.

0 commit comments

Comments
 (0)