From 631c116f090ee42e6babc0ba86f234b8e4eca0f6 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 17 Oct 2024 11:32:02 +0800 Subject: [PATCH] fix ci --- src/wal/src/local_storage_impl/config.rs | 2 +- src/wal/src/local_storage_impl/segment.rs | 46 ++++++++++------------- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/src/wal/src/local_storage_impl/config.rs b/src/wal/src/local_storage_impl/config.rs index 565282ee4d..5a88131371 100644 --- a/src/wal/src/local_storage_impl/config.rs +++ b/src/wal/src/local_storage_impl/config.rs @@ -30,7 +30,7 @@ impl Default for LocalStorageConfig { fn default() -> Self { Self { data_dir: "/tmp/horaedb".to_string(), - segment_size: ReadableSize::mb(128), + segment_size: ReadableSize::mb(64), cache_size: 3, } } diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index fec8c72eaf..8fa0915b12 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -654,7 +654,7 @@ impl Region { let mut all_segments = HashMap::new(); // Scan the directory for existing WAL files - let mut max_segment_id: i32 = -1; + let mut max_segment_id: u64 = 0; let mut next_sequence_num: u64 = MIN_SEQUENCE_NUMBER + 1; // Segment file naming convention: {SEGMENT_NAME_PREFIX}{id} @@ -671,27 +671,26 @@ impl Region { .map_err(anyhow::Error::new) .context(Internal)?; - let segment = Segment::new(filename.to_string(), segment_id, segment_size)?; + let segment = Segment::new( + entry.path().to_string_lossy().to_string(), + segment_id, + segment_size, + )?; next_sequence_num = next_sequence_num.max(segment.max_seq + 1); + max_segment_id = max_segment_id.max(segment_id); let segment = Arc::new(Mutex::new(segment)); - - if segment_id as i32 > max_segment_id { - max_segment_id = segment_id as i32; - } all_segments.insert(segment_id, segment); } // If no existing segments, create a new one - if max_segment_id == -1 { - max_segment_id = 0; - let path = format!("{region_dir}/{SEGMENT_NAME_PREFIX}{max_segment_id}"); - let new_segment = Segment::new(path, max_segment_id as u64, segment_size)?; - let new_segment = Arc::new(Mutex::new(new_segment)); - all_segments.insert(0, new_segment); + if all_segments.is_empty() { + all_segments.insert( + max_segment_id, + Self::create_new_segment(®ion_dir, max_segment_id, segment_size)?, + ); } - let latest_segment = all_segments.get(&(max_segment_id as u64)).unwrap().clone(); - + let latest_segment = all_segments.get(&max_segment_id).unwrap().clone(); let segment_manager = SegmentManager { all_segments: Mutex::new(all_segments), cache: Mutex::new(VecDeque::new()), @@ -711,17 +710,9 @@ impl Region { }) } - fn create_new_segment(&self, id: u64) -> Result>> { - // Create a new segment - let new_segment = Segment::new( - format!("{}/segment_{}.wal", self.region_dir, id), - id, - self.segment_size, - )?; - let new_segment = Arc::new(Mutex::new(new_segment)); - self.segment_manager.add_segment(id, new_segment.clone())?; - - Ok(new_segment) + fn create_new_segment(dir: &str, id: u64, size: usize) -> Result>> { + let new_segment = Segment::new(format!("{dir}/{SEGMENT_NAME_PREFIX}{id}"), id, size)?; + Ok(Arc::new(Mutex::new(new_segment))) } pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> Result { @@ -766,7 +757,10 @@ impl Region { let new_segment_id = guard.id + 1; drop(guard); - *current_segment = self.create_new_segment(new_segment_id)?; + *current_segment = + Self::create_new_segment(&self.region_dir, new_segment_id, self.segment_size)?; + self.segment_manager + .add_segment(new_segment_id, current_segment.clone())?; } }