diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index e9324ed602b..31ef65be775 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -88,6 +88,12 @@ pub struct ConcurrentTasks { tasks: VecDeque)>>, /// `results` stores the successful results. results: VecDeque, + + /// hitting the last unrecoverable error. + /// + /// If concurrent tasks hit an unrecoverable error, it will stop executing new tasks and return + /// an unrecoverable error to users. + errored: bool, } impl ConcurrentTasks { @@ -105,6 +111,7 @@ impl ConcurrentTasks { tasks: VecDeque::with_capacity(concurrent), results: VecDeque::with_capacity(concurrent), + errored: false, } } @@ -140,6 +147,13 @@ impl ConcurrentTasks { /// - Execute the task in the background if there are available slots. /// - Await the first task in the queue if there is no available slots. pub async fn execute(&mut self, input: I) -> Result<()> { + if self.errored { + return Err(Error::new( + ErrorKind::Unexpected, + "concurrent tasks met an unrecoverable error", + )); + } + // Short path for non-concurrent case. if !self.is_concurrent() { let (_, o) = (self.factory)(input).await; @@ -164,6 +178,9 @@ impl ConcurrentTasks { if err.is_temporary() { self.tasks .push_front(self.executor.execute((self.factory)(i))); + } else { + self.clear(); + self.errored = true; } return Err(err); } @@ -197,6 +214,9 @@ impl ConcurrentTasks { if err.is_temporary() { self.tasks .push_front(self.executor.execute((self.factory)(i))); + } else { + self.clear(); + self.errored = true; } return Err(err); } @@ -206,6 +226,13 @@ impl ConcurrentTasks { /// Fetch the successful result from the result queue. pub async fn next(&mut self) -> Option> { + if self.errored { + return Some(Err(Error::new( + ErrorKind::Unexpected, + "concurrent tasks met an unrecoverable error", + ))); + } + if let Some(result) = self.results.pop_front() { return Some(Ok(result)); } @@ -219,6 +246,9 @@ impl ConcurrentTasks { if err.is_temporary() { self.tasks .push_front(self.executor.execute((self.factory)(i))); + } else { + self.clear(); + self.errored = true; } Some(Err(err)) } diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs index 77179a20cea..08b9598607f 100644 --- a/core/src/raw/oio/write/multipart_write.rs +++ b/core/src/raw/oio/write/multipart_write.rs @@ -326,7 +326,9 @@ mod tests { // We will have 10% percent rate for write part to fail. if thread_rng().gen_bool(1.0 / 10.0) { - return Err(Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!")); + return Err( + Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary() + ); } {