From c4ccca0d494ee594c5624985a7ec2c219707a67c Mon Sep 17 00:00:00 2001 From: kamille <34352236+Rachelint@users.noreply.github.com> Date: Tue, 14 Feb 2023 11:42:20 +0800 Subject: [PATCH] allow incomplete mapping between sequence and kafka offset by not updating the marked deleted sequence. (#642) --- wal/src/message_queue_impl/region_context.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/wal/src/message_queue_impl/region_context.rs b/wal/src/message_queue_impl/region_context.rs index addf3e0269..d31b994018 100644 --- a/wal/src/message_queue_impl/region_context.rs +++ b/wal/src/message_queue_impl/region_context.rs @@ -12,7 +12,7 @@ use common_util::{ define_result, error::{BoxError, GenericError}, }; -use log::debug; +use log::{debug, warn}; use message_queue::{MessageQueue, Offset}; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use tokio::sync::{Mutex, RwLock}; @@ -292,6 +292,7 @@ impl TableMeta { ) -> std::result::Result<(), String> { let mut inner = self.inner.lock().await; + // Check the set sequence num's validity. if sequence_num > inner.next_sequence_num { return Err(format!( "latest marked deleted should be less than or @@ -306,6 +307,17 @@ impl TableMeta { inner.latest_marked_deleted)); } + // The `start_sequence_offset_mapping` is possible to be incomplete during + // recovery. + let offset = inner.start_sequence_offset_mapping.get(&sequence_num); + if offset.is_none() && inner.next_sequence_num != inner.latest_marked_deleted { + warn!("Start sequence offset mapping is incomplete, + just not update the marked deleted sequence in this flush, new marked deleted, sequence num:{}, previous:{}", + sequence_num, inner.latest_marked_deleted); + + return Ok(()); + } + inner.latest_marked_deleted = sequence_num; // Update the mapping, keep the range in description.