refactor: add preempt guard for AxRunQueue through AxRunQueueRef
hky1999 committed Sep 21, 2024
1 parent db02f00 commit 7ad8947
Showing 5 changed files with 67 additions and 43 deletions.
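Note on the refactor: instead of handing out a bare `&'static mut AxRunQueue`, run-queue access now goes through `AxRunQueueRef<'static, G>`, which acquires a `kernel_guard::BaseGuard` of the caller's choosing on creation and releases it on drop. To make the diffs below easier to follow, here is a minimal sketch of the trait shape the commit builds on; the names match how the diff uses the `kernel_guard` crate, but the exact bounds (e.g. `Clone + Copy` on `State`) are an assumption — consult the crate docs for the authoritative definition.

```rust
/// Sketch of kernel_guard's `BaseGuard` trait, simplified for illustration.
pub trait BaseGuard {
    /// State saved by `acquire` and consumed by `release`
    /// (for an IRQ guard, typically the previous interrupt flags).
    type State: Clone + Copy;
    /// Enter the guarded section, e.g. disable preemption and/or IRQs.
    fn acquire() -> Self::State;
    /// Leave the guarded section, restoring the saved state.
    fn release(state: Self::State);
}

/// A guard that protects nothing, for call sites that are already protected
/// (assumed to mirror kernel_guard's `NoOp`).
pub struct NoOp;

impl BaseGuard for NoOp {
    type State = ();
    fn acquire() -> Self::State {}
    fn release(_: Self::State) {}
}
```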
14 changes: 8 additions & 6 deletions modules/axtask/src/api.rs
@@ -2,6 +2,8 @@
 
 use alloc::{string::String, sync::Arc};
 
+use kernel_guard::{NoOp, NoPreemptIrqSave};
+
 pub(crate) use crate::run_queue::{current_run_queue, select_run_queue};
 
 #[doc(cfg(feature = "multitask"))]
@@ -88,14 +90,14 @@ pub fn init_scheduler_secondary() {
 #[doc(cfg(feature = "irq"))]
 pub fn on_timer_tick() {
     crate::timers::check_events();
-    current_run_queue().scheduler_timer_tick();
+    current_run_queue::<NoOp>().scheduler_timer_tick();
 }
 
 /// Adds the given task to the run queue, returns the task reference.
 pub fn spawn_task(task: TaskInner) -> AxTaskRef {
     let task_ref = task.into_arc();
     let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
-    crate::select_run_queue(
+    crate::select_run_queue::<NoPreemptIrqSave>(
         #[cfg(feature = "smp")]
         task_ref.clone(),
     )
@@ -136,13 +138,13 @@ where
 ///
 /// [CFS]: https://en.wikipedia.org/wiki/Completely_Fair_Scheduler
 pub fn set_priority(prio: isize) -> bool {
-    current_run_queue().set_current_priority(prio)
+    current_run_queue::<NoPreemptIrqSave>().set_current_priority(prio)
 }
 
 /// Current task gives up the CPU time voluntarily, and switches to another
 /// ready task.
 pub fn yield_now() {
-    current_run_queue().yield_current()
+    current_run_queue::<NoPreemptIrqSave>().yield_current()
 }
 
 /// Current task is going to sleep for the given duration.
@@ -157,14 +159,14 @@ pub fn sleep(dur: core::time::Duration) {
 /// If the feature `irq` is not enabled, it uses busy-wait instead.
 pub fn sleep_until(deadline: axhal::time::TimeValue) {
     #[cfg(feature = "irq")]
-    current_run_queue().sleep_until(deadline);
+    current_run_queue::<NoPreemptIrqSave>().sleep_until(deadline);
     #[cfg(not(feature = "irq"))]
     axhal::time::busy_wait_until(deadline);
 }
 
 /// Exits the current task.
 pub fn exit(exit_code: i32) -> ! {
-    current_run_queue().exit_current(exit_code)
+    current_run_queue::<NoPreemptIrqSave>().exit_current(exit_code)
 }
 
 /// The idle task routine.
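Note the convention these call sites establish: task-context entry points (`set_priority`, `yield_now`, `sleep_until`, `exit`) instantiate the run queue with `NoPreemptIrqSave`, so protection is acquired on entry, while `on_timer_tick` uses `NoOp` because it already runs in the timer-interrupt path with IRQs disabled (`spawn_task` also still holds its own explicit guard). A schematic of the two cases — the wrapper functions are hypothetical, only the calls inside them come from the diff:

```rust
// Hypothetical illustration of the guard choice at call sites.
fn in_task_context() {
    // Acquires no-preempt + IRQ-save; released when the returned
    // AxRunQueueRef temporary is dropped at the end of the statement.
    current_run_queue::<NoPreemptIrqSave>().yield_current();
}

fn in_irq_context() {
    // IRQs are already off here, so the no-op guard skips redundant
    // save/restore work.
    current_run_queue::<NoOp>().scheduler_timer_tick();
}
```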
67 changes: 45 additions & 22 deletions modules/axtask/src/run_queue.rs
@@ -3,6 +3,7 @@ use alloc::sync::Arc;
 use core::mem::MaybeUninit;
 
 use bitmaps::Bitmap;
+use kernel_guard::BaseGuard;
 use lazyinit::LazyInit;
 use scheduler::BaseScheduler;
@@ -39,6 +40,7 @@ percpu_static! {
 static mut RUN_QUEUES: [MaybeUninit<&'static mut AxRunQueue>; axconfig::SMP] =
     [ARRAY_REPEAT_VALUE; axconfig::SMP];
 const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut AxRunQueue> = MaybeUninit::uninit();
+
 /// Returns a reference to the current run queue.
 ///
 /// ## Safety
@@ -51,9 +53,14 @@ const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut AxRunQueue> = MaybeUninit::uninit();
 /// ## Returns
 ///
 /// A static reference to the current run queue.
-#[inline]
-pub(crate) fn current_run_queue() -> &'static mut AxRunQueue {
-    unsafe { RUN_QUEUE.current_ref_mut_raw() }
+// #[inline(always)]
+pub(crate) fn current_run_queue<G: BaseGuard>() -> AxRunQueueRef<'static, G> {
+    let irq_state = G::acquire();
+    AxRunQueueRef {
+        inner: unsafe { RUN_QUEUE.current_ref_mut_raw() },
+        state: irq_state,
+        _phantom: core::marker::PhantomData,
+    }
 }
 
 /// Selects the run queue index based on a CPU set bitmap, minimizing the number of tasks.
@@ -129,17 +136,24 @@ fn get_run_queue(index: usize) -> &'static mut AxRunQueue {
 /// 2. Use a more generic load balancing algorithm that can be customized or replaced.
 ///
 #[inline]
-pub(crate) fn select_run_queue(#[cfg(feature = "smp")] task: AxTaskRef) -> &'static mut AxRunQueue {
+pub(crate) fn select_run_queue<G: BaseGuard>(
+    #[cfg(feature = "smp")] task: AxTaskRef,
+) -> AxRunQueueRef<'static, G> {
     #[cfg(not(feature = "smp"))]
     {
         // When SMP is disabled, all tasks are scheduled on the same global run queue.
         current_run_queue()
     }
     #[cfg(feature = "smp")]
     {
+        let irq_state = G::acquire();
         // When SMP is enabled, select the run queue based on the task's CPU affinity and load balance.
         let index = select_run_queue_index(task.cpu_set());
-        get_run_queue(index)
+        AxRunQueueRef {
+            inner: get_run_queue(index),
+            state: irq_state,
+            _phantom: core::marker::PhantomData,
+        }
     }
 }
@@ -151,6 +165,18 @@ pub(crate) struct AxRunQueue {
     scheduler: Scheduler,
 }
 
+pub(crate) struct AxRunQueueRef<'a, G: BaseGuard> {
+    inner: &'a mut AxRunQueue,
+    state: G::State,
+    _phantom: core::marker::PhantomData<G>,
+}
+
+impl<'a, G: BaseGuard> Drop for AxRunQueueRef<'a, G> {
+    fn drop(&mut self) {
+        G::release(self.state);
+    }
+}
+
 impl AxRunQueue {
     pub fn new(cpu_id: usize) -> Self {
         let gc_task = TaskInner::new(
@@ -182,17 +208,17 @@ impl AxRunQueue {
 }
 
 /// Core functions of run queue.
-impl AxRunQueue {
+impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
     pub fn add_task(&mut self, task: AxTaskRef) {
-        debug!("Add {} on run_queue {}", task.id_name(), self.cpu_id);
+        debug!("Add {} on run_queue {}", task.id_name(), self.inner.cpu_id);
         assert!(task.is_ready());
-        self.scheduler.add_task(task);
+        self.inner.scheduler.add_task(task);
     }
 
     #[cfg(feature = "irq")]
     pub fn scheduler_timer_tick(&mut self) {
         let curr = crate::current();
-        if !curr.is_idle() && self.scheduler.task_tick(curr.as_task_ref()) {
+        if !curr.is_idle() && self.inner.scheduler.task_tick(curr.as_task_ref()) {
             #[cfg(feature = "preempt")]
             curr.set_preempt_pending(true);
         }
@@ -203,11 +229,12 @@ impl AxRunQueue {
         let curr = crate::current();
         trace!("task yield: {}", curr.id_name());
         assert!(curr.is_running());
-        self.resched(false);
+        self.inner.resched(false);
     }
 
     pub fn set_current_priority(&mut self, prio: isize) -> bool {
-        self.scheduler
+        self.inner
+            .scheduler
             .set_priority(crate::current().as_task_ref(), prio)
     }
 
@@ -230,15 +257,13 @@ impl AxRunQueue {
             can_preempt
         );
         if can_preempt {
-            self.resched(true);
+            self.inner.resched(true);
         } else {
             curr.set_preempt_pending(true);
         }
     }
 
     pub fn exit_current(&mut self, exit_code: i32) -> ! {
-        let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
-
         let curr = crate::current();
         debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code);
         assert!(curr.is_running(), "task is not running: {:?}", curr.state());
@@ -257,9 +282,8 @@ impl AxRunQueue {
             // Wake up the GC task to drop the exited tasks.
             WAIT_FOR_EXIT.with_current(|wq| wq.notify_one(false));
             // Schedule to next task.
-            self.resched(false);
+            self.inner.resched(false);
         }
-        drop(_kernel_guard);
         unreachable!("task exited!");
     }

@@ -272,17 +296,17 @@ impl AxRunQueue {
         );
 
         debug!("task block: {}", curr.id_name());
-        self.resched(false);
+        self.inner.resched(false);
     }
 
     /// Unblock one task by inserting it into the run queue.
    /// If task state is `BLOCKING`, it will enter a loop until the task is in `BLOCKED` state.
     pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
         task.clone().unblock_locked(|| {
-            let cpu_id = self.cpu_id;
+            let cpu_id = self.inner.cpu_id;
             debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
             task.set_state(TaskState::Ready);
-            self.scheduler.add_task(task.clone()); // TODO: priority
+            self.inner.scheduler.add_task(task.clone()); // TODO: priority
 
             // Note: when the task is unblocked on another CPU's run queue,
             // we just ignore the `resched` flag.
@@ -295,7 +319,7 @@
 
     #[cfg(feature = "irq")]
     pub fn sleep_until(&mut self, deadline: axhal::time::TimeValue) {
-        let kernel_guard = kernel_guard::NoPreemptIrqSave::new();
+        let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
         let curr = crate::current();
         debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline);
         assert!(curr.is_running());
@@ -305,9 +329,8 @@
         if now < deadline {
             crate::timers::set_alarm_wakeup(deadline, curr.clone());
             curr.set_state(TaskState::Blocking);
-            self.resched(false);
+            self.inner.resched(false);
         }
-        drop(kernel_guard)
     }
 }
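The `AxRunQueueRef` struct and its `Drop` impl above are the core of this commit: the handle is an RAII guard, so whatever `G::acquire()` saved in `current_run_queue`/`select_run_queue` is released exactly when the handle goes out of scope. A sketch of what that means for a caller (illustrative, not code from the commit):

```rust
// Illustrative in-crate caller: protection lives exactly as long as `rq`.
fn wake_example(task: AxTaskRef) {
    let mut rq = current_run_queue::<NoPreemptIrqSave>(); // G::acquire() runs here
    rq.unblock_task(task, /* resched */ true);
} // `rq` dropped here -> Drop::drop -> G::release(saved IRQ state)
```

This also explains the deletions in `exit_current` and `sleep_until`: the explicit `kernel_guard` locals and trailing `drop(...)` calls are superseded by the handle's destructor (and in `exit_current`, which diverges, the old `drop(_kernel_guard)` after `resched` was never reached anyway).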
2 changes: 1 addition & 1 deletion modules/axtask/src/task.rs
@@ -378,7 +378,7 @@ impl TaskInner {
         if curr.need_resched.load(Ordering::Acquire) && curr.can_preempt(0) {
             let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
             if curr.need_resched.load(Ordering::Acquire) {
-                crate::current_run_queue().preempt_resched()
+                crate::current_run_queue::<kernel_guard::NoOp>().preempt_resched()
             }
         }
     }
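This hunk is the preemption entry point and shows the complementary half of the convention: `need_resched` is checked once cheaply, then re-checked under an explicit `NoPreemptIrqSave` guard, and the run queue is taken with `NoOp` so that protection is not acquired a second time. The double-checked shape in isolation — a simplified sketch, not the literal `TaskInner` code:

```rust
use core::sync::atomic::{AtomicBool, Ordering};

/// Simplified stand-in for TaskInner's `need_resched` flag.
struct PreemptFlag(AtomicBool);

fn maybe_preempt(flag: &PreemptFlag) {
    // First check without a guard: cheap, but racy.
    if flag.0.load(Ordering::Acquire) {
        let _guard = kernel_guard::NoPreemptIrqSave::new();
        // Re-check under the guard: an interrupt between the two loads may
        // already have handled or cleared the pending reschedule.
        if flag.0.load(Ordering::Acquire) {
            // Reschedule here, taking the run queue with `NoOp` because
            // `_guard` already provides the protection.
        }
    }
}
```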
3 changes: 2 additions & 1 deletion modules/axtask/src/timers.rs
@@ -1,5 +1,6 @@
 use alloc::sync::Arc;
 
+use kernel_guard::NoOp;
 use lazyinit::LazyInit;
 use timer_list::{TimeValue, TimerEvent, TimerList};
@@ -16,7 +17,7 @@ struct TaskWakeupEvent(AxTaskRef);
 impl TimerEvent for TaskWakeupEvent {
     fn callback(self, _now: TimeValue) {
         self.0.set_in_timer_list(false);
-        select_run_queue(
+        select_run_queue::<NoOp>(
             #[cfg(feature = "smp")]
             self.0.clone(),
         )
24 changes: 11 additions & 13 deletions modules/axtask/src/wait_queue.rs
@@ -1,5 +1,7 @@
 use alloc::collections::VecDeque;
 use alloc::sync::Arc;
+
+use kernel_guard::{NoOp, NoPreemptIrqSave};
 use kspin::SpinNoIrq;
 
 use crate::{current_run_queue, select_run_queue, task::TaskState, AxTaskRef, CurrentTask};
@@ -92,11 +94,10 @@ impl WaitQueue {
     /// Blocks the current task and put it into the wait queue, until other task
     /// notifies it.
     pub fn wait(&self) {
-        let kernel_guard = kernel_guard::NoPreemptIrqSave::new();
+        let _kernel_guard = NoPreemptIrqSave::new();
         self.push_to_wait_queue();
-        current_run_queue().blocked_resched();
+        current_run_queue::<NoOp>().blocked_resched();
         self.cancel_events(crate::current());
-        drop(kernel_guard);
     }
 
     /// Blocks the current task and put it into the wait queue, until the given
@@ -108,7 +109,7 @@
     where
         F: Fn() -> bool,
     {
-        let kernel_guard = kernel_guard::NoPreemptIrqSave::new();
+        let _kernel_guard = NoPreemptIrqSave::new();
         loop {
             let mut wq = self.queue.lock();
             if condition() {
@@ -131,17 +132,16 @@
             curr.set_in_wait_queue(true);
             drop(wq);
 
-            current_run_queue().blocked_resched();
+            current_run_queue::<NoOp>().blocked_resched();
         }
         self.cancel_events(crate::current());
-        drop(kernel_guard);
     }
 
     /// Blocks the current task and put it into the wait queue, until other tasks
     /// notify it, or the given duration has elapsed.
     #[cfg(feature = "irq")]
     pub fn wait_timeout(&self, dur: core::time::Duration) -> bool {
-        let kernel_guard = kernel_guard::NoPreemptIrqSave::new();
+        let _kernel_guard = NoPreemptIrqSave::new();
         let curr = crate::current();
         let deadline = axhal::time::wall_time() + dur;
         debug!(
@@ -152,11 +152,10 @@ impl WaitQueue {
         crate::timers::set_alarm_wakeup(deadline, curr.clone());
 
         self.push_to_wait_queue();
-        current_run_queue().blocked_resched();
+        current_run_queue::<NoOp>().blocked_resched();
 
         let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out
         self.cancel_events(curr);
-        drop(kernel_guard);
         timeout
     }
 
@@ -170,7 +169,7 @@
     where
         F: Fn() -> bool,
    {
-        let kernel_guard = kernel_guard::NoPreemptIrqSave::new();
+        let _kernel_guard = NoPreemptIrqSave::new();
         let curr = crate::current();
         let deadline = axhal::time::wall_time() + dur;
         debug!(
@@ -201,10 +200,9 @@
             curr.set_in_wait_queue(true);
             drop(wq);
 
-            current_run_queue().blocked_resched()
+            current_run_queue::<NoOp>().blocked_resched()
         }
         self.cancel_events(curr);
-        drop(kernel_guard);
         timeout
     }
 
@@ -268,7 +266,7 @@
 
 pub(crate) fn unblock_one_task(task: AxTaskRef, resched: bool) {
     // Select run queue by the CPU set of the task.
-    select_run_queue(
+    select_run_queue::<NoPreemptIrqSave>(
         #[cfg(feature = "smp")]
         task.clone(),
     )
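All four wait primitives now follow the same recipe: one explicit `NoPreemptIrqSave` guard spans enqueueing, the `blocked_resched` context switch, and the post-wakeup `cancel_events` cleanup, while the run queue itself is taken with `NoOp` so the inner handle cannot release IRQ state early. Schematically (control flow only; the two helpers are hypothetical stand-ins for the private methods used above):

```rust
// Hypothetical stand-ins for WaitQueue's private methods.
fn push_current_to(_wq: &WaitQueue) { /* enqueue current task */ }
fn cancel_events_for_current(_wq: &WaitQueue) { /* remove stale timers/entries */ }

// Schematic of WaitQueue::wait after this refactor.
fn wait_sketch(wq: &WaitQueue) {
    let _guard = NoPreemptIrqSave::new(); // 1. protection for the whole wait
    push_current_to(wq);                  // 2. put current task on the queue
    current_run_queue::<NoOp>()           // 3. NoOp: `_guard` already covers us
        .blocked_resched();               //    switch away; returns once notified
    cancel_events_for_current(wq);        // 4. still under `_guard` after wakeup
} // 5. `_guard` drops -> IRQ/preempt state restored
```

By contrast, the free function `unblock_one_task` at the bottom selects the run queue with `NoPreemptIrqSave`, since notifiers run in ordinary task context and must establish their own protection.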
