apply: support yield (tikv#6487)
Signed-off-by: Jay Lee <busyjaylee@gmail.com>
BusyJay committed Feb 8, 2020
1 parent bbb744a commit 0c15d29
Showing 3 changed files with 112 additions and 55 deletions.
components/batch-system/src/batch.rs (48 additions & 17 deletions)
@@ -71,6 +71,7 @@ impl_sched!(ControlScheduler, FsmTypes::Control, Fsm = C);
 #[allow(clippy::vec_box)]
 pub struct Batch<N, C> {
     normals: Vec<Box<N>>,
+    counters: Vec<usize>,
     control: Option<Box<C>>,
 }

@@ -79,6 +80,7 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {
     pub fn with_capacity(cap: usize) -> Batch<N, C> {
         Batch {
             normals: Vec::with_capacity(cap),
+            counters: Vec::with_capacity(cap),
             control: None,
         }
     }
@@ -89,7 +91,10 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {

     fn push(&mut self, fsm: FsmTypes<N, C>) -> bool {
         match fsm {
-            FsmTypes::Normal(n) => self.normals.push(n),
+            FsmTypes::Normal(n) => {
+                self.normals.push(n);
+                self.counters.push(0);
+            }
             FsmTypes::Control(c) => {
                 assert!(self.control.is_none());
                 self.control = Some(c);
@@ -110,6 +115,7 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {

     fn clear(&mut self) {
         self.normals.clear();
+        self.counters.clear();
         self.control.take();
     }

@@ -118,21 +124,20 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {
     /// Only when channel length is larger than `checked_len` will trigger
     /// further notification. This function may fail if channel length is
     /// larger than the given value before FSM is released.
-    pub fn release(&mut self, index: usize, checked_len: usize) -> bool {
+    pub fn release(&mut self, index: usize, checked_len: usize) {
         let mut fsm = self.normals.swap_remove(index);
         let mailbox = fsm.take_mailbox().unwrap();
         mailbox.release(fsm);
         if mailbox.len() == checked_len {
-            true
+            self.counters.swap_remove(index);
         } else {
             match mailbox.take_fsm() {
-                None => true,
+                None => (),
                 Some(mut s) => {
                     s.set_mailbox(Cow::Owned(mailbox));
                     let last_index = self.normals.len();
                     self.normals.push(s);
                     self.normals.swap(index, last_index);
-                    false
                 }
             }
         }
@@ -143,21 +148,27 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {
     /// This method should only be called when the FSM is stopped.
     /// If there are still messages in channel, the FSM is untouched and
     /// the function will return false to let caller to keep polling.
-    pub fn remove(&mut self, index: usize) -> bool {
+    pub fn remove(&mut self, index: usize) {
         let mut fsm = self.normals.swap_remove(index);
         let mailbox = fsm.take_mailbox().unwrap();
         if mailbox.is_empty() {
             mailbox.release(fsm);
-            true
+            self.counters.swap_remove(index);
         } else {
             fsm.set_mailbox(Cow::Owned(mailbox));
             let last_index = self.normals.len();
             self.normals.push(fsm);
             self.normals.swap(index, last_index);
-            false
         }
     }

+    /// Schedule the normal FSM located at `index`.
+    pub fn reschedule(&mut self, router: &BatchRouter<N, C>, index: usize) {
+        let fsm = self.normals.swap_remove(index);
+        self.counters.swap_remove(index);
+        router.normal_scheduler.schedule(fsm);
+    }
+
     /// Same as `release`, but working on control FSM.
     pub fn release_control(&mut self, control_box: &BasicMailbox<C>, checked_len: usize) -> bool {
         let s = self.control.take().unwrap();
@@ -233,6 +244,12 @@ struct Poller<N: Fsm, C: Fsm, Handler> {
     max_batch_size: usize,
 }

+enum ReschedulePolicy {
+    Release(usize),
+    Remove,
+    Schedule,
+}
+
 impl<N: Fsm, C: Fsm, Handler: PollHandler<N, C>> Poller<N, C, Handler> {
     fn fetch_batch(&mut self, batch: &mut Batch<N, C>, max_size: usize) {
         let curr_batch_len = batch.len();
Expand Down Expand Up @@ -272,10 +289,11 @@ impl<N: Fsm, C: Fsm, Handler: PollHandler<N, C>> Poller<N, C, Handler> {
// Poll for readiness and forward to handler. Remove stale peer if necessary.
fn poll(&mut self) {
let mut batch = Batch::with_capacity(self.max_batch_size);
let mut exhausted_fsms = Vec::with_capacity(self.max_batch_size);
let mut reschedule_fsms = Vec::with_capacity(self.max_batch_size);

self.fetch_batch(&mut batch, self.max_batch_size);
while !batch.is_empty() {
let mut hot_fsm_count = 0;
self.handler.begin(batch.len());
if batch.control.is_some() {
let len = self.handler.handle_control(batch.control.as_mut().unwrap());
@@ -288,21 +306,34 @@ impl<N: Fsm, C: Fsm, Handler: PollHandler<N, C>> Poller<N, C, Handler> {
             if !batch.normals.is_empty() {
                 for (i, p) in batch.normals.iter_mut().enumerate() {
                     let len = self.handler.handle_normal(p);
+                    batch.counters[i] += 1;
                     if p.is_stopped() {
-                        exhausted_fsms.push((i, None));
-                    } else if len.is_some() {
-                        exhausted_fsms.push((i, len));
+                        reschedule_fsms.push((i, ReschedulePolicy::Remove));
+                    } else {
+                        if batch.counters[i] > 3 {
+                            hot_fsm_count += 1;
+                            // We should only reschedule a half of the hot regions, otherwise,
+                            // it's possible all the hot regions are fetched in a batch the
+                            // next time.
+                            if hot_fsm_count % 2 == 0 {
+                                reschedule_fsms.push((i, ReschedulePolicy::Schedule));
+                                continue;
+                            }
+                        }
+                        if let Some(l) = len {
+                            reschedule_fsms.push((i, ReschedulePolicy::Release(l)));
+                        }
                     }
                 }
             }
             self.handler.end(batch.normals_mut());
             // Because release use `swap_remove` internally, so using pop here
             // to remove the correct FSM.
-            while let Some((r, mark)) = exhausted_fsms.pop() {
-                if let Some(m) = mark {
-                    batch.release(r, m);
-                } else {
-                    batch.remove(r);
+            while let Some((r, mark)) = reschedule_fsms.pop() {
+                match mark {
+                    ReschedulePolicy::Release(l) => batch.release(r, l),
+                    ReschedulePolicy::Remove => batch.remove(r),
+                    ReschedulePolicy::Schedule => batch.reschedule(&self.router, r),
                 }
             }
             // Fetch batch after every round is finished. It's helpful to protect regions
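The batch.rs changes above are the core of the yield support named in the commit title: every normal FSM in a batch now carries a poll counter, an FSM polled more than 3 rounds in a row is treated as hot, and every second hot FSM is handed back to the scheduler (ReschedulePolicy::Schedule) instead of staying in the batch. Below is a minimal standalone sketch of just that decision rule; the threshold of 3 and the hot_fsm_count % 2 == 0 test come from the diff, while the Action enum, the plan_batch function, and the bare counter slice are hypothetical stand-ins for the real Batch and Poller types.

// A minimal sketch of the yield policy in this commit, not TiKV's real API.
// `counters[i]` plays the role of `batch.counters[i]`: how many consecutive
// rounds FSM `i` has been polled in the current batch.

#[derive(Debug, PartialEq)]
enum Action {
    KeepInBatch, // keep polling the FSM in the current batch
    Reschedule,  // yield: push the FSM back onto the scheduler
}

fn plan_batch(counters: &[usize]) -> Vec<Action> {
    let mut hot_fsm_count = 0;
    counters
        .iter()
        .map(|&polled| {
            // An FSM polled more than 3 rounds in a row is considered hot.
            if polled > 3 {
                hot_fsm_count += 1;
                // Yield only every second hot FSM; otherwise the scheduler
                // could hand all of them straight back as the next batch.
                if hot_fsm_count % 2 == 0 {
                    return Action::Reschedule;
                }
            }
            Action::KeepInBatch
        })
        .collect()
}

fn main() {
    // Four hot FSMs: the 2nd and 4th yield, the 1st and 3rd keep running.
    let actions = plan_batch(&[5, 5, 5, 5]);
    assert_eq!(
        actions,
        [
            Action::KeepInBatch,
            Action::Reschedule,
            Action::KeepInBatch,
            Action::Reschedule,
        ]
    );
}

Rescheduling only half of the hot FSMs matters because, as the in-diff comment notes, yielding all of them at once could simply reassemble the same hot batch on the next fetch.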
components/batch-system/src/router.rs (1 addition & 1 deletion)
@@ -32,7 +32,7 @@ pub struct Router<N: Fsm, C: Fsm, Ns, Cs> {
     // TODO: These two schedulers should be unified as single one. However
     // it's not possible to write FsmScheduler<Fsm=C> + FsmScheduler<Fsm=N>
     // for now.
-    normal_scheduler: Ns,
+    pub(crate) normal_scheduler: Ns,
     control_scheduler: Cs,
 }

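This one-line visibility change exists to support the new Batch::reschedule above: making normal_scheduler pub(crate) lets batch.rs, elsewhere in the same crate, push a yielded FSM back onto the normal scheduler via router.normal_scheduler.schedule(fsm).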