Skip to content

Commit

Permalink
allow incomplete mapping between sequence and kafka offset by not upd…
Browse files Browse the repository at this point in the history
…ating the marked deleted sequence. (#642)
  • Loading branch information
Rachelint authored Feb 14, 2023
1 parent d906f9f commit c4ccca0
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion wal/src/message_queue_impl/region_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use common_util::{
define_result,
error::{BoxError, GenericError},
};
use log::debug;
use log::{debug, warn};
use message_queue::{MessageQueue, Offset};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use tokio::sync::{Mutex, RwLock};
Expand Down Expand Up @@ -292,6 +292,7 @@ impl TableMeta {
) -> std::result::Result<(), String> {
let mut inner = self.inner.lock().await;

// Check the set sequence num's validity.
if sequence_num > inner.next_sequence_num {
return Err(format!(
"latest marked deleted should be less than or
Expand All @@ -306,6 +307,17 @@ impl TableMeta {
inner.latest_marked_deleted));
}

// The `start_sequence_offset_mapping` is possible to be incomplete during
// recovery.
let offset = inner.start_sequence_offset_mapping.get(&sequence_num);
if offset.is_none() && inner.next_sequence_num != inner.latest_marked_deleted {
warn!("Start sequence offset mapping is incomplete,
just not update the marked deleted sequence in this flush, new marked deleted, sequence num:{}, previous:{}",
sequence_num, inner.latest_marked_deleted);

return Ok(());
}

inner.latest_marked_deleted = sequence_num;

// Update the mapping, keep the range in description.
Expand Down

0 comments on commit c4ccca0

Please sign in to comment.