Skip to content

Commit 3e306c2

Browse files
authored
feat(futures-util/stream): add StreamExt::unzip (rust-lang#2263)
1 parent 34015c8 commit 3e306c2

File tree

198 files changed

+1965
-2392
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

198 files changed

+1965
-2392
lines changed

examples/functional/src/main.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ fn main() {
3030
// responsible for transmission
3131
pool.spawn_ok(fut_tx_result);
3232

33-
let fut_values = rx
34-
.map(|v| v * 2)
35-
.collect();
33+
let fut_values = rx.map(|v| v * 2).collect();
3634

3735
// Use the executor provided to this async block to wait for the
3836
// future to complete.
@@ -45,4 +43,4 @@ fn main() {
4543
let values: Vec<i32> = executor::block_on(fut_values);
4644

4745
println!("Values={:?}", values);
48-
}
46+
}

examples/imperative/src/main.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ fn main() {
3434
// of the stream to be available.
3535
while let Some(v) = rx.next().await {
3636
pending.push(v * 2);
37-
};
37+
}
3838

3939
pending
4040
};
@@ -45,4 +45,4 @@ fn main() {
4545
let values: Vec<i32> = executor::block_on(fut_values);
4646

4747
println!("Values={:?}", values);
48-
}
48+
}

futures-channel/benches/sync_mpsc.rs

+3-12
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use {
77
futures::{
88
channel::mpsc::{self, Sender, UnboundedSender},
99
ready,
10-
stream::{Stream, StreamExt},
1110
sink::Sink,
11+
stream::{Stream, StreamExt},
1212
task::{Context, Poll},
1313
},
1414
futures_test::task::noop_context,
@@ -25,7 +25,6 @@ fn unbounded_1_tx(b: &mut Bencher) {
2525
// 1000 iterations to avoid measuring overhead of initialization
2626
// Result should be divided by 1000
2727
for i in 0..1000 {
28-
2928
// Poll, not ready, park
3029
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
3130

@@ -73,7 +72,6 @@ fn unbounded_uncontended(b: &mut Bencher) {
7372
})
7473
}
7574

76-
7775
/// A Stream that continuously sends incrementing number of the queue
7876
struct TestSender {
7977
tx: Sender<u32>,
@@ -84,9 +82,7 @@ struct TestSender {
8482
impl Stream for TestSender {
8583
type Item = u32;
8684

87-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
88-
-> Poll<Option<Self::Item>>
89-
{
85+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
9086
let this = &mut *self;
9187
let mut tx = Pin::new(&mut this.tx);
9288

@@ -123,12 +119,7 @@ fn bounded_100_tx(b: &mut Bencher) {
123119
// Each sender can send one item after specified capacity
124120
let (tx, mut rx) = mpsc::channel(0);
125121

126-
let mut tx: Vec<_> = (0..100).map(|_| {
127-
TestSender {
128-
tx: tx.clone(),
129-
last: 0
130-
}
131-
}).collect();
122+
let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();
132123

133124
for i in 0..10 {
134125
for x in &mut tx {

futures-channel/src/lib.rs

-5
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,16 @@
77
//! library is activated, and it is activated by default.
88
99
#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
10-
1110
#![cfg_attr(not(feature = "std"), no_std)]
12-
1311
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
1412
// It cannot be included in the published code because this lints have false positives in the minimum required version.
1513
#![cfg_attr(test, warn(single_use_lifetimes))]
1614
#![warn(clippy::all)]
17-
1815
// mem::take requires Rust 1.40, matches! requires Rust 1.42
1916
// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941
2017
// get's implemented.
2118
#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)]
22-
2319
#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
24-
2520
#![doc(html_root_url = "https://docs.rs/futures-channel/0.3.8")]
2621

2722
#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]

futures-channel/tests/channel.rs

+3-7
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use futures::channel::mpsc;
22
use futures::executor::block_on;
33
use futures::future::poll_fn;
4-
use futures::stream::StreamExt;
54
use futures::sink::SinkExt;
5+
use futures::stream::StreamExt;
66
use std::sync::atomic::{AtomicUsize, Ordering};
77
use std::thread;
88

@@ -11,9 +11,7 @@ fn sequence() {
1111
let (tx, rx) = mpsc::channel(1);
1212

1313
let amt = 20;
14-
let t = thread::spawn(move || {
15-
block_on(send_sequence(amt, tx))
16-
});
14+
let t = thread::spawn(move || block_on(send_sequence(amt, tx)));
1715
let list: Vec<_> = block_on(rx.collect());
1816
let mut list = list.into_iter();
1917
for i in (1..=amt).rev() {
@@ -34,9 +32,7 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
3432
fn drop_sender() {
3533
let (tx, mut rx) = mpsc::channel::<u32>(1);
3634
drop(tx);
37-
let f = poll_fn(|cx| {
38-
rx.poll_next_unpin(cx)
39-
});
35+
let f = poll_fn(|cx| rx.poll_next_unpin(cx));
4036
assert_eq!(block_on(f), None)
4137
}
4238

futures-channel/tests/mpsc-close.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ use std::thread;
99
fn smoke() {
1010
let (mut sender, receiver) = mpsc::channel(1);
1111

12-
let t = thread::spawn(move || {
13-
while let Ok(()) = block_on(sender.send(42)) {}
14-
});
12+
let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});
1513

1614
// `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
1715
block_on(receiver.take(3).for_each(|_| futures::future::ready(())));

futures-channel/tests/mpsc.rs

+15-18
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use futures::channel::{mpsc, oneshot};
22
use futures::executor::{block_on, block_on_stream};
3-
use futures::future::{FutureExt, poll_fn};
4-
use futures::stream::{Stream, StreamExt};
3+
use futures::future::{poll_fn, FutureExt};
4+
use futures::pin_mut;
55
use futures::sink::{Sink, SinkExt};
6+
use futures::stream::{Stream, StreamExt};
67
use futures::task::{Context, Poll};
7-
use futures::pin_mut;
88
use futures_test::task::{new_count_waker, noop_context};
9-
use std::sync::{Arc, Mutex};
109
use std::sync::atomic::{AtomicUsize, Ordering};
10+
use std::sync::{Arc, Mutex};
1111
use std::thread;
1212

1313
trait AssertSend: Send {}
@@ -77,7 +77,7 @@ fn send_shared_recv() {
7777
fn send_recv_threads() {
7878
let (mut tx, rx) = mpsc::channel::<i32>(16);
7979

80-
let t = thread::spawn(move|| {
80+
let t = thread::spawn(move || {
8181
block_on(tx.send(1)).unwrap();
8282
});
8383

@@ -204,7 +204,7 @@ fn stress_shared_unbounded() {
204204
const NTHREADS: u32 = 8;
205205
let (tx, rx) = mpsc::unbounded::<i32>();
206206

207-
let t = thread::spawn(move|| {
207+
let t = thread::spawn(move || {
208208
let result: Vec<_> = block_on(rx.collect());
209209
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
210210
for item in result {
@@ -215,7 +215,7 @@ fn stress_shared_unbounded() {
215215
for _ in 0..NTHREADS {
216216
let tx = tx.clone();
217217

218-
thread::spawn(move|| {
218+
thread::spawn(move || {
219219
for _ in 0..AMT {
220220
tx.unbounded_send(1).unwrap();
221221
}
@@ -233,7 +233,7 @@ fn stress_shared_bounded_hard() {
233233
const NTHREADS: u32 = 8;
234234
let (tx, rx) = mpsc::channel::<i32>(0);
235235

236-
let t = thread::spawn(move|| {
236+
let t = thread::spawn(move || {
237237
let result: Vec<_> = block_on(rx.collect());
238238
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
239239
for item in result {
@@ -297,9 +297,9 @@ fn stress_receiver_multi_task_bounded_hard() {
297297
}
298298
Poll::Ready(None) => {
299299
*rx_opt = None;
300-
break
301-
},
302-
Poll::Pending => {},
300+
break;
301+
}
302+
Poll::Pending => {}
303303
}
304304
}
305305
} else {
@@ -311,7 +311,6 @@ fn stress_receiver_multi_task_bounded_hard() {
311311
th.push(t);
312312
}
313313

314-
315314
for i in 0..AMT {
316315
block_on(tx.send(i)).unwrap();
317316
}
@@ -328,7 +327,7 @@ fn stress_receiver_multi_task_bounded_hard() {
328327
/// after sender dropped.
329328
#[test]
330329
fn stress_drop_sender() {
331-
fn list() -> impl Stream<Item=i32> {
330+
fn list() -> impl Stream<Item = i32> {
332331
let (tx, rx) = mpsc::channel(1);
333332
thread::spawn(move || {
334333
block_on(send_one_two_three(tx));
@@ -407,9 +406,7 @@ fn stress_poll_ready() {
407406
let mut threads = Vec::new();
408407
for _ in 0..NTHREADS {
409408
let sender = tx.clone();
410-
threads.push(thread::spawn(move || {
411-
block_on(stress_poll_ready_sender(sender, AMT))
412-
}));
409+
threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
413410
}
414411
drop(tx);
415412

@@ -436,7 +433,7 @@ fn try_send_1() {
436433
for i in 0..N {
437434
loop {
438435
if tx.try_send(i).is_ok() {
439-
break
436+
break;
440437
}
441438
}
442439
}
@@ -542,8 +539,8 @@ fn is_connected_to() {
542539

543540
#[test]
544541
fn hash_receiver() {
545-
use std::hash::Hasher;
546542
use std::collections::hash_map::DefaultHasher;
543+
use std::hash::Hasher;
547544

548545
let mut hasher_a1 = DefaultHasher::new();
549546
let mut hasher_a2 = DefaultHasher::new();

futures-channel/tests/oneshot.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::channel::oneshot::{self, Sender};
22
use futures::executor::block_on;
3-
use futures::future::{FutureExt, poll_fn};
3+
use futures::future::{poll_fn, FutureExt};
44
use futures::task::{Context, Poll};
55
use futures_test::task::panic_waker_ref;
66
use std::sync::mpsc;
@@ -70,7 +70,7 @@ fn close() {
7070
rx.close();
7171
block_on(poll_fn(|cx| {
7272
match rx.poll_unpin(cx) {
73-
Poll::Ready(Err(_)) => {},
73+
Poll::Ready(Err(_)) => {}
7474
_ => panic!(),
7575
};
7676
assert!(tx.poll_canceled(cx).is_ready());

futures-core/src/future.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,12 @@ pub trait TryFuture: Future + private_try_future::Sealed {
6666
/// This method is a stopgap for a compiler limitation that prevents us from
6767
/// directly inheriting from the `Future` trait; in the future it won't be
6868
/// needed.
69-
fn try_poll(
70-
self: Pin<&mut Self>,
71-
cx: &mut Context<'_>,
72-
) -> Poll<Result<Self::Ok, Self::Error>>;
69+
fn try_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>>;
7370
}
7471

7572
impl<F, T, E> TryFuture for F
76-
where F: ?Sized + Future<Output = Result<T, E>>
73+
where
74+
F: ?Sized + Future<Output = Result<T, E>>,
7775
{
7876
type Ok = T;
7977
type Error = E;
@@ -86,8 +84,8 @@ impl<F, T, E> TryFuture for F
8684

8785
#[cfg(feature = "alloc")]
8886
mod if_alloc {
89-
use alloc::boxed::Box;
9087
use super::*;
88+
use alloc::boxed::Box;
9189

9290
impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for Box<F> {
9391
fn is_terminated(&self) -> bool {

futures-core/src/lib.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,16 @@
11
//! Core traits and types for asynchronous operations in Rust.
22
33
#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
4-
54
#![cfg_attr(not(feature = "std"), no_std)]
6-
75
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
86
// It cannot be included in the published code because this lints have false positives in the minimum required version.
97
#![cfg_attr(test, warn(single_use_lifetimes))]
108
#![warn(clippy::all)]
11-
129
// mem::take requires Rust 1.40, matches! requires Rust 1.42
1310
// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941
1411
// get's implemented.
1512
#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)]
16-
1713
#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
18-
1914
#![doc(html_root_url = "https://docs.rs/futures-core/0.3.8")]
2015

2116
#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
@@ -25,10 +20,12 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat
2520
extern crate alloc;
2621

2722
pub mod future;
28-
#[doc(hidden)] pub use self::future::{Future, FusedFuture, TryFuture};
23+
#[doc(hidden)]
24+
pub use self::future::{FusedFuture, Future, TryFuture};
2925

3026
pub mod stream;
31-
#[doc(hidden)] pub use self::stream::{Stream, FusedStream, TryStream};
27+
#[doc(hidden)]
28+
pub use self::stream::{FusedStream, Stream, TryStream};
3229

3330
#[macro_use]
3431
pub mod task;

0 commit comments

Comments
 (0)