Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify executor by merging task_queue and wake_queue #804

Merged
merged 2 commits into from
May 19, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 39 additions & 49 deletions src/task/executor.rs
Original file line number Diff line number Diff line change
@@ -1,103 +1,93 @@
use super::{Task, TaskId};
use alloc::{
collections::{BTreeMap, VecDeque},
sync::Arc,
task::Wake,
};
use alloc::{collections::BTreeMap, sync::Arc, task::Wake};
use core::task::{Context, Poll, Waker};
use crossbeam_queue::ArrayQueue;

pub struct Executor {
task_queue: VecDeque<Task>,
waiting_tasks: BTreeMap<TaskId, Task>,
wake_queue: Arc<ArrayQueue<TaskId>>,
tasks: BTreeMap<TaskId, Task>,
task_queue: Arc<ArrayQueue<TaskId>>,
waker_cache: BTreeMap<TaskId, Waker>,
}

impl Executor {
pub fn new() -> Self {
Executor {
task_queue: VecDeque::new(),
waiting_tasks: BTreeMap::new(),
wake_queue: Arc::new(ArrayQueue::new(100)),
tasks: BTreeMap::new(),
task_queue: Arc::new(ArrayQueue::new(100)),
waker_cache: BTreeMap::new(),
}
}

pub fn spawn(&mut self, task: Task) {
self.task_queue.push_back(task)
let task_id = task.id;
if self.tasks.insert(task.id, task).is_some() {
panic!("task with same ID already in tasks");
}
self.task_queue.push(task_id).expect("queue full");
}

pub fn run(&mut self) -> ! {
loop {
self.wake_tasks();
self.run_ready_tasks();
self.sleep_if_idle();
}
}

fn run_ready_tasks(&mut self) {
while let Some(mut task) = self.task_queue.pop_front() {
let task_id = task.id;
if !self.waker_cache.contains_key(&task_id) {
self.waker_cache.insert(task_id, self.create_waker(task_id));
}
let waker = self.waker_cache.get(&task_id).expect("should exist");
// destructure `self` to avoid borrow checker errors
let Self {
tasks,
task_queue,
waker_cache,
} = self;

while let Ok(task_id) = task_queue.pop() {
let task = match tasks.get_mut(&task_id) {
Some(task) => task,
None => continue, // task no longer exists
};
let waker = waker_cache
.entry(task_id)
.or_insert_with(|| TaskWaker::new(task_id, task_queue.clone()));
let mut context = Context::from_waker(waker);
match task.poll(&mut context) {
Poll::Ready(()) => {
// task done -> remove cached waker
self.waker_cache.remove(&task_id);
// task done -> remove it and its cached waker
tasks.remove(&task_id);
waker_cache.remove(&task_id);
}
Poll::Pending => {
if self.waiting_tasks.insert(task_id, task).is_some() {
panic!("task with same ID already in waiting_tasks");
}
}
}
}
}

fn wake_tasks(&mut self) {
while let Ok(task_id) = self.wake_queue.pop() {
if let Some(task) = self.waiting_tasks.remove(&task_id) {
self.task_queue.push_back(task);
Poll::Pending => {}
}
}
}

fn sleep_if_idle(&self) {
use x86_64::instructions::interrupts::{self, enable_interrupts_and_hlt};

// fast path
if !self.wake_queue.is_empty() {
return;
}

interrupts::disable();
if self.wake_queue.is_empty() {
if self.task_queue.is_empty() {
enable_interrupts_and_hlt();
} else {
interrupts::enable();
}
}

fn create_waker(&self, task_id: TaskId) -> Waker {
Waker::from(Arc::new(TaskWaker {
task_id,
wake_queue: self.wake_queue.clone(),
}))
}
}

struct TaskWaker {
task_id: TaskId,
wake_queue: Arc<ArrayQueue<TaskId>>,
task_queue: Arc<ArrayQueue<TaskId>>,
}

impl TaskWaker {
fn new(task_id: TaskId, task_queue: Arc<ArrayQueue<TaskId>>) -> Waker {
Waker::from(Arc::new(TaskWaker {
task_id,
task_queue,
}))
}

fn wake_task(&self) {
self.wake_queue.push(self.task_id).expect("wake_queue full");
self.task_queue.push(self.task_id).expect("task_queue full");
}
}

Expand Down