Skip to content

Commit

Permalink
fix: Fix potential block in frontent. (#7416)
Browse files Browse the repository at this point in the history
Currently local mode's root fragment is executed in frontend, if it maybe a blocking operation and block listener thread of frontend, so we move it to block pool.

Approved-By: BowenXiao1999
  • Loading branch information
liurenjie1024 authored Jan 17, 2023
1 parent 1132ec8 commit 9fd7845
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"signal",
"fs",
] }
tokio-stream = "0.1"
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
uuid = "1"
Expand Down
48 changes: 23 additions & 25 deletions src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@

//! Local execution for batch query.
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::Stream;
use futures::executor::block_on;
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use pgwire::pg_server::BoxedError;
Expand All @@ -35,6 +34,9 @@ use risingwave_pb::batch_plan::{
ExchangeInfo, ExchangeSource, LocalExecutePlan, PlanFragment, PlanNode as PlanNodeProst,
TaskId as ProstTaskId, TaskOutputId,
};
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use uuid::Uuid;

Expand All @@ -45,26 +47,7 @@ use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::{PinnedHummockSnapshot, SchedulerResult};
use crate::session::{AuthContext, FrontendEnv};

pub struct LocalQueryStream {
data_stream: BoxedDataChunkStream,
}

impl Stream for LocalQueryStream {
type Item = Result<DataChunk, BoxedError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.data_stream.as_mut().poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(chunk) => match chunk {
Some(chunk_result) => match chunk_result {
Ok(chunk) => Poll::Ready(Some(Ok(chunk))),
Err(err) => Poll::Ready(Some(Err(Box::new(err)))),
},
None => Poll::Ready(None),
},
}
}
}
pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;

pub struct LocalQueryExecution {
sql: String,
Expand Down Expand Up @@ -129,9 +112,24 @@ impl LocalQueryExecution {
}

pub fn stream_rows(self) -> LocalQueryStream {
LocalQueryStream {
data_stream: self.run(),
let (sender, receiver) = mpsc::channel(10);
let mut data_stream = self.run().map(|r| r.map_err(|e| Box::new(e) as BoxedError));

let future = async move {
while let Some(r) = data_stream.next().await {
if (sender.send(r).await).is_err() {
tracing::info!("Receiver closed.");

This comment has been minimized.

Copy link
@wangrunji0408

wangrunji0408 Jul 6, 2023

Contributor

Why not just return to cancel the task? 👀
Currently the compute task may still be running.

}
}
};

if cfg!(madsim) {
tokio::spawn(future);
} else {
spawn_blocking(move || block_on(future));
}

ReceiverStream::new(receiver)
}

/// Convert query to plan fragment.
Expand Down

0 comments on commit 9fd7845

Please sign in to comment.