From 8f945d6cee5cbe672ab8640e220fa53e61bf66c8 Mon Sep 17 00:00:00 2001 From: kamille Date: Tue, 11 Jul 2023 13:33:51 +0800 Subject: [PATCH] fix fully flushed region open in kakfa wal --- wal/src/message_queue_impl/region.rs | 48 +++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/wal/src/message_queue_impl/region.rs b/wal/src/message_queue_impl/region.rs index 0cdda3eddd..f2d6a285ba 100644 --- a/wal/src/message_queue_impl/region.rs +++ b/wal/src/message_queue_impl/region.rs @@ -224,7 +224,17 @@ impl Region { namespace, region_id ); - // Fetch high watermark and check. + // Fetch earliest, high watermark and check. + let earliest = message_queue + .fetch_offset(meta_topic, OffsetType::EarliestOffset) + .await + .box_err() + .context(OpenWithCause { + namespace, + region_id, + msg: "failed while recover from meta", + })?; + let high_watermark = message_queue .fetch_offset(meta_topic, OffsetType::HighWaterMark) .await @@ -235,9 +245,19 @@ impl Region { msg: "failed while recover from meta", })?; - if high_watermark == 0 { - debug!("Meta topic is empty, it just needs to recover from log topic, namespace:{}, region id:{}", namespace, region_id); - return Ok(None); + if earliest == high_watermark { + if high_watermark == 0 { + info!("Recover region meta from meta, found empty meta topic, just need to recover from log topic, namespace:{}, region id:{}", + namespace, region_id); + return Ok(None); + } + + return OpenNoCause { + namespace, + region_id, + msg: "region meta impossible to be empty when having written logs", + } + .fail(); } // Fetch snapshot from meta topic(just fetch the last snapshot). @@ -313,6 +333,9 @@ impl Region { }); let region_safe_delete_offset = if min_safe_delete_offset == i64::MAX { + info!("Recover region meta from meta, min_safe_delete_offset not exist, region_meta_snapshot:{:?}, namespace:{}, region id:{}", + value, namespace, region_id); + None } else { Some(min_safe_delete_offset) @@ -352,6 +375,17 @@ impl Region { // FIXME: should not judge whether topic is empty or not by caller. // The consumer iterator should return immediately rather than hanging when // topic empty. + // Fetch earliest, high watermark and check. + let earliest = message_queue + .fetch_offset(log_topic, OffsetType::EarliestOffset) + .await + .box_err() + .context(OpenWithCause { + namespace, + region_id, + msg: "failed while recover from log", + })?; + let high_watermark = message_queue .fetch_offset(log_topic, OffsetType::HighWaterMark) .await @@ -361,7 +395,11 @@ impl Region { region_id, msg: "failed while recover from log", })?; - if high_watermark == 0 { + + if earliest == high_watermark { + info!("Recover region meta from log, found empty log topic, namespace:{}, region_id:{}, earliest:{}, high_watermark:{}", + namespace, region_id, earliest, high_watermark + ); return Ok(()); }