Skip to content

Commit

Permalink
Merge pull request #1 from nurmohammed840/dev
Browse files Browse the repository at this point in the history
Scheduler: find the least loaded worker in batch.
  • Loading branch information
nurmohammed840 authored Dec 10, 2024
2 parents 44a6fd9 + a516185 commit 66d06ad
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions src/scheduler/least_loaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,42 @@ use crossbeam_channel::{unbounded as channel, Receiver, Sender};
use std::{
collections::VecDeque,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicU8, AtomicUsize, Ordering},
Arc,
},
};

pub struct Scheduler {
workers: Arc<[Worker]>,
workers: Arc<Batch<Worker>>,
}

struct Batch<T> {
next_batch: AtomicU8,
workers: Box<[T]>,
}

const BATCH_SIZE: usize = 8;

impl<T> Batch<T> {
fn new(workers: Box<[T]>) -> Self {
Self {
next_batch: AtomicU8::new(0),
workers,
}
}

#[inline]
fn next_batch(&self) -> &[T] {
let len = self.workers.len();
if len <= BATCH_SIZE {
&self.workers
} else {
let no = self.next_batch.fetch_add(1, Ordering::Relaxed) as usize;
let next = (no * BATCH_SIZE) % len.next_multiple_of(BATCH_SIZE);
let end = (next + BATCH_SIZE).min(len);
unsafe { self.workers.get_unchecked(next..end) }
}
}
}

struct Worker {
Expand Down Expand Up @@ -63,7 +92,7 @@ impl Scheduler {

let init_capacity = 256;
let mut queues = vec![];
let workers: Vec<_> = (0..worker_count)
let workers: Box<_> = (0..worker_count)
.map(|_| {
let len = Length::default();
let (tx, rx) = channel();
Expand All @@ -80,7 +109,7 @@ impl Scheduler {
.collect();

let scheduler = Self {
workers: workers.into(),
workers: Batch::new(workers).into(),
};
(scheduler, queues)
}
Expand All @@ -89,6 +118,7 @@ impl Scheduler {
fn least_loaded_worker(&self) -> &Worker {
unsafe {
self.workers
.next_batch()
.iter()
.min_by_key(|a| a.len.get())
.unwrap_unchecked()
Expand Down

0 comments on commit 66d06ad

Please sign in to comment.