Commit 7e9a938: Rework on the scheduler

vertexclique committed Nov 5, 2019
1 parent 44f744b · commit 7e9a938
Showing 5 changed files with 121 additions and 68 deletions.
35 changes: 0 additions & 35 deletions bastion-executor/src/pool.rs
@@ -30,41 +30,6 @@ impl Pool {
        unimplemented!()
    }

    pub fn fetch_proc(&self, affinity: usize, local: &Worker<LightProc>) -> Option<LightProc> {
        if let Ok(mut stats) = load_balancer::stats().try_write() {
            stats
                .smp_queues
                .insert(affinity, local.worker_run_queue_size());
        }

        if let Ok(stats) = load_balancer::stats().try_read() {
            if local.worker_run_queue_size() == 0 {
                while let Some(proc) = self.injector.steal_batch_and_pop(local).success() {
                    return Some(proc);
                }
            } else {
                let affine_core = *stats
                    .smp_queues
                    .iter()
                    .max_by_key(|&(_core, stat)| stat)
                    .unwrap()
                    .0;
                let stealer = self.stealers.get(affine_core).unwrap();
                if let Some(amount) = stealer.run_queue_size().checked_sub(stats.mean_level) {
                    if let Some(proc) = stealer
                        .steal_batch_and_pop_with_amount(local, amount.wrapping_add(1))
                        .success()
                    {
                        return Some(proc);
                    }
                }
            }
        }

        // Pop only from the local queue with full trust
        local.pop()
    }

    pub fn spawn<F, T>(&self, future: F, stack: ProcStack) -> RecoverableHandle<T>
    where
        F: Future<Output = T> + Send + 'static,
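For orientation: run_queue.rs mirrors the Worker/Stealer/Injector trio of the upstream crossbeam-deque crate, and the deleted Pool::fetch_proc above is written against those primitives. A minimal sketch of the injector-fallback path using the upstream crate (assumed as a dependency here, not part of this change):

// Orientation sketch (not part of this diff): the steal-from-injector fallback,
// written against the upstream crossbeam-deque crate.
use crossbeam_deque::{Injector, Steal, Worker};

fn main() {
    let injector: Injector<u32> = Injector::new(); // global queue
    let local: Worker<u32> = Worker::new_fifo();   // per-worker queue
    let stealer = local.stealer();                 // handle other workers steal through

    injector.push(7);

    // Move a batch from the injector into the local queue and pop one task,
    // the same fallback `fetch_proc` used when the local queue was empty.
    if let Steal::Success(task) = injector.steal_batch_and_pop(&local) {
        assert_eq!(task, 7);
    }

    local.push(8);
    assert_eq!(stealer.steal(), Steal::Success(8));
}

The rest of this commit moves that fetch logic out of Pool and into worker.rs, as shown further below.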
4 changes: 0 additions & 4 deletions bastion-executor/src/run.rs
@@ -27,10 +27,6 @@ where
        }
    };

    // Log this `block_on` operation.
    let _child_id = stack.get_pid();
    let _parent_id = worker::get_proc_stack(|t| t.get_pid()).unwrap_or(0);

    // Pin the future onto the stack.
    pin_utils::pin_mut!(future);

23 changes: 16 additions & 7 deletions bastion-executor/src/run_queue.rs
@@ -323,11 +323,11 @@ impl<T> Worker<T> {
        let cap = self.buffer.get().cap;

        // Is there enough capacity to push `reserve_cap` tasks?
        if cap - len < reserve_cap {
        if cap.saturating_sub(len) < reserve_cap {
            // Keep doubling the capacity as much as is needed.
            let mut new_cap = cap * 2;
            while new_cap - len < reserve_cap {
                new_cap *= 2;
            while new_cap.saturating_sub(len) < reserve_cap {
                new_cap = new_cap.wrapping_mul(2);
            }

            // Resize the buffer.
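The hunk above replaces plain subtraction with saturating_sub, so a len that momentarily exceeds cap can no longer underflow the capacity check. A standalone sketch of the doubling arithmetic (hypothetical next_capacity helper, not code from this repository):

// Standalone sketch (not part of the diff): the doubling loop above, with the
// subtraction guarded so `len > cap` cannot underflow.
fn next_capacity(cap: usize, len: usize, reserve: usize) -> usize {
    if cap.saturating_sub(len) >= reserve {
        return cap; // enough free slots already
    }
    // Keep doubling the capacity as much as is needed.
    let mut new_cap = cap * 2;
    while new_cap.saturating_sub(len) < reserve {
        new_cap = new_cap.wrapping_mul(2);
    }
    new_cap
}

fn main() {
    assert_eq!(next_capacity(4, 4, 3), 8); // doubling once leaves 4 free slots
    assert_eq!(next_capacity(8, 3, 2), 8); // already enough room
}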
@@ -908,6 +908,9 @@ impl<T> Stealer<T> {

        let guard = &epoch::pin();

        // Shadow the requested amount.
        let mut amount = amount;

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

@@ -917,6 +920,11 @@
            return Steal::Empty;
        }

        let default_time_ahead = ((len as usize - 1) / 2);
        if amount > default_time_ahead as usize {
            amount = default_time_ahead;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = cmp::min(amount, MAX_BATCH - 1);
        dest.reserve(batch_size);
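The added lines cap the requested amount at roughly half of the victim queue before the existing MAX_BATCH limit is applied. A standalone sketch of that clamping (the MAX_BATCH value below is illustrative, not the module's actual constant):

// Standalone sketch of the clamp above. MAX_BATCH is illustrative here;
// the real constant lives in run_queue.rs.
const MAX_BATCH: usize = 32;

fn clamp_batch(requested: usize, victim_len: usize) -> usize {
    // Take at most (len - 1) / 2 of what the victim currently holds...
    let half = victim_len.saturating_sub(1) / 2;
    let amount = requested.min(half);
    // ...and never exceed the hard batch limit.
    amount.min(MAX_BATCH - 1)
}

fn main() {
    assert_eq!(clamp_batch(10, 7), 3);  // (7 - 1) / 2 = 3
    assert_eq!(clamp_batch(2, 100), 2); // the request itself is the smaller bound
}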
@@ -1748,8 +1756,8 @@ impl<T> Steal<T> {
/// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
/// * If both resulted in `None`, then `None` is returned.
pub fn or_else<F>(self, f: F) -> Steal<T>
where
F: FnOnce() -> Steal<T>,
where
F: FnOnce() -> Steal<T>,
{
match self {
Steal::Empty => f(),
@@ -1781,8 +1789,8 @@ impl<T> FromIterator<Steal<T>> for Steal<T> {
/// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
/// Otherwise, `Empty` is returned.
fn from_iter<I>(iter: I) -> Steal<T>
where
I: IntoIterator<Item = Steal<T>>,
where
I: IntoIterator<Item = Steal<T>>,
{
let mut retry = false;
for s in iter {
@@ -1800,3 +1808,4 @@ impl<T> FromIterator<Steal<T>> for Steal<T> {
}
}
}
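The two hunks above appear to be whitespace-only re-indentation of the where clauses. For reference, the or_else behaviour described in the doc comment can be exercised with a simplified stand-in type (not the real run_queue Steal):

// Simplified stand-in for the Steal type, only to exercise the `or_else`
// rules quoted in the doc comment above (not the real run_queue type).
#[derive(Debug, PartialEq)]
enum Steal<T> {
    Empty,
    Success(T),
    Retry,
}

impl<T> Steal<T> {
    fn or_else<F: FnOnce() -> Steal<T>>(self, f: F) -> Steal<T> {
        match self {
            Steal::Empty => f(),
            Steal::Success(v) => Steal::Success(v),
            // An earlier Retry is sticky unless the fallback actually succeeds.
            Steal::Retry => match f() {
                Steal::Success(v) => Steal::Success(v),
                _ => Steal::Retry,
            },
        }
    }
}

fn main() {
    let first: Steal<u32> = Steal::Retry;
    assert_eq!(first.or_else(|| Steal::Empty), Steal::Retry);
    assert_eq!(Steal::Empty.or_else(|| Steal::Success(1)), Steal::Success(1));
}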

125 changes: 104 additions & 21 deletions bastion-executor/src/worker.rs
@@ -1,9 +1,15 @@
use std::cell::Cell;
use std::cell::{UnsafeCell, Cell};
use std::ptr;

use super::pool;
use super::run_queue::Worker;
use lightproc::prelude::*;
use core::iter;
use crate::load_balancer;
use std::iter::repeat_with;
use crate::pool::Pool;
use std::sync::atomic::Ordering;
use std::sync::atomic;

pub fn current() -> ProcStack {
    get_proc_stack(|proc| proc.clone())
@@ -47,40 +47,53 @@
}

thread_local! {
    static IS_WORKER: Cell<bool> = Cell::new(false);
    static QUEUE: Cell<Option<Worker<LightProc>>> = Cell::new(None);
    static QUEUE: UnsafeCell<Option<Worker<LightProc>>> = UnsafeCell::new(None);
}

pub(crate) fn is_worker() -> bool {
    IS_WORKER.with(|is_worker| is_worker.get())
pub(crate) fn schedule(proc: LightProc) {
    QUEUE.with(|queue| {
        let local = unsafe { (*queue.get()).as_ref() };

        match local {
            None => pool::get().injector.push(proc),
            Some(q) => q.push(proc),
        }
    });

    pool::get().sleepers.notify_one();
}
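The new schedule decides between the calling thread's local queue and the global injector by inspecting the thread-local QUEUE slot directly, replacing the old IS_WORKER flag, and then wakes one sleeping worker. A minimal std-only sketch of that dispatch pattern (RefCell stands in for the UnsafeCell used above, u32 for LightProc; names are illustrative):

// Minimal std-only sketch of the dispatch pattern (illustrative names, not the
// bastion-executor API).
use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::Mutex;

static GLOBAL: Mutex<Vec<u32>> = Mutex::new(Vec::new()); // stands in for the injector

thread_local! {
    // `Some` only on worker threads, exactly like the QUEUE slot above.
    static LOCAL: RefCell<Option<VecDeque<u32>>> = RefCell::new(None);
}

fn schedule(task: u32) {
    LOCAL.with(|local| match local.borrow_mut().as_mut() {
        Some(queue) => queue.push_back(task),      // fast path on a worker thread
        None => GLOBAL.lock().unwrap().push(task), // any other thread goes global
    });
    // here the real scheduler would call sleepers.notify_one()
}

fn main() {
    schedule(1); // called off a worker thread, so the task lands in GLOBAL
    assert_eq!(GLOBAL.lock().unwrap().len(), 1);
}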

fn get_queue<F: FnOnce(&Worker<LightProc>) -> T, T>(f: F) -> T {
pub fn fetch_proc(affinity: usize) -> Option<LightProc> {
    let pool = pool::get();

    QUEUE.with(|queue| {
        let q = queue.take().unwrap();
        let ret = f(&q);
        queue.set(Some(q));
        ret
        let local = unsafe { (*queue.get()).as_ref().unwrap() };

        stats_generator(affinity, local);

        // Pop only from the local queue with full trust
        local.pop().or_else(|| {
            match local.worker_run_queue_size() {
                x if x == 0 => {
                    if pool.injector.is_empty() {
                        affine_steal(pool, local)
                    } else {
                        pool.injector.steal_batch_and_pop(local).success()
                    }
                },
                _ => {
                    affine_steal(pool, local)
                }
            }
        })
    })
}

pub(crate) fn schedule(proc: LightProc) {
    if is_worker() {
        get_queue(|q| q.push(proc));
    } else {
        pool::get().injector.push(proc);
fn affine_steal(pool: &Pool, local: &Worker<LightProc>) -> Option<LightProc> {
    match load_balancer::stats().try_read() {
        Ok(stats) => {
            let affine_core = *stats
                .smp_queues
                .iter()
                .max_by_key(|&(_core, stat)| stat)
                .unwrap()
                .0;

            let default = || {
                pool.injector.steal_batch_and_pop(local).success()
            };

            pool.stealers.get(affine_core)
                .map_or_else(default, |stealer| {
                    stealer.run_queue_size().checked_sub(stats.mean_level)
                        .map_or_else(default, |amount| {
                            amount.checked_sub(1)
                                .map_or_else(default, |possible| {
                                    if possible != 0 {
                                        stealer.steal_batch_and_pop_with_amount(local, possible).success()
                                    } else {
                                        default()
                                    }
                                })
                        })
                })

            /////
            // let stealer = pool.stealers.get(affine_core).unwrap();
            //
            // if let Some(amount) = stealer.run_queue_size().checked_sub(stats.mean_level) {
            //     if let Some(possible) = amount.checked_sub(1) {
            //         stealer.steal_batch_and_pop_with_amount(local, possible).success()
            //     } else {
            //         pool.injector.steal_batch_and_pop(local).success()
            //     }
            // } else {
            //     pool.injector.steal_batch_and_pop(local).success()
            // }
        },
        Err(_) => {
            pool.injector.steal_batch_and_pop(local).success()
        }
    }
}
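affine_steal picks the core whose run queue the load balancer reports as the longest and steals only the portion above the mean level, minus one task, falling back to the global injector whenever a subtraction would go negative or the amount ends up zero. A standalone sketch of that amount calculation (the stats map stands in for the live run_queue_size() call; names are illustrative):

use std::collections::HashMap;

// Standalone sketch of the steal-amount calculation above.
// Returns (core to steal from, how many tasks to take); None means fall back
// to the injector.
fn steal_plan(smp_queues: &HashMap<usize, usize>, mean_level: usize) -> Option<(usize, usize)> {
    let (&core, &len) = smp_queues.iter().max_by_key(|&(_core, len)| len)?;
    let above_mean = len.checked_sub(mean_level)?; // below the mean: nothing to take
    let amount = above_mean.checked_sub(1)?;       // leave at least one task behind
    if amount == 0 {
        None // not worth stealing
    } else {
        Some((core, amount))
    }
}

fn main() {
    let stats: HashMap<usize, usize> = [(0, 10), (1, 2)].into_iter().collect();
    assert_eq!(steal_plan(&stats, 4), Some((0, 5))); // steal 10 - 4 - 1 tasks from core 0
    assert_eq!(steal_plan(&stats, 12), None);        // everyone is at or below the mean
}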

fn stats_generator(affinity: usize, local: &Worker<LightProc>) {
    if let Ok(mut stats) = load_balancer::stats().try_write() {
        stats
            .smp_queues
            .insert(affinity, local.worker_run_queue_size());
    }
    pool::get().sleepers.notify_one();
}
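stats_generator uses try_write so the hot path never blocks on the stats lock: if another thread holds it, the sample for this round is simply skipped. A std-only sketch of the same non-blocking update (illustrative types, not the real load_balancer module):

use std::collections::HashMap;
use std::sync::RwLock;

// Sketch of the non-blocking stats update: skip the sample if the lock is busy.
static STATS: RwLock<Option<HashMap<usize, usize>>> = RwLock::new(None);

fn stats_generator(affinity: usize, queue_len: usize) {
    if let Ok(mut stats) = STATS.try_write() {
        stats.get_or_insert_with(HashMap::new).insert(affinity, queue_len);
    }
}

fn main() {
    stats_generator(0, 3);
    assert_eq!(STATS.read().unwrap().as_ref().unwrap()[&0], 3);
}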

pub(crate) fn main_loop(affinity: usize, worker: Worker<LightProc>) {
    IS_WORKER.with(|is_worker| is_worker.set(true));
    QUEUE.with(|queue| queue.set(Some(worker)));
pub(crate) fn main_loop(affinity: usize, local: Worker<LightProc>) {
    QUEUE.with(|queue| unsafe { *queue.get() = Some(local) });

    loop {
        match get_queue(|q| pool::get().fetch_proc(affinity, q)) {
        match fetch_proc(affinity) {
            Some(proc) => set_stack(proc.stack(), || proc.run()),
            None => pool::get().sleepers.wait(),
            None => {
                QUEUE.with(|queue| {
                    let local = unsafe { (*queue.get()).as_ref().unwrap() };
                    stats_generator(affinity, local);
                });
                pool::get().sleepers.wait()
            },
        }
    }
}
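When fetch_proc returns nothing, the worker refreshes its stats entry and parks on sleepers.wait() until a schedule call issues notify_one. A simplified stand-in for that sleep/wake handshake using a Mutex and Condvar (not the real Sleepers implementation):

use std::sync::{Condvar, Mutex};

// Simplified stand-in for the sleep/wake handshake (not the real Sleepers type).
struct Sleepers {
    pending: Mutex<usize>, // wake-ups that have not been consumed yet
    condvar: Condvar,
}

impl Sleepers {
    fn new() -> Self {
        Self {
            pending: Mutex::new(0),
            condvar: Condvar::new(),
        }
    }

    // Called by a worker that found no task: park until someone schedules work.
    fn wait(&self) {
        let mut pending = self.pending.lock().unwrap();
        while *pending == 0 {
            pending = self.condvar.wait(pending).unwrap();
        }
        *pending -= 1;
    }

    // Called by `schedule` after pushing a task: wake one parked worker.
    fn notify_one(&self) {
        *self.pending.lock().unwrap() += 1;
        self.condvar.notify_one();
    }
}

fn main() {
    let sleepers = Sleepers::new();
    sleepers.notify_one(); // a task was scheduled
    sleepers.wait();       // the worker wakes up and goes looking for it
}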
2 changes: 1 addition & 1 deletion lightproc/src/raw_proc.rs
@@ -292,7 +292,7 @@ where
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.pdata).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
        let new = (*raw.pdata).state.fetch_sub(REFERENCE, Ordering::AcqRel).saturating_sub(REFERENCE);

        // If this was the last reference to the proc and the `ProcHandle` has been dropped as
        // well, then destroy the proc.
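fetch_sub returns the previous value of the state word, so subtracting REFERENCE once more yields the new count; switching to saturating_sub keeps an over-release from wrapping the count around. A standalone sketch of the pattern (REFERENCE is 1 here for illustration; the real lightproc constant packs flag bits into the state):

use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch of the decrement pattern (not the real lightproc state layout).
const REFERENCE: usize = 1;

fn drop_ref(state: &AtomicUsize) -> usize {
    // `fetch_sub` returns the previous value, so subtract once more to get the
    // new count; `saturating_sub` keeps an over-release from wrapping around.
    state
        .fetch_sub(REFERENCE, Ordering::AcqRel)
        .saturating_sub(REFERENCE)
}

fn main() {
    let state = AtomicUsize::new(2 * REFERENCE);
    assert_eq!(drop_ref(&state), REFERENCE); // one reference still alive
    assert_eq!(drop_ref(&state), 0);         // last one gone: destroy the proc
}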
