From ee0c11d3164a4b87ecf1f9da5342713611bcbb86 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 18 May 2020 12:07:45 +0200 Subject: [PATCH 1/2] Simplify executor by merging task_queue and wake_queue --- src/task/executor.rs | 57 +++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/src/task/executor.rs b/src/task/executor.rs index 55410cebd..7587f2548 100644 --- a/src/task/executor.rs +++ b/src/task/executor.rs @@ -1,44 +1,48 @@ use super::{Task, TaskId}; -use alloc::{ - collections::{BTreeMap, VecDeque}, - sync::Arc, - task::Wake, -}; +use alloc::{collections::BTreeMap, sync::Arc, task::Wake}; use core::task::{Context, Poll, Waker}; use crossbeam_queue::ArrayQueue; pub struct Executor { - task_queue: VecDeque, + task_queue: Arc>, waiting_tasks: BTreeMap, - wake_queue: Arc>, waker_cache: BTreeMap, } impl Executor { pub fn new() -> Self { Executor { - task_queue: VecDeque::new(), + task_queue: Arc::new(ArrayQueue::new(100)), waiting_tasks: BTreeMap::new(), - wake_queue: Arc::new(ArrayQueue::new(100)), waker_cache: BTreeMap::new(), } } pub fn spawn(&mut self, task: Task) { - self.task_queue.push_back(task) + let task_id = task.id; + self.add_waiting(task); + self.task_queue.push(task_id).expect("task_queue full"); } pub fn run(&mut self) -> ! { loop { - self.wake_tasks(); self.run_ready_tasks(); self.sleep_if_idle(); } } + fn add_waiting(&mut self, task: Task) { + if self.waiting_tasks.insert(task.id, task).is_some() { + panic!("task with same ID already in waiting_tasks"); + } + } + fn run_ready_tasks(&mut self) { - while let Some(mut task) = self.task_queue.pop_front() { - let task_id = task.id; + while let Ok(task_id) = self.task_queue.pop() { + let mut task = match self.waiting_tasks.remove(&task_id) { + Some(task) => task, + None => continue, + }; if !self.waker_cache.contains_key(&task_id) { self.waker_cache.insert(task_id, self.create_waker(task_id)); } @@ -49,19 +53,7 @@ impl Executor { // task done -> remove cached waker self.waker_cache.remove(&task_id); } - Poll::Pending => { - if self.waiting_tasks.insert(task_id, task).is_some() { - panic!("task with same ID already in waiting_tasks"); - } - } - } - } - } - - fn wake_tasks(&mut self) { - while let Ok(task_id) = self.wake_queue.pop() { - if let Some(task) = self.waiting_tasks.remove(&task_id) { - self.task_queue.push_back(task); + Poll::Pending => self.add_waiting(task), } } } @@ -69,13 +61,8 @@ impl Executor { fn sleep_if_idle(&self) { use x86_64::instructions::interrupts::{self, enable_interrupts_and_hlt}; - // fast path - if !self.wake_queue.is_empty() { - return; - } - interrupts::disable(); - if self.wake_queue.is_empty() { + if self.task_queue.is_empty() { enable_interrupts_and_hlt(); } else { interrupts::enable(); @@ -85,19 +72,19 @@ impl Executor { fn create_waker(&self, task_id: TaskId) -> Waker { Waker::from(Arc::new(TaskWaker { task_id, - wake_queue: self.wake_queue.clone(), + task_queue: self.task_queue.clone(), })) } } struct TaskWaker { task_id: TaskId, - wake_queue: Arc>, + task_queue: Arc>, } impl TaskWaker { fn wake_task(&self) { - self.wake_queue.push(self.task_id).expect("wake_queue full"); + self.task_queue.push(self.task_id).expect("task_queue full"); } } From 9887c1257d274e434ed9f7c094b7b1f1908cba0b Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 19 May 2020 13:30:35 +0200 Subject: [PATCH 2/2] Keep task in map instead of repeatedly removing it Also: Use entry API on waker_cache map. --- src/task/executor.rs | 57 +++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/src/task/executor.rs b/src/task/executor.rs index 7587f2548..3c29dea0b 100644 --- a/src/task/executor.rs +++ b/src/task/executor.rs @@ -4,24 +4,26 @@ use core::task::{Context, Poll, Waker}; use crossbeam_queue::ArrayQueue; pub struct Executor { + tasks: BTreeMap, task_queue: Arc>, - waiting_tasks: BTreeMap, waker_cache: BTreeMap, } impl Executor { pub fn new() -> Self { Executor { + tasks: BTreeMap::new(), task_queue: Arc::new(ArrayQueue::new(100)), - waiting_tasks: BTreeMap::new(), waker_cache: BTreeMap::new(), } } pub fn spawn(&mut self, task: Task) { let task_id = task.id; - self.add_waiting(task); - self.task_queue.push(task_id).expect("task_queue full"); + if self.tasks.insert(task.id, task).is_some() { + panic!("task with same ID already in tasks"); + } + self.task_queue.push(task_id).expect("queue full"); } pub fn run(&mut self) -> ! { @@ -31,29 +33,30 @@ impl Executor { } } - fn add_waiting(&mut self, task: Task) { - if self.waiting_tasks.insert(task.id, task).is_some() { - panic!("task with same ID already in waiting_tasks"); - } - } - fn run_ready_tasks(&mut self) { - while let Ok(task_id) = self.task_queue.pop() { - let mut task = match self.waiting_tasks.remove(&task_id) { + // destructure `self` to avoid borrow checker errors + let Self { + tasks, + task_queue, + waker_cache, + } = self; + + while let Ok(task_id) = task_queue.pop() { + let task = match tasks.get_mut(&task_id) { Some(task) => task, - None => continue, + None => continue, // task no longer exists }; - if !self.waker_cache.contains_key(&task_id) { - self.waker_cache.insert(task_id, self.create_waker(task_id)); - } - let waker = self.waker_cache.get(&task_id).expect("should exist"); + let waker = waker_cache + .entry(task_id) + .or_insert_with(|| TaskWaker::new(task_id, task_queue.clone())); let mut context = Context::from_waker(waker); match task.poll(&mut context) { Poll::Ready(()) => { - // task done -> remove cached waker - self.waker_cache.remove(&task_id); + // task done -> remove it and its cached waker + tasks.remove(&task_id); + waker_cache.remove(&task_id); } - Poll::Pending => self.add_waiting(task), + Poll::Pending => {} } } } @@ -68,13 +71,6 @@ impl Executor { interrupts::enable(); } } - - fn create_waker(&self, task_id: TaskId) -> Waker { - Waker::from(Arc::new(TaskWaker { - task_id, - task_queue: self.task_queue.clone(), - })) - } } struct TaskWaker { @@ -83,6 +79,13 @@ struct TaskWaker { } impl TaskWaker { + fn new(task_id: TaskId, task_queue: Arc>) -> Waker { + Waker::from(Arc::new(TaskWaker { + task_id, + task_queue, + })) + } + fn wake_task(&self) { self.task_queue.push(self.task_id).expect("task_queue full"); }