Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(core): Fix unit tests #4684

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions core/src/raw/futures_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ pub struct ConcurrentTasks<I, O> {
tasks: VecDeque<Task<(I, Result<O>)>>,
/// `results` stores the successful results.
results: VecDeque<O>,

/// hitting the last unrecoverable error.
///
/// If concurrent tasks hit an unrecoverable error, it will stop executing new tasks and return
/// an unrecoverable error to users.
errored: bool,
}

impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
Expand All @@ -105,6 +111,7 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {

tasks: VecDeque::with_capacity(concurrent),
results: VecDeque::with_capacity(concurrent),
errored: false,
}
}

Expand Down Expand Up @@ -140,6 +147,13 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
/// - Execute the task in the background if there are available slots.
/// - Await the first task in the queue if there is no available slots.
pub async fn execute(&mut self, input: I) -> Result<()> {
if self.errored {
return Err(Error::new(
ErrorKind::Unexpected,
"concurrent tasks met an unrecoverable error",
));
}

// Short path for non-concurrent case.
if !self.is_concurrent() {
let (_, o) = (self.factory)(input).await;
Expand All @@ -164,6 +178,9 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
if err.is_temporary() {
self.tasks
.push_front(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
}
return Err(err);
}
Expand Down Expand Up @@ -197,6 +214,9 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
if err.is_temporary() {
self.tasks
.push_front(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
}
return Err(err);
}
Expand All @@ -206,6 +226,13 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {

/// Fetch the successful result from the result queue.
pub async fn next(&mut self) -> Option<Result<O>> {
if self.errored {
return Some(Err(Error::new(
ErrorKind::Unexpected,
"concurrent tasks met an unrecoverable error",
)));
}

if let Some(result) = self.results.pop_front() {
return Some(Ok(result));
}
Expand All @@ -219,6 +246,9 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
if err.is_temporary() {
self.tasks
.push_front(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
}
Some(Err(err))
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/raw/oio/write/multipart_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ mod tests {

// We will have 10% percent rate for write part to fail.
if thread_rng().gen_bool(1.0 / 10.0) {
return Err(Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!"));
return Err(
Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary()
);
}

{
Expand Down
Loading