Skip to content

Commit

Permalink
Allow for task offloading to a new thread.
Browse files Browse the repository at this point in the history
Not *actually* implemented the instrumentation, yet.
  • Loading branch information
FelixMcFelix committed May 19, 2023
1 parent 1241a55 commit f6e0cca
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 15 deletions.
30 changes: 23 additions & 7 deletions src/driver/scheduler/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) struct Idle {
rx: Receiver<SchedulerMessage>,
tx: Sender<SchedulerMessage>,
next_id: TaskId,
next_worker_id: WorkerId,
workers: Vec<Worker>,
to_cull: Vec<TaskId>,
}
Expand All @@ -41,6 +42,7 @@ impl Idle {
rx,
tx: tx.clone(),
next_id: TaskId::new(),
next_worker_id: WorkerId::new(),
workers: Vec::with_capacity(16),
to_cull: vec![],
};
Expand Down Expand Up @@ -87,11 +89,17 @@ impl Idle {
let task = self.tasks.get_mut(&id).unwrap();

match task.handle_message(mix_msg) {
Ok(false) if now_live => self.schedule_mixer(id),
Ok(false) if now_live => {
let task = self.tasks.remove(&id).unwrap();
self.schedule_mixer(task, id, None);
},
Ok(false) => {},
Ok(true) | Err(_) => self.to_cull.push(id),
}
},
Ok(SchedulerMessage::Overspill(worker_id, id, task)) => {
self.schedule_mixer(task, id, Some(worker_id));
},
Ok(SchedulerMessage::GetStats(tx)) => {
_ = tx.send(self.workers.iter().map(Worker::stats).collect());
},
Expand Down Expand Up @@ -133,9 +141,8 @@ impl Idle {
}

/// Promote a task to a live mixer thread.
fn schedule_mixer(&mut self, id: TaskId) {
let mut task = self.tasks.remove(&id).unwrap();
let worker = self.fetch_worker(&task);
fn schedule_mixer(&mut self, mut task: ParkedMixer, id: TaskId, avoid: Option<WorkerId>) {
let worker = self.fetch_worker(&task, avoid);
if task.send_gateway_speaking().is_ok() {
// TODO: put this task on a valid worker, if this fails -- kill old worker.
worker
Expand All @@ -146,15 +153,18 @@ impl Idle {
}

/// Fetch the first `Worker` that has room for a new task, creating one if needed.
fn fetch_worker(&mut self, task: &ParkedMixer) -> &mut Worker {
///
/// If an inbound task has spilled from another thread, then do not reschedule it there.
fn fetch_worker(&mut self, task: &ParkedMixer, avoid: Option<WorkerId>) -> &mut Worker {
// look through all workers.
// if none found w/ space, add new.
let idx = self
.workers
.iter()
.position(|w| w.has_room(task))
.position(|w| w.can_schedule(task, avoid))
.unwrap_or_else(|| {
self.workers.push(Worker::new(
self.next_worker_id.incr(),
self.mode.clone(),
self.tx.clone(),
self.stats.clone(),
Expand Down Expand Up @@ -232,9 +242,15 @@ mod test {
core.cull_timer = TEST_TIMER;

let mut next_id = TaskId::new();
let mut thread_id = WorkerId::new();
let mut handles = vec![];
for i in 0..2 {
let mut worker = Worker::new(mode.clone(), tx.clone(), core.stats.clone());
let mut worker = Worker::new(
thread_id.incr(),
mode.clone(),
tx.clone(),
core.stats.clone(),
);
let ((mixer, listeners), track_handle) =
Mixer::test_with_float_unending(Handle::current(), false);

Expand Down
23 changes: 20 additions & 3 deletions src/driver/scheduler/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::*;
/// The send-half of a worker thread, with bookkeeping mechanisms to help
/// the idle task schedule incoming tasks.
pub struct Worker {
id: WorkerId,
stats: Arc<LiveStatBlock>,
mode: ScheduleMode,
tx: Sender<(TaskId, ParkedMixer)>,
Expand All @@ -26,17 +27,26 @@ pub struct Worker {
#[allow(missing_docs)]
impl Worker {
pub fn new(
id: WorkerId,
mode: ScheduleMode,
sched_tx: Sender<SchedulerMessage>,
global_stats: Arc<StatBlock>,
) -> Self {
let stats = Arc::new(LiveStatBlock::default());
let (live_tx, live_rx) = flume::unbounded();

let core = Live::new(mode.clone(), global_stats, stats.clone(), live_rx, sched_tx);
let core = Live::new(
id,
mode.clone(),
global_stats,
stats.clone(),
live_rx,
sched_tx,
);
core.spawn();

Self {
id,
stats,
mode,
tx: live_tx,
Expand Down Expand Up @@ -73,8 +83,8 @@ impl Worker {
/// Return whether this thread has enough room (task count, spare cycles)
/// for the given task.
#[inline]
pub fn has_room(&self, task: &ParkedMixer) -> bool {
self.stats.has_room(&self.mode, task)
pub fn can_schedule(&self, task: &ParkedMixer, avoid: Option<WorkerId>) -> bool {
avoid.map_or(true, |id| !self.has_id(id)) && self.stats.has_room(&self.mode, task)
}

#[inline]
Expand All @@ -89,6 +99,10 @@ impl Worker {
self.stats.add_mixer();
self.tx.send((id, task)).map_err(|_| ())
}

pub fn has_id(&self, id: WorkerId) -> bool {
self.id == id
}
}

const PACKETS_PER_BLOCK: usize = 16;
Expand All @@ -106,6 +120,7 @@ pub struct Live {
deadline: Instant,
start_of_work: Option<Instant>,

id: WorkerId,
mode: ScheduleMode,
stats: Arc<LiveStatBlock>,
global_stats: Arc<StatBlock>,
Expand All @@ -118,6 +133,7 @@ pub struct Live {
#[allow(missing_docs)]
impl Live {
pub fn new(
id: WorkerId,
mode: ScheduleMode,
global_stats: Arc<StatBlock>,
stats: Arc<LiveStatBlock>,
Expand All @@ -143,6 +159,7 @@ impl Live {
deadline: Instant::now(),
start_of_work: None,

id,
mode,
stats,
global_stats,
Expand Down
2 changes: 2 additions & 0 deletions src/driver/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ pub enum SchedulerMessage {
Do(TaskId, MixerMessage),
/// Return a `Mixer` from a worker back to the idle pool.
Demote(TaskId, ParkedMixer),
/// Move an expensive `Mixer` to another thread in the worker pool.
Overspill(WorkerId, TaskId, ParkedMixer),
/// Request a copy of all per-worker statistics.
GetStats(Sender<Vec<Arc<LiveStatBlock>>>),
/// Cleanup once all `Scheduler` handles are dropped.
Expand Down
21 changes: 16 additions & 5 deletions src/driver/scheduler/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,27 @@ use crate::{

use super::SchedulerMessage;

/// Typesafe counter used to identify individual mixer instances.
/// Typesafe counter used to identify individual mixer/worker instances.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct TaskId(u64);
pub struct ResId<T>(u64, std::marker::PhantomData<T>);
#[allow(missing_docs)]
pub type TaskId = ResId<TaskMarker>;
#[allow(missing_docs)]
pub type WorkerId = ResId<WorkerMarker>;

#[allow(missing_docs)]
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct TaskMarker;
#[allow(missing_docs)]
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct WorkerMarker;

impl IsEnabled for TaskId {}
impl<T> IsEnabled for ResId<T> {}

#[allow(missing_docs)]
impl TaskId {
impl<T: Copy> ResId<T> {
pub fn new() -> Self {
TaskId(0)
ResId(0, Default::default())
}

pub fn incr(&mut self) -> Self {
Expand Down
1 change: 1 addition & 0 deletions src/driver/test_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl MockScheduler {
let (sched_tx, sched_rx) = flume::unbounded();

let core = Live::new(
WorkerId::new(),
mode.unwrap_or_default(),
stats.clone(),
local.clone(),
Expand Down

0 comments on commit f6e0cca

Please sign in to comment.