Skip to content

Commit

Permalink
remove spawn_on_all_threads and add example that uses spawn_by_idx in…
Browse files Browse the repository at this point in the history
… a loop to spawn a task on all threads
  • Loading branch information
b-naber committed May 27, 2022
1 parent eb41fd8 commit 5e72463
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 55 deletions.
58 changes: 27 additions & 31 deletions tokio-util/src/task/spawn_pinned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,53 +133,49 @@ impl LocalPoolHandle {
/// A worker thread is chosen by index. Indices are 0 based and the largest index
/// is given by `num_threads() - 1`
///
/// Panics if the index is out of bounds.
pub fn spawn_pinned_by_idx<F, Fut>(&self, create_task: F, idx: usize) -> JoinHandle<Fut::Output>
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future + 'static,
Fut::Output: Send + 'static,
{
self.pool
.spawn_pinned(create_task, WorkerChoice::ByIdx(idx))
}

/// Spawn a task on every worker thread in the pool and pin it so that it
/// can't be moved off of the thread.
/// # Panics
///
/// This method panics if the index is out of bounds.
///
/// # Examples
///
/// This method can be used to spawn a task on all worker threads of the pool:
///
/// ```
/// use std::rc::Rc;
/// use tokio_util::task::LocalPoolHandle;
///
/// #[tokio::main]
/// async fn main() {
/// let pool = LocalPoolHandle::new(3);
///
/// let _ = pool.spawn_pinned_on_all_workers(|| {
/// // Rc is !Send + !Sync
/// let local_data = Rc::new("test");
/// const NUM_WORKERS: usize = 3;
/// let pool = LocalPoolHandle::new(NUM_WORKERS);
/// let handles = (0..pool.num_threads())
/// .map(|worker_idx| {
/// pool.spawn_pinned_by_idx(
/// || {
/// async {
/// "test"
/// }
/// },
/// worker_idx,
/// )
/// })
/// .collect::<Vec<_>>();
///
/// // This future holds an Rc, so it is !Send
/// async move { local_data.to_string() }
/// });
/// let _ = handles
/// .into_iter()
/// .map(|handle| async { handle.await.unwrap() });
/// }
/// ```
pub fn spawn_pinned_on_all_workers<F, Fut>(
&self,
create_task: F,
) -> Vec<JoinHandle<Fut::Output>>
///
pub fn spawn_pinned_by_idx<F, Fut>(&self, create_task: F, idx: usize) -> JoinHandle<Fut::Output>
where
F: FnOnce() -> Fut,
F: Send + Clone + 'static,
F: Send + 'static,
Fut: Future + 'static,
Fut::Output: Send + 'static,
{
(0..self.pool.workers.len())
.map(|idx| self.spawn_pinned_by_idx(create_task.clone(), idx))
.collect::<Vec<_>>()
self.pool
.spawn_pinned(create_task, WorkerChoice::ByIdx(idx))
}
}

Expand Down
24 changes: 0 additions & 24 deletions tokio-util/tests/spawn_pinned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,27 +234,3 @@ async fn spawn_by_idx() {

assert_ne!(thread_id1, thread_id2);
}

#[tokio::test]
async fn spawn_on_all_workers() {
const NUM_WORKERS: usize = 3;
let pool = task::LocalPoolHandle::new(NUM_WORKERS);
let barrier = Arc::new(Barrier::new(NUM_WORKERS + 1));
let barrier_clone = barrier.clone();

let handles = pool.spawn_pinned_on_all_workers(|| async move {
barrier_clone.wait().await;

"test"
});

let loads = pool.get_task_loads_for_each_worker();
barrier.wait().await;
assert_eq!(loads[0], 1);
assert_eq!(loads[1], 1);
assert_eq!(loads[2], 1);

let _ = handles
.into_iter()
.map(|handle| async { handle.await.unwrap() });
}

0 comments on commit 5e72463

Please sign in to comment.