diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index fec8c72eaf..786f6a74bc 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -654,7 +654,7 @@ impl Region { let mut all_segments = HashMap::new(); // Scan the directory for existing WAL files - let mut max_segment_id: i32 = -1; + let mut max_segment_id: u64 = 0; let mut next_sequence_num: u64 = MIN_SEQUENCE_NUMBER + 1; // Segment file naming convention: {SEGMENT_NAME_PREFIX}{id} @@ -673,25 +673,20 @@ impl Region { let segment = Segment::new(filename.to_string(), segment_id, segment_size)?; next_sequence_num = next_sequence_num.max(segment.max_seq + 1); + max_segment_id = max_segment_id.max(segment_id); let segment = Arc::new(Mutex::new(segment)); - - if segment_id as i32 > max_segment_id { - max_segment_id = segment_id as i32; - } all_segments.insert(segment_id, segment); } // If no existing segments, create a new one - if max_segment_id == -1 { - max_segment_id = 0; - let path = format!("{region_dir}/{SEGMENT_NAME_PREFIX}{max_segment_id}"); - let new_segment = Segment::new(path, max_segment_id as u64, segment_size)?; - let new_segment = Arc::new(Mutex::new(new_segment)); - all_segments.insert(0, new_segment); + if all_segments.is_empty() { + all_segments.insert( + max_segment_id, + Self::create_new_segment(®ion_dir, max_segment_id, segment_size)?, + ); } - let latest_segment = all_segments.get(&(max_segment_id as u64)).unwrap().clone(); - + let latest_segment = all_segments.get(&max_segment_id).unwrap().clone(); let segment_manager = SegmentManager { all_segments: Mutex::new(all_segments), cache: Mutex::new(VecDeque::new()), @@ -711,17 +706,9 @@ impl Region { }) } - fn create_new_segment(&self, id: u64) -> Result>> { - // Create a new segment - let new_segment = Segment::new( - format!("{}/segment_{}.wal", self.region_dir, id), - id, - self.segment_size, - )?; - let new_segment = Arc::new(Mutex::new(new_segment)); - self.segment_manager.add_segment(id, new_segment.clone())?; - - Ok(new_segment) + fn create_new_segment(dir: &str, id: u64, size: usize) -> Result>> { + let new_segment = Segment::new(format!("{dir}/{SEGMENT_NAME_PREFIX}{id}"), id, size)?; + Ok(Arc::new(Mutex::new(new_segment))) } pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> Result { @@ -766,7 +753,10 @@ impl Region { let new_segment_id = guard.id + 1; drop(guard); - *current_segment = self.create_new_segment(new_segment_id)?; + *current_segment = + Self::create_new_segment(&self.region_dir, new_segment_id, self.segment_size)?; + self.segment_manager + .add_segment(new_segment_id, current_segment.clone())?; } }