diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 86405ce01..75715b390 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -19,24 +19,32 @@ where Box::pin(async move { // Using `scan` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; let out: V = stream - .scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None } }) .collect() .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/product.rs b/src/result/product.rs index 89ddacb97..b236651e9 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -42,22 +42,33 @@ where Box::pin(async move { // Using `scan` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::product(stream.scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } - } - })) + let out = >::product( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }), + ) .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/sum.rs b/src/result/sum.rs index c0ef4c26f..f7e504516 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -42,22 +42,33 @@ where Box::pin(async move { // Using `scan` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::sum(stream.scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } - } - })) + let out = >::sum( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }), + ) .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) }