Fix Warnings
jschwe committed May 29, 2023
1 parent 5f440b9 commit dd0fbc3
Showing 5 changed files with 31 additions and 72 deletions.
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -30,7 +30,7 @@ pub(crate) trait Owner<T: 'static>: Send + Sync {

/// Returns a tuple with the lower bound and an Option for the upper bound of remaining
/// slots for enqueuing in the queue.
- fn remaining_slots_hint(&self) -> (u32, Option<u32>);
+ fn remaining_slots_hint(&self) -> (u16, Option<u16>);

/// Returns true if there are entries in the queue.
fn has_tasks(&self) -> bool;
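The `remaining_slots_hint` contract above returns a pair: a lower bound that is always safe to rely on, plus an optional upper bound when one can be computed cheaply. A minimal caller-side sketch of consuming such a hint to size a transfer; the helper below is illustrative and not part of this change:

// Decide how many tasks to move into a queue that reports a
// (lower bound, optional upper bound) hint of its remaining slots.
fn transfer_len(hint: (u16, Option<u16>), wanted: u16) -> u16 {
    let (min_free, max_free) = hint;
    if let Some(max_free) = max_free {
        // The bounds must be consistent when both are known.
        debug_assert!(min_free <= max_free);
    }
    // Only the lower bound is guaranteed, so never move more than that.
    wanted.min(min_free)
}

fn main() {
    assert_eq!(transfer_len((8, None), 32), 8);
    assert_eq!(transfer_len((16, Some(16)), 4), 4);
}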
33 changes: 14 additions & 19 deletions tokio/src/runtime/scheduler/multi_thread/queue/bwosq.rs
@@ -57,22 +57,16 @@ impl<T> super::Owner<T> for Local<T> {
metrics: &mut MetricsBatch,
) {
if let Err(t) = self.inner.enqueue(task) {
- inject.push(t);
- // note: the current implementation is slow
- // if self.inner.has_stealers() {
- //     inject.push(t);
- // } else {
- //     // push overflow of old queue
- //     if let Some(block_iter) = self.inner.dequeue_block() {
- //         // could use `and_then` to chain block dequeues a couple of times if
- //         // successful, if we want to steal more than one block
- //         inject.push_batch(block_iter.chain(std::iter::once(t)))
- //     } else {
- //         // Give up and use inject queue.
- //         inject.push(t)
- //     }
- // }
- // Add 1 to factor in the task currently being scheduled.
+ if self.inner.next_block_has_stealers() {
+     inject.push(t);
+ } else {
+     // push overflow of old queue
+     if let Some(block_iter) = self.inner.dequeue_block() {
+         inject.push_batch(block_iter.chain(std::iter::once(t)))
+     } else {
+         inject.push(t)
+     }
+ }
metrics.incr_overflow_count();
};
}
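The new overflow path above picks between two cases: if stealers are still active in the producer's block, only the one task goes to the inject queue; otherwise a whole block is dequeued and pushed to the inject queue as a batch together with the new task. A standalone sketch of that decision with simplified stand-in types (none of these names are tokio's API):

use std::collections::VecDeque;

// Stand-ins for the real local queue and the inject (global) queue.
struct LocalQueue<T> {
    blocks: VecDeque<Vec<T>>,
    stealers_in_next_block: bool,
}
struct Inject<T> {
    tasks: VecDeque<T>,
}

impl<T> Inject<T> {
    fn push(&mut self, t: T) {
        self.tasks.push_back(t);
    }
    fn push_batch(&mut self, batch: impl Iterator<Item = T>) {
        self.tasks.extend(batch);
    }
}

// Overflow path taken when enqueuing into the full local queue failed.
fn overflow<T>(local: &mut LocalQueue<T>, inject: &mut Inject<T>, task: T) {
    if local.stealers_in_next_block {
        // A stealer will free slots soon; don't move a whole block, just park this task.
        inject.push(task);
    } else if let Some(block) = local.blocks.pop_front() {
        // Move one full block plus the new task to the global queue in a single batch.
        inject.push_batch(block.into_iter().chain(std::iter::once(task)));
    } else {
        inject.push(task);
    }
}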
@@ -94,12 +88,13 @@ impl<T> super::Owner<T> for Local<T> {
let _num_enqueued = self.inner.enqueue_batch_unchecked(tasks);
}

- fn remaining_slots_hint(&self) -> (u32, Option<u32>) {
+ fn remaining_slots_hint(&self) -> (u16, Option<u16>) {
let min_slots = self.inner.min_remaining_slots();
+ debug_assert!(min_slots <= u16::MAX.into());
// Note: If we do change from a linked list of blocks to an array of blocks,
// we may be able to quickly calculate an approximate upper bound based
// on the consumer cache _index_.
- (min_slots as u32, None)
+ (min_slots as u16, None)
}

fn pop(&mut self) -> Option<task::Notified<T>> {
@@ -128,7 +123,7 @@ impl<T> super::Stealer<T> for Steal<T> {
) -> Option<task::Notified<T>> {
// In the rare case that the `dst` queue is also full at the same time (because its
// producer is blocked waiting on a stealer), we only attempt to steal a single task
- if dst.remaining_slots_hint().0 < ELEMENTS_PER_BLOCK as u32 {
+ if dst.remaining_slots_hint().0 < ELEMENTS_PER_BLOCK as u16 {
dst_metrics.incr_steal_count(1);
dst_metrics.incr_steal_operations();
// We could evaluate stealing exactly the amount of remaining slots + 1.
1 change: 1 addition & 0 deletions (file name not shown)
@@ -42,6 +42,7 @@ impl<const NUM_ELEMENTS_PER_BLOCK: usize> Index<NUM_ELEMENTS_PER_BLOCK> {

/// True if the block is empty
#[inline(always)]
+ #[allow(dead_code)]
pub(crate) fn is_empty(&self) -> bool {
self.0 == 0
}
57 changes: 10 additions & 47 deletions (file name not shown)
@@ -62,8 +62,6 @@ pub(crate) struct Owner<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>
pcache: CachePadded<*const Block<E, { ENTRIES_PER_BLOCK }>>,
/// Consumer cache (single consumer) - points to block in self.queue.
ccache: CachePadded<*const Block<E, { ENTRIES_PER_BLOCK }>>,
- /// Stealer position cache - Allows the owner to quickly check if there are any stealers
- spos: CachePadded<Arc<AtomicUsize>>,
/// `Arc` to the actual queue to ensure the queue lives at least as long as the Owner.
#[allow(dead_code)]
queue: Pin<Arc<BwsQueue<E, NUM_BLOCKS, ENTRIES_PER_BLOCK>>>,
@@ -373,37 +371,15 @@ impl<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>
true
}

- // /// Advance consumer to the next block, unless the producer has not reached the block yet.
- // fn try_advance_consumer_block(
- //     &mut self,
- //     next_block: &Block<E, ENTRIES_PER_BLOCK>,
- //     curr_consumed: IndexAndVersion<ENTRIES_PER_BLOCK>,
- // ) -> Result<(), ()> {
- //     if self.can_advance_consumer_block(next_block, curr_consumed) {
- //         *self.ccache = next_block;
- //         Ok(())
- //     } else {
- //         Err(())
- //     }
- // }
-
- /// Todo: Ideally we would not have this function.
- pub(crate) fn has_stealers(&self) -> bool {
-     let curr_spos = self.spos.load(Relaxed);
-     // spos increments beyond NUM_BLOCKS to prevent ABA problems.
-     let start_block_idx = curr_spos % NUM_BLOCKS;
-     for i in 0..NUM_BLOCKS {
-         let block_idx = (start_block_idx + i) % NUM_BLOCKS;
-         let blk: &Block<E, ENTRIES_PER_BLOCK> = &self.queue.blocks[block_idx];
-         let stolen = blk.stolen.load(Relaxed);
-         let reserved = blk.reserved.load(Relaxed);
-         if reserved != stolen {
-             return true;
-         } else if !reserved.index().is_full() {
-             return false;
-         }
-     }
-     false
+ /// Check if there are any entries in the next block that are currently being stolen.
+ pub(crate) fn next_block_has_stealers(&self) -> bool {
+     // SAFETY: `pcache` always points to a valid `Block` in the queue. We never create a mutable reference
+     // to a Block, so it is safe to construct a shared reference here.
+     let blk = unsafe { &**self.pcache };
+     let reserved = blk.reserved.load(Relaxed);
+     let stolen = blk.stolen.load(Relaxed);
+     // If reserved and stolen don't match then there is still an active stealer in the block.
+     stolen != reserved
}

/// Check if there are entries that can be stolen from the queue.
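The `next_block_has_stealers` check added above compares two per-block atomic counters, `reserved` and `stolen`; while they differ, a steal is still in flight in that block. A minimal standalone model of that bookkeeping, where the begin/finish semantics are inferred from the counter names and the type is purely illustrative:

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Simplified per-block steal bookkeeping.
struct BlockCounters {
    reserved: AtomicUsize, // entries handed out to stealers
    stolen: AtomicUsize,   // entries stealers have finished removing
}

impl BlockCounters {
    fn begin_steal(&self, n: usize) {
        self.reserved.fetch_add(n, Relaxed);
    }
    fn finish_steal(&self, n: usize) {
        self.stolen.fetch_add(n, Relaxed);
    }
    // Mirrors the `stolen != reserved` check: true while any steal is incomplete.
    fn has_active_stealer(&self) -> bool {
        self.stolen.load(Relaxed) != self.reserved.load(Relaxed)
    }
}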
@@ -450,18 +426,6 @@ impl<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>
free_slots
}

- /// Returns `true` if enqueuing one block of entries would succeed.
- pub(crate) fn can_enqueue_block(&self) -> bool {
-     // Note: the current implementation of this function is overly conservative but fast.
-     let current_block = unsafe { &*(**self.pcache).next() };
-     let committed = current_block.committed.load(Relaxed);
-     if committed.index().is_empty() {
-         true
-     } else {
-         self.is_next_block_writable(current_block, committed.version())
-     }
- }
-
/// `true` if there is at least one entry that can be dequeued.
///
/// It is possible that a dequeue can still fail, since the item was stolen after we checked
@@ -740,7 +704,7 @@ impl<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>

/// The estimated number of entries currently enqueued.
#[cfg(feature = "stats")]
- pub fn estimated_queue_entries(&self) -> usize {
+ pub(crate) fn estimated_queue_entries(&self) -> usize {
self.queue.estimated_len()
}
}
@@ -851,7 +815,6 @@ pub(crate) fn new<E, const NUM_BLOCKS: usize, const ENTRIES_PER_BLOCK: usize>()
Owner {
pcache: CachePadded::new(first_block),
ccache: CachePadded::new(first_block),
- spos: CachePadded::new(stealer_position.clone()),
queue: q.clone(),
},
Stealer {
10 changes: 5 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/queue/tokioq.rs
@@ -395,7 +395,7 @@ impl<T: 'static> super::Owner<T> for Local<T> {

/// Returns a tuple with the lower bound and an Option for the upper bound of remaining
/// slots for enqueuing in the queue.
- fn remaining_slots_hint(&self) -> (u32, Option<u32>) {
+ fn remaining_slots_hint(&self) -> (u16, Option<u16>) {
// Safety: We own the queue and thus are the only ones that could potentially mutate
// `inner.tail`.
let tail = unsafe { self.inner.tail.unsync_load() };
@@ -410,8 +410,8 @@ impl<T: 'static> super::Owner<T> for Local<T> {
// `tail` is always larger than `steal`, since the counter is monotonically increasing,
// at least until it wraps around at `UnsignedShort::MAX`. wrapping_sub always gives the
// correct difference.
- let capacity = LOCAL_QUEUE_CAPACITY as UnsignedShort - (tail.wrapping_sub(steal)) as u32;
- (capacity, Some(capacity))
+ let capacity = LOCAL_QUEUE_CAPACITY as UnsignedShort - (tail.wrapping_sub(steal));
+ (capacity as u16, Some(capacity as u16))
}

/// Pops a task from the local queue.
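In the hunk above, `steal` and `tail` are monotonically increasing counters of the queue's `UnsignedShort` (u16) type, so `tail.wrapping_sub(steal)` yields the number of occupied slots even after the counters wrap at `u16::MAX`. A small worked example of that invariant, with values chosen purely for illustration:

fn main() {
    // No wraparound: 40 - 30 = 10 entries between steal and tail.
    let (steal, tail): (u16, u16) = (30, 40);
    assert_eq!(tail.wrapping_sub(steal), 10);

    // With wraparound: tail has wrapped past u16::MAX while steal has not,
    // yet the wrapping difference still yields the true element count.
    let (steal, tail): (u16, u16) = (u16::MAX - 3, 6);
    assert_eq!(tail.wrapping_sub(steal), 10);

    println!("wrapping_sub gives the correct length across the u16 wrap");
}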
@@ -546,7 +546,7 @@ impl<T: 'static> super::Stealer<T> for Steal<T> {

impl<T> Steal<T> {
/// Steal half of the queue's items, but not more than `max`.
- fn steal_half_max(&self, max: u32) -> Option<StealerIterator<'_, T>> {
+ fn steal_half_max(&self, max: u16) -> Option<StealerIterator<'_, T>> {
let mut prev_packed = self.0.head.load(Acquire);
let mut next_packed;

@@ -561,7 +561,7 @@ impl<T> Steal<T> {
// Number of available tasks to steal
let n = src_tail.wrapping_sub(src_head_real);
let n = n - n / 2;
- let n = cmp::min(n, max);
+ let n = cmp::min(n, max as UnsignedShort);

if n == 0 {
// No tasks available to steal
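Above, `n - n / 2` is the available count halved and rounded up, which is then capped by the caller's `max`. A short numeric illustration of that sizing rule; the helper name is hypothetical:

// How many tasks a stealer takes: half of what is available, rounding up, capped at `max`.
fn steal_count(available: u16, max: u16) -> u16 {
    let half_rounded_up = available - available / 2;
    half_rounded_up.min(max)
}

fn main() {
    assert_eq!(steal_count(7, 16), 4);  // half of 7, rounded up
    assert_eq!(steal_count(8, 16), 4);  // exact half
    assert_eq!(steal_count(9, 2), 2);   // capped by `max`
    assert_eq!(steal_count(0, 16), 0);  // nothing to steal
    println!("steal sizing matches n - n/2 capped at max");
}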