Skip to content

Commit

Permalink
fix: report local execution mode error (risingwavelabs#7454)
Browse files Browse the repository at this point in the history
1. Enable error propagation in local mode. When a local-mode task (on a CN) encounters an error, the error is now reported to the user.
2. Store the sender in TaskExecution to avoid dropping it early (otherwise a task execution error may surface as a hash shuffle error).

This PR reverts a previous workaround (the TODO in sqlsmith) by storing the sender in the task execution.

Approved-By: liurenjie1024

Co-Authored-By: BowenXiao1999 <931759898@qq.com>
Co-Authored-By: Bowen <36908971+bowenxiao1999@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 3, 2023
1 parent a3306ea commit 20bdb72
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 55 deletions.
31 changes: 31 additions & 0 deletions e2e_test/batch/issue_7324.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Regression test for error propagation in local execution mode (#7324).
# If the error raised by the final SELECT is not reported back correctly, the query hangs instead of failing.

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
SET CREATE_COMPACTION_GROUP_FOR_MV TO true;

statement ok
CREATE TABLE INT2_TBL(f1 int2);

statement ok
INSERT INTO INT2_TBL(f1) VALUES ('0 ');

statement ok
INSERT INTO INT2_TBL(f1) VALUES (' 1234 ');

statement ok
INSERT INTO INT2_TBL(f1) VALUES (' -1234');

statement ok
INSERT INTO INT2_TBL(f1) VALUES ('32767');

statement ok
INSERT INTO INT2_TBL(f1) VALUES ('-32767');

# Expected to fail rather than hang: presumably 32767 * 2 does not fit in int2 (TODO confirm the exact error).
statement error
SELECT i.f1, i.f1 * smallint '2' AS x FROM INT2_TBL i;

statement ok
drop table INT2_TBL;
4 changes: 3 additions & 1 deletion src/batch/src/execution/grpc_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ impl ExchangeSource for GrpcExchangeSource {
fn take_data(&mut self) -> Self::TakeDataFuture<'_> {
async {
let res = match self.stream.next().await {
None => return Ok(None),
None => {
return Ok(None);
}
Some(r) => r,
};
let task_data = res?;
Expand Down
35 changes: 17 additions & 18 deletions src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use tonic::{Request, Response, Status};

use crate::rpc::service::exchange::GrpcExchangeWriter;
use crate::task::{
self, BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, TaskId,
BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId,
TASK_STATUS_BUFFER_SIZE,
};

const LOCAL_EXECUTE_BUFFER_SIZE: usize = 64;
Expand All @@ -43,6 +44,7 @@ impl BatchServiceImpl {
}
}
pub(crate) type TaskInfoResponseResult = std::result::Result<TaskInfoResponse, Status>;
pub(crate) type GetDataResponseResult = std::result::Result<GetDataResponse, Status>;
#[async_trait::async_trait]
impl TaskService for BatchServiceImpl {
type CreateTaskStream = ReceiverStream<TaskInfoResponseResult>;
Expand All @@ -59,6 +61,8 @@ impl TaskService for BatchServiceImpl {
epoch,
} = request.into_inner();

let (state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
let state_reporter = StateReporter::new_with_dist_sender(state_tx);
let res = self
.mgr
.fire_task(
Expand All @@ -69,6 +73,7 @@ impl TaskService for BatchServiceImpl {
self.env.clone(),
TaskId::from(task_id.as_ref().expect("no task id found")),
),
state_reporter,
)
.await;
match res {
Expand All @@ -78,8 +83,7 @@ impl TaskService for BatchServiceImpl {
// Will be used for receive task status update.
// Note: we introduce this hack cuz `.execute()` do not produce a status stream,
// but still share `.async_execute()` and `.try_execute()`.
self.mgr
.get_task_receiver(&task::TaskId::from(&task_id.unwrap())),
state_rx,
))),
Err(e) => {
error!("failed to fire task {}", e);
Expand Down Expand Up @@ -120,8 +124,9 @@ impl TaskService for BatchServiceImpl {
);
let task = BatchTaskExecution::new(&task_id, plan, context, epoch, self.mgr.runtime())?;
let task = Arc::new(task);

if let Err(e) = task.clone().async_execute().await {
let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
let state_reporter = StateReporter::new_with_local_sender(tx.clone());
if let Err(e) = task.clone().async_execute(state_reporter).await {
error!(
"failed to build executors and trigger execution of Task {:?}: {}",
task_id, e
Expand All @@ -142,20 +147,14 @@ impl TaskService for BatchServiceImpl {
);
e
})?;
let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);

let mut writer = GrpcExchangeWriter::new(tx.clone());
let finish = output
.take_data_with_num(&mut writer, tx.capacity())
.await?;
if !finish {
self.mgr.runtime().spawn(async move {
match output.take_data(&mut writer).await {
Ok(_) => Ok(()),
Err(e) => tx.send(Err(e.into())).await,
}
});
}
// Always spawn a task and do not block current function.
self.mgr.runtime().spawn(async move {
match output.take_data(&mut writer).await {
Ok(_) => Ok(()),
Err(e) => tx.send(Err(e.into())).await,
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
1 change: 1 addition & 0 deletions src/batch/src/task/broadcast_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSende
use crate::task::data_chunk_in_channel::DataChunkInChannel;

/// `BroadcastSender` sends the same chunk to a number of `BroadcastReceiver`s.
#[derive(Clone)]
pub struct BroadcastSender {
senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
broadcast_info: BroadcastInfo,
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(super) trait ChanSender: Send {
fn send(&mut self, chunk: Option<DataChunk>) -> Self::SendFuture<'_>;
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ChanSenderImpl {
HashShuffle(HashShuffleSender),
ConsistentHashShuffle(ConsistentHashShuffleSender),
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/task/consistent_hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::error::Result as BatchResult;
use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
use crate::task::data_chunk_in_channel::DataChunkInChannel;

#[derive(Clone)]
pub struct ConsistentHashShuffleSender {
senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
consistent_hash_info: ConsistentHashInfo,
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/task/fifo_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::error::BatchError::SenderError;
use crate::error::Result as BatchResult;
use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
use crate::task::data_chunk_in_channel::DataChunkInChannel;
#[derive(Clone)]
pub struct FifoSender {
sender: mpsc::Sender<Option<DataChunkInChannel>>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::error::BatchError::SenderError;
use crate::error::Result as BatchResult;
use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
use crate::task::data_chunk_in_channel::DataChunkInChannel;

#[derive(Clone)]
pub struct HashShuffleSender {
senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
hash_info: HashInfo,
Expand Down
92 changes: 69 additions & 23 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ use crate::error::BatchError::SenderError;
use crate::error::{BatchError, Result as BatchResult};
use crate::executor::{BoxedExecutor, ExecutorBuilder};
use crate::rpc::service::exchange::ExchangeWriter;
use crate::rpc::service::task_service::TaskInfoResponseResult;
use crate::rpc::service::task_service::{GetDataResponseResult, TaskInfoResponseResult};
use crate::task::channel::{create_output_channel, ChanReceiverImpl, ChanSenderImpl};
use crate::task::BatchTaskContext;

// Now we will only at most have 2 status for each status channel. Running -> Failed or Finished.
const TASK_STATUS_BUFFER_SIZE: usize = 2;
pub const TASK_STATUS_BUFFER_SIZE: usize = 2;

/// A special version for batch allocation stat, passed in another task `context` C to report task
/// mem usage 0 bytes at the end.
Expand Down Expand Up @@ -86,6 +86,48 @@ where
.await
}

/// Reports batch task status updates (local or distributed mode) to the frontend.
///
/// * [`StateReporter::Local`] wraps a sender of `GetDataResponseResult` and forwards only
///   error updates; successful status updates are silently discarded.
/// * [`StateReporter::Distributed`] wraps a sender of `TaskInfoResponseResult` and forwards
///   every update it is given (errors as well as regular status changes).
/// * [`StateReporter::Mock`] is only used in tests and has no effect.
pub enum StateReporter {
    Local(tokio::sync::mpsc::Sender<GetDataResponseResult>),
    Distributed(tokio::sync::mpsc::Sender<TaskInfoResponseResult>),
    Mock(),
}

impl StateReporter {
    /// Sends one status update through the wrapped channel.
    ///
    /// Returns `Err(SenderError)` when the underlying channel send fails. Updates that the
    /// reporter does not forward (non-error updates in local mode, everything in mock mode)
    /// yield `Ok(())` without touching any channel.
    pub async fn send(&mut self, val: TaskInfoResponseResult) -> BatchResult<()> {
        match (self, val) {
            // Local mode: only failures are forwarded to the data stream.
            (Self::Local(tx), Err(status)) => tx.send(Err(status)).await.map_err(|_| SenderError),
            (Self::Local(_), Ok(_)) => Ok(()),
            // Distributed mode: every update is forwarded unchanged.
            (Self::Distributed(tx), update) => tx.send(update).await.map_err(|_| SenderError),
            // Mock reporter: nothing to notify.
            (Self::Mock(), _) => Ok(()),
        }
    }

    /// Builds a reporter for local mode around the given data-response sender.
    pub fn new_with_local_sender(s: tokio::sync::mpsc::Sender<GetDataResponseResult>) -> Self {
        StateReporter::Local(s)
    }

    /// Builds a reporter for distributed mode around the given task-info sender.
    pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender<TaskInfoResponseResult>) -> Self {
        StateReporter::Distributed(s)
    }

    /// Builds a no-op reporter for tests.
    pub fn new_with_test() -> Self {
        StateReporter::Mock()
    }
}

#[derive(PartialEq, Eq, Hash, Clone, Debug, Default)]
pub struct TaskId {
pub task_id: u32,
Expand Down Expand Up @@ -247,6 +289,9 @@ pub struct BatchTaskExecution<C> {
/// Receivers data of the task.
receivers: Mutex<Vec<Option<ChanReceiverImpl>>>,

/// Sender for sending chunks between different executors.
sender: ChanSenderImpl,

/// Context for task execution
context: C,

Expand Down Expand Up @@ -275,17 +320,27 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
runtime: &'static Runtime,
) -> Result<Self> {
let task_id = TaskId::from(prost_tid);

let (sender, receivers) = create_output_channel(
plan.get_exchange_info()?,
context.get_config().developer.batch_output_channel_size,
)?;

let mut rts = Vec::new();
rts.extend(receivers.into_iter().map(Some));

Ok(Self {
task_id,
plan,
state: Mutex::new(TaskStatus::Pending),
receivers: Mutex::new(Vec::new()),
receivers: Mutex::new(rts),
failure: Arc::new(Mutex::new(None)),
epoch,
shutdown_tx: Mutex::new(None),
state_rx: Mutex::new(None),
context,
runtime,
sender,
})
}

Expand All @@ -299,7 +354,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
/// hash partitioned across multiple channels.
/// To obtain the result, one must pick one of the channels to consume via [`TaskOutputId`]. As
/// such, parallel consumers are able to consume the result independently.
pub async fn async_execute(self: Arc<Self>) -> Result<()> {
pub async fn async_execute(self: Arc<Self>, state_tx: StateReporter) -> Result<()> {
let mut state_tx = state_tx;
trace!(
"Prepare executing plan [{:?}]: {}",
self.task_id,
Expand All @@ -316,26 +372,15 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
.await?;

// Init shutdown channel and data receivers.
let (sender, receivers) = create_output_channel(
self.plan.get_exchange_info()?,
self.context
.get_config()
.developer
.batch_output_channel_size,
)?;
let sender = self.sender.clone();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<u64>();
*self.shutdown_tx.lock() = Some(shutdown_tx);
self.receivers
.lock()
.extend(receivers.into_iter().map(Some));
let failure = self.failure.clone();
let task_id = self.task_id.clone();

// After we init the output receivers, it's must safe to schedule next stage -- able to send
// TaskStatus::Running here.
let (mut state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
// Init the state receivers. Swap out later.
*self.state_rx.lock() = Some(state_rx);
self.change_state_notify(TaskStatus::Running, &mut state_tx, None)
.await?;

Expand Down Expand Up @@ -373,6 +418,11 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
// It's possible to send fail. Same reason in `.try_execute`.
warn!("send task execution error message fail!");
}

// There will be no more chunks, so send None.
if let Err(_e) = sender.send(None).await {
warn!("failed to send None to annotate end");
}
}
};

Expand Down Expand Up @@ -437,15 +487,12 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
pub async fn change_state_notify(
&self,
task_status: TaskStatus,
state_tx: &mut tokio::sync::mpsc::Sender<TaskInfoResponseResult>,
state_tx: &mut StateReporter,
err_str: Option<String>,
) -> BatchResult<()> {
self.change_state(task_status);
if let Some(err_str) = err_str {
state_tx
.send(Err(Status::internal(err_str)))
.await
.map_err(|_| SenderError)
state_tx.send(Err(Status::internal(err_str))).await
} else {
// Notify frontend the task status.
state_tx
Expand All @@ -458,7 +505,6 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
..Default::default()
}))
.await
.map_err(|_| SenderError)
}
}

Expand All @@ -471,7 +517,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
root: BoxedExecutor,
sender: &mut ChanSenderImpl,
mut shutdown_rx: Receiver<u64>,
state_tx: &mut tokio::sync::mpsc::Sender<TaskInfoResponseResult>,
state_tx: &mut StateReporter,
) -> Result<()> {
let mut data_chunk_stream = root.execute();
let mut state = TaskStatus::Unspecified;
Expand Down
Loading

0 comments on commit 20bdb72

Please sign in to comment.