Skip to content

Commit

Permalink
refactor: refactor auto compactor implementation
Browse files Browse the repository at this point in the history
* using static generic instead of dynamic trait object
* improve revision window's implementation

Signed-off-by: Phoeniix Zhao <Phoenix500526@163.com>
  • Loading branch information
Phoenix500526 committed Aug 1, 2023
1 parent d7b40f8 commit ca23a44
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 53 deletions.
16 changes: 9 additions & 7 deletions xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,16 @@ impl XlineServer {

let auto_compactor = if let Some(auto_config_cfg) = *self.compact_cfg.auto_compact_config()
{
auto_compactor(
self.is_leader,
Arc::clone(&client),
header_gen.revision_arc(),
Arc::clone(&self.shutdown_trigger),
auto_config_cfg,
Some(
auto_compactor(
self.is_leader,
Arc::clone(&client),
header_gen.revision_arc(),
Arc::clone(&self.shutdown_trigger),
auto_config_cfg,
)
.await,
)
.await
} else {
None
};
Expand Down
4 changes: 2 additions & 2 deletions xline/src/storage/compact/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub(crate) async fn auto_compactor(
revision_getter: Arc<RevisionNumberGenerator>,
shutdown_trigger: Arc<Event>,
auto_compact_cfg: AutoCompactConfig,
) -> Option<Arc<dyn Compactor>> {
) -> Arc<dyn Compactor> {
let auto_compactor: Arc<dyn Compactor> = match auto_compact_cfg {
AutoCompactConfig::Periodic(period) => {
PeriodicCompactor::new_arc(is_leader, client, revision_getter, shutdown_trigger, period)
Expand All @@ -90,7 +90,7 @@ pub(crate) async fn auto_compactor(
let _hd = tokio::spawn(async move {
auto_compactor.run().await;
});
Some(compactor_handle)
compactor_handle
}

/// background compact executor
Expand Down
55 changes: 16 additions & 39 deletions xline/src/storage/compact/periodic_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,38 @@ impl RevisionWindow {
/// Create a new `RevisionWindow`
fn new(retention: usize) -> Self {
Self {
ring_buf: Vec::with_capacity(retention),
ring_buf: vec![0; retention],
cursor: retention.overflow_sub(1),
retention,
}
}

/// Store the revision into the inner ring buffer
#[allow(clippy::integer_arithmetic)]
#[allow(clippy::integer_arithmetic, clippy::indexing_slicing)]
fn sample(&mut self, revision: i64) {
self.cursor = (self.cursor + 1) % self.retention; // it's ok to do so since cursor will never overflow
match self.ring_buf.len().cmp(&self.retention) {
Ordering::Less => self.ring_buf.push(revision),
Ordering::Equal => {
if let Some(element) = self.ring_buf.get_mut(self.cursor) {
*element = revision;
} else {
unreachable!(
"ring_buf ({:?}) at {} should not be None",
self.ring_buf, self.cursor
);
}
}
Ordering::Greater => {
unreachable!(
"the length of RevisionWindow should be less than {}",
self.retention
)
}
}
self.ring_buf[self.cursor] = revision;
}

/// Retrieve the expired revision that is sampled period ago
#[allow(clippy::indexing_slicing, clippy::integer_arithmetic)]
fn expired_revision(&self) -> Option<i64> {
debug_assert!(
self.ring_buf.len() <= self.retention,
"the length of RevisionWindow should be less than {}",
self.retention
);
if self.ring_buf.len() < self.retention {
let target = self.ring_buf[(self.cursor + 1) % self.retention];
if target == 0 {
None
} else {
let target = (self.cursor + 1) % self.retention;
Some(self.ring_buf[target]) // it's ok to do so since ring_buf[target] should not be None.
Some(target)
}
}
}

/// Revision auto compactor
#[derive(Debug)]
pub(crate) struct PeriodicCompactor {
pub(crate) struct PeriodicCompactor<C: Compactable> {
/// `is_leader` indicates whether the current node is a leader or not.
is_leader: AtomicBool,
/// curp client
client: Arc<dyn Compactable>,
client: Arc<C>,
/// revision getter
revision_getter: Arc<RevisionNumberGenerator>,
/// shutdown trigger
Expand All @@ -91,11 +68,11 @@ pub(crate) struct PeriodicCompactor {
period: Duration,
}

impl PeriodicCompactor {
impl<C: Compactable> PeriodicCompactor<C> {
/// Creates a new revision compactor
pub(super) fn new_arc(
is_leader: bool,
client: Arc<dyn Compactable>,
client: Arc<C>,
revision_getter: Arc<RevisionNumberGenerator>,
shutdown_trigger: Arc<Event>,
period: Duration,
Expand Down Expand Up @@ -170,7 +147,7 @@ fn sample_config(period: Duration) -> (Duration, usize) {
}

#[async_trait::async_trait]
impl Compactor for PeriodicCompactor {
impl<C: Compactable> Compactor for PeriodicCompactor<C> {
fn pause(&self) {
self.is_leader.store(false, Relaxed);
}
Expand Down Expand Up @@ -214,18 +191,18 @@ mod test {
fn revision_window_should_work() {
let mut rw = RevisionWindow::new(3);
assert!(rw.expired_revision().is_none());
rw.sample(0);
assert!(rw.expired_revision().is_none());
rw.sample(1);
assert!(rw.expired_revision().is_none());
rw.sample(2);
assert_eq!(rw.expired_revision(), Some(0));
assert!(rw.expired_revision().is_none());
rw.sample(3);
assert_eq!(rw.expired_revision(), Some(1));
// retention is 2
// The first 3 minutes: 0,1,2
// The first 3 minutes: 1,2,3
// The second 2 minutes: 3,4
rw.sample(3);
rw.sample(4);
assert_eq!(rw.expired_revision(), Some(2));
assert_eq!(rw.expired_revision(), Some(3));
}

#[test]
Expand Down
10 changes: 5 additions & 5 deletions xline/src/storage/compact/revision_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Revision auto compactor
#[derive(Debug)]
pub(crate) struct RevisionCompactor {
pub(crate) struct RevisionCompactor<C: Compactable> {
/// `is_leader` indicates whether the current node is a leader or not.
is_leader: AtomicBool,
/// curp client
client: Arc<dyn Compactable>,
client: Arc<C>,
/// revision getter
revision_getter: Arc<RevisionNumberGenerator>,
/// shutdown trigger
Expand All @@ -31,11 +31,11 @@ pub(crate) struct RevisionCompactor {
retention: i64,
}

impl RevisionCompactor {
impl<C: Compactable> RevisionCompactor<C> {
/// Creates a new revision compactor
pub(super) fn new_arc(
is_leader: bool,
client: Arc<dyn Compactable + Send + Sync>,
client: Arc<C>,
revision_getter: Arc<RevisionNumberGenerator>,
shutdown_trigger: Arc<Event>,
retention: i64,
Expand Down Expand Up @@ -85,7 +85,7 @@ impl RevisionCompactor {
}

#[async_trait::async_trait]
impl Compactor for RevisionCompactor {
impl<C: Compactable> Compactor for RevisionCompactor<C> {
fn pause(&self) {
self.is_leader.store(false, Relaxed);
}
Expand Down

0 comments on commit ca23a44

Please sign in to comment.