Skip to content

Commit

Permalink
adjust the order of sync meta snapshot and clean logs in wal on kafka. (
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint authored Feb 14, 2023
1 parent c4ccca0 commit a7d3f72
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 36 deletions.
33 changes: 4 additions & 29 deletions wal/src/message_queue_impl/log_cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

//! Log cleaner

use std::{cmp, sync::Arc};
use std::sync::Arc;

use common_util::{
define_result,
Expand All @@ -12,8 +12,6 @@ use log::info;
use message_queue::{MessageQueue, Offset};
use snafu::{ensure, Backtrace, ResultExt, Snafu};

use crate::message_queue_impl::region_context::RegionMetaSnapshot;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
Expand Down Expand Up @@ -68,15 +66,12 @@ impl<M: MessageQueue> LogCleaner<M> {
}
}

pub async fn maybe_clean_logs(&mut self, snapshot: &RegionMetaSnapshot) -> Result<()> {
pub async fn maybe_clean_logs(&mut self, safe_delete_offset: Offset) -> Result<()> {
info!(
"Begin to check and clean logs, region id:{} snapshot:{:?}, topic:{}",
self.region_id, snapshot, self.log_topic
"Begin to check and clean logs, region id:{}, topic:{}, safe delete offset:{:?}",
self.region_id, self.log_topic, safe_delete_offset
);

// Get offset preparing to delete to.
let safe_delete_offset = Self::calc_safe_delete_offset(snapshot);

// Decide whether cleaning should be done.
let mut do_clean = true;
if let Some(last_deleted_offset) = self.last_deleted_offset {
Expand Down Expand Up @@ -113,24 +108,4 @@ impl<M: MessageQueue> LogCleaner<M> {

Ok(())
}

fn calc_safe_delete_offset(snapshot: &RegionMetaSnapshot) -> Offset {
let mut safe_delete_offset = Offset::MAX;
let mut high_watermark = 0;
// Calc the min offset in message queue.
for table_meta in &snapshot.entries {
if let Some(offset) = table_meta.safe_delete_offset {
safe_delete_offset = cmp::min(safe_delete_offset, offset);
}
high_watermark = cmp::max(high_watermark, table_meta.current_high_watermark);
}

if safe_delete_offset == Offset::MAX {
// All tables are in such states: after init/flush, but not written.
// So, we can directly delete it up to the high_watermark.
high_watermark
} else {
safe_delete_offset
}
}
}
15 changes: 8 additions & 7 deletions wal/src/message_queue_impl/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,17 +621,18 @@ impl<M: MessageQueue> Region<M> {
)
};

// Check and maybe clean logs.
let mut log_cleaner = self.log_cleaner.lock().await;
log_cleaner
.maybe_clean_logs(&snapshot)
let safe_delete_offset = snapshot.safe_delete_offset();
// Sync snapshot first.
synchronizer
.sync(snapshot)
.await
.box_err()
.context(CleanLogs)?;

// Sync snapshot.
synchronizer
.sync(snapshot)
// Check and maybe clean logs then.
let mut log_cleaner = self.log_cleaner.lock().await;
log_cleaner
.maybe_clean_logs(safe_delete_offset)
.await
.box_err()
.context(CleanLogs)
Expand Down
23 changes: 23 additions & 0 deletions wal/src/message_queue_impl/region_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Region context

use std::{
cmp,
collections::{BTreeMap, HashMap},
sync::Arc,
};
Expand Down Expand Up @@ -431,6 +432,28 @@ pub struct RegionMetaSnapshot {
pub entries: Vec<TableMetaData>,
}

impl RegionMetaSnapshot {
pub fn safe_delete_offset(&self) -> Offset {
let mut safe_delete_offset = Offset::MAX;
let mut high_watermark = 0;
// Calc the min offset in message queue.
for table_meta in &self.entries {
if let Some(offset) = table_meta.safe_delete_offset {
safe_delete_offset = cmp::min(safe_delete_offset, offset);
}
high_watermark = cmp::max(high_watermark, table_meta.current_high_watermark);
}

if safe_delete_offset == Offset::MAX {
// All tables are in such states: after init/flush, but not written.
// So, we can directly delete it up to the high_watermark.
high_watermark
} else {
safe_delete_offset
}
}
}

/// Message queue's offset range
///
/// The range should be [start, end], and it will never be empty.
Expand Down

0 comments on commit a7d3f72

Please sign in to comment.