Skip to content

Commit

Permalink
feat(stream): ErrorSuppressor for user compute errors (#8132)
Browse files Browse the repository at this point in the history
`ErrorSuppressor` for user compute errors

Approved-By: fuyufjh

Co-Authored-By: jon-chuang <jon-chuang@users.noreply.github.com>
Co-Authored-By: jon-chuang <9093549+jon-chuang@users.noreply.github.com>
  • Loading branch information
jon-chuang and jon-chuang authored Feb 23, 2023
1 parent f5f8f83 commit 52a39fd
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 20 deletions.
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ pub struct StreamingConfig {

#[serde(default)]
pub developer: DeveloperConfig,

/// Max unique user stream errors per actor
#[serde(default = "default::streaming::unique_user_stream_errors")]
pub unique_user_stream_errors: usize,
}

impl Default for StreamingConfig {
Expand Down Expand Up @@ -636,6 +640,10 @@ mod default {
pub fn async_stack_trace() -> AsyncStackTraceOption {
AsyncStackTraceOption::On
}

pub fn unique_user_stream_errors() -> usize {
10
}
}

pub mod file_cache {
Expand Down
40 changes: 40 additions & 0 deletions src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.

use std::backtrace::Backtrace;
use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt::{Debug, Display, Formatter};
use std::io::Error as IoError;
use std::time::{Duration, SystemTime};

use memcomparable::Error as MemComparableError;
use risingwave_pb::ProstFieldNotFound;
Expand All @@ -29,6 +31,8 @@ use crate::util::value_encoding::error::ValueEncodingError;
/// Header used to store serialized [`RwError`] in grpc status.
pub const RW_ERROR_GRPC_HEADER: &str = "risingwave-error-bin";

const ERROR_SUPPRESSOR_RESET_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h

pub trait Error = std::error::Error + Send + Sync + 'static;
pub type BoxedError = Box<dyn Error>;

Expand Down Expand Up @@ -422,6 +426,42 @@ macro_rules! bail {
};
}

#[derive(Debug)]
pub struct ErrorSuppressor {
max_unique: usize,
unique: HashSet<String>,
last_reset_time: SystemTime,
}

impl ErrorSuppressor {
pub fn new(max_unique: usize) -> Self {
Self {
max_unique,
last_reset_time: SystemTime::now(),
unique: Default::default(),
}
}

pub fn suppress_error(&mut self, error: &str) -> bool {
self.try_reset();
if self.unique.contains(error) {
false
} else if self.unique.len() < self.max_unique {
self.unique.insert(error.to_string());
false
} else {
// We have exceeded the capacity.
true
}
}

fn try_reset(&mut self) {
if self.last_reset_time.elapsed().unwrap() >= ERROR_SUPPRESSOR_RESET_DURATION {
*self = Self::new(self.max_unique)
}
}
}

#[cfg(test)]
mod tests {
use std::convert::Into;
Expand Down
37 changes: 17 additions & 20 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand All @@ -22,6 +21,7 @@ use futures::pin_mut;
use hytra::TrAdder;
use minitrace::prelude::*;
use parking_lot::Mutex;
use risingwave_common::error::ErrorSuppressor;
use risingwave_common::util::epoch::EpochPair;
use risingwave_expr::ExprError;
use tokio_stream::StreamExt;
Expand All @@ -37,13 +37,11 @@ pub struct ActorContext {
pub id: ActorId,
pub fragment_id: u32,

// TODO: report errors and prompt the user.
pub errors: Mutex<HashMap<String, Vec<ExprError>>>,

last_mem_val: Arc<AtomicUsize>,
cur_mem_val: Arc<AtomicUsize>,
total_mem_val: Arc<TrAdder<i64>>,
streaming_metrics: Arc<StreamingMetrics>,
pub error_suppressor: Arc<Mutex<ErrorSuppressor>>,
}

pub type ActorContextRef = Arc<ActorContext>;
Expand All @@ -53,11 +51,11 @@ impl ActorContext {
Arc::new(Self {
id,
fragment_id: 0,
errors: Default::default(),
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
total_mem_val: Arc::new(TrAdder::new()),
streaming_metrics: Arc::new(StreamingMetrics::unused()),
error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(10))),
})
}

Expand All @@ -66,35 +64,34 @@ impl ActorContext {
fragment_id: u32,
total_mem_val: Arc<TrAdder<i64>>,
streaming_metrics: Arc<StreamingMetrics>,
unique_user_errors: usize,
) -> ActorContextRef {
Arc::new(Self {
id,
fragment_id,
errors: Default::default(),
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
total_mem_val,
streaming_metrics,
error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(unique_user_errors))),
})
}

pub fn on_compute_error(&self, err: ExprError, identity: &str) {
tracing::error!("Compute error: {}, executor: {identity}", err);
let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
self.streaming_metrics
.user_compute_error_count
.with_label_values(&[
"ExprError",
&err.to_string(),
executor_name,
&self.fragment_id.to_string(),
])
.inc();
self.errors
.lock()
.entry(identity.to_owned())
.or_default()
.push(err);
let err_str = err.to_string();
if !self.error_suppressor.lock().suppress_error(&err_str) {
self.streaming_metrics
.user_compute_error_count
.with_label_values(&[
"ExprError",
&err_str,
executor_name,
&self.fragment_id.to_string(),
])
.inc();
}
}

pub fn store_mem_usage(&self, val: usize) {
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ impl LocalStreamManagerCore {
actor.fragment_id,
self.total_mem_val.clone(),
self.streaming_metrics.clone(),
self.config.unique_user_stream_errors,
);
let vnode_bitmap = actor
.vnode_bitmap
Expand Down

0 comments on commit 52a39fd

Please sign in to comment.