diff --git a/Cargo.lock b/Cargo.lock index 6a8ad08fe9..33ed54bcd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,7 +67,7 @@ dependencies = [ [[package]] name = "alloc_tracker" -version = "2.0.0" +version = "2.1.0" [[package]] name = "allocator-api2" @@ -77,7 +77,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "analytic_engine" -version = "2.0.0" +version = "2.1.0" dependencies = [ "anyhow", "arc-swap 1.6.0", @@ -108,7 +108,7 @@ dependencies = [ "macros", "message_queue", "metric_ext", - "object_store 2.0.0", + "object_store 2.1.0", "parquet", "parquet_ext", "pin-project-lite", @@ -237,7 +237,7 @@ checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" [[package]] name = "arena" -version = "2.0.0" +version = "2.1.0" [[package]] name = "array-init" @@ -682,7 +682,7 @@ dependencies = [ [[package]] name = "arrow_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "serde", @@ -951,7 +951,7 @@ dependencies = [ [[package]] name = "benchmarks" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "arena", @@ -966,7 +966,7 @@ dependencies = [ "generic_error", "logger", "macros", - "object_store 2.0.0", + "object_store 2.1.0", "parquet", "parquet_ext", "pprof", @@ -1238,7 +1238,7 @@ checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "bytes_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "bytes", "snafu 0.6.10", @@ -1304,7 +1304,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "catalog" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "common_types", @@ -1319,7 +1319,7 @@ dependencies = [ [[package]] name = "catalog_impls" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "async-trait", @@ -1490,7 +1490,7 @@ checksum = "b8191fa7302e03607ff0e237d4246cc043ff5b3cb9409d995172ba3bea16b807" [[package]] name = "cluster" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "bytes_ext", @@ -1525,7 +1525,7 @@ dependencies = [ [[package]] name = "codec" -version = "2.0.0" +version = "2.1.0" dependencies = [ "bytes_ext", "common_types", @@ -1574,7 +1574,7 @@ dependencies = [ [[package]] name = "common_types" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "arrow_ext", @@ -2358,7 +2358,7 @@ dependencies = [ [[package]] name = "df_engine_extensions" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "async-recursion", @@ -2383,7 +2383,7 @@ dependencies = [ [[package]] name = "df_operator" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "base64 0.13.1", @@ -2743,7 +2743,7 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "future_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "futures 0.3.28", "lazy_static", @@ -2915,7 +2915,7 @@ dependencies = [ [[package]] name = "generic_error" -version = "2.0.0" +version = "2.1.0" [[package]] name = "getrandom" @@ -2998,7 +2998,7 @@ dependencies = [ [[package]] name = "hash_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "ahash 0.8.3", "byteorder", @@ -3128,7 +3128,7 @@ dependencies = [ [[package]] name = "horaectl" -version = "2.0.0" +version = "2.1.0" dependencies = [ "anyhow", "chrono", @@ -3143,7 +3143,7 @@ dependencies = [ [[package]] name = "horaedb" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "catalog", @@ -3194,7 +3194,7 @@ dependencies = [ [[package]] name = "horaedb-test" -version = "2.0.0" +version = "2.1.0" dependencies = [ "anyhow", "async-trait", @@ -3459,7 +3459,7 @@ dependencies = [ [[package]] name = "id_allocator" -version = "2.0.0" +version = "2.1.0" dependencies = [ "generic_error", "tokio", @@ -3574,7 +3574,7 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "interpreters" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "arrow 49.0.0", @@ -3983,7 +3983,7 @@ dependencies = [ [[package]] name = "logger" -version = "2.0.0" +version = "2.1.0" dependencies = [ "chrono", "log", @@ -4064,7 +4064,7 @@ dependencies = [ [[package]] name = "macros" -version = "2.0.0" +version = "2.1.0" [[package]] name = "matchers" @@ -4141,7 +4141,7 @@ dependencies = [ [[package]] name = "message_queue" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "chrono", @@ -4158,7 +4158,7 @@ dependencies = [ [[package]] name = "meta_client" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "common_types", @@ -4181,7 +4181,7 @@ dependencies = [ [[package]] name = "metric_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "crossbeam-utils", "serde", @@ -4510,7 +4510,7 @@ dependencies = [ [[package]] name = "notifier" -version = "2.0.0" +version = "2.1.0" dependencies = [ "tokio", ] @@ -4693,7 +4693,7 @@ dependencies = [ [[package]] name = "object_store" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "bytes", @@ -4878,7 +4878,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "panic_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "backtrace", "gag", @@ -4979,7 +4979,7 @@ dependencies = [ [[package]] name = "parquet_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "arrow_ext", @@ -4989,7 +4989,7 @@ dependencies = [ "futures 0.3.28", "generic_error", "logger", - "object_store 2.0.0", + "object_store 2.1.0", "parquet", "tokio", ] @@ -5005,7 +5005,7 @@ dependencies = [ [[package]] name = "partition_table_engine" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "arrow 49.0.0", @@ -5026,7 +5026,7 @@ dependencies = [ [[package]] name = "partitioned_lock" -version = "2.0.0" +version = "2.1.0" dependencies = [ "hash_ext", "tokio", @@ -5445,7 +5445,7 @@ dependencies = [ [[package]] name = "profile" -version = "2.0.0" +version = "2.1.0" dependencies = [ "jemalloc-ctl", "jemalloc-sys", @@ -5668,7 +5668,7 @@ checksum = "9653c3ed92974e34c5a6e0a510864dab979760481714c172e0a34e437cb98804" [[package]] name = "proxy" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "arrow_ext", @@ -5769,7 +5769,7 @@ dependencies = [ [[package]] name = "query_engine" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "async-trait", @@ -5799,7 +5799,7 @@ dependencies = [ [[package]] name = "query_frontend" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "async-trait", @@ -6098,7 +6098,7 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "remote_engine_client" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow_ext", "async-trait", @@ -6315,7 +6315,7 @@ dependencies = [ [[package]] name = "router" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "cluster", @@ -6358,7 +6358,7 @@ dependencies = [ [[package]] name = "runtime" -version = "2.0.0" +version = "2.1.0" dependencies = [ "lazy_static", "macros", @@ -6395,7 +6395,7 @@ dependencies = [ [[package]] name = "rust-sdk-test" -version = "2.0.0" +version = "2.1.0" dependencies = [ "horaedb-client", "tokio", @@ -6592,7 +6592,7 @@ dependencies = [ [[package]] name = "sampling_cache" -version = "2.0.0" +version = "2.1.0" dependencies = [ "chrono", ] @@ -6737,7 +6737,7 @@ dependencies = [ [[package]] name = "server" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "arc-swap 1.6.0", @@ -6900,7 +6900,7 @@ dependencies = [ [[package]] name = "size_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "serde", "toml 0.7.3", @@ -6923,7 +6923,7 @@ dependencies = [ [[package]] name = "skiplist" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arena", "bytes", @@ -7318,7 +7318,7 @@ dependencies = [ [[package]] name = "system_catalog" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "async-trait", @@ -7340,7 +7340,7 @@ dependencies = [ [[package]] name = "system_stats" -version = "2.0.0" +version = "2.1.0" dependencies = [ "sysinfo", "tokio", @@ -7348,7 +7348,7 @@ dependencies = [ [[package]] name = "table_engine" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "arrow_ext", @@ -7380,7 +7380,7 @@ dependencies = [ [[package]] name = "table_kv" -version = "2.0.0" +version = "2.1.0" dependencies = [ "lazy_static", "logger", @@ -7459,7 +7459,7 @@ dependencies = [ [[package]] name = "test_util" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "chrono", @@ -7549,7 +7549,7 @@ dependencies = [ [[package]] name = "time_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "chrono", "common_types", @@ -7563,7 +7563,7 @@ dependencies = [ [[package]] name = "timed_task" -version = "2.0.0" +version = "2.1.0" dependencies = [ "logger", "runtime", @@ -7773,7 +7773,7 @@ dependencies = [ [[package]] name = "toml_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "macros", "serde", @@ -7859,7 +7859,7 @@ dependencies = [ [[package]] name = "tools" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "anyhow", @@ -7868,7 +7868,7 @@ dependencies = [ "futures 0.3.28", "generic_error", "num_cpus", - "object_store 2.0.0", + "object_store 2.1.0", "parquet", "parquet_ext", "runtime", @@ -7911,14 +7911,14 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "trace_metric" -version = "2.0.0" +version = "2.1.0" dependencies = [ "trace_metric_derive", ] [[package]] name = "trace_metric_derive" -version = "2.0.0" +version = "2.1.0" dependencies = [ "proc-macro2", "quote", @@ -7927,7 +7927,7 @@ dependencies = [ [[package]] name = "trace_metric_derive_tests" -version = "2.0.0" +version = "2.1.0" dependencies = [ "trace_metric", ] @@ -8018,7 +8018,7 @@ dependencies = [ [[package]] name = "tracing_util" -version = "2.0.0" +version = "2.1.0" dependencies = [ "console-subscriber", "lazy_static", @@ -8229,7 +8229,7 @@ checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" [[package]] name = "wal" -version = "2.0.0" +version = "2.1.0" dependencies = [ "anyhow", "async-scoped", diff --git a/Cargo.toml b/Cargo.toml index abc2403648..d2d73fd0ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ # under the License. [workspace.package] -version = "2.0.0" +version = "2.1.0" authors = ["HoraeDB Authors"] edition = "2021" license = "Apache-2.0" diff --git a/src/wal/src/local_storage_impl/config.rs b/src/wal/src/local_storage_impl/config.rs index b3e70e9b5c..5a88131371 100644 --- a/src/wal/src/local_storage_impl/config.rs +++ b/src/wal/src/local_storage_impl/config.rs @@ -16,12 +16,13 @@ // under the License. use serde::{Deserialize, Serialize}; +use size_ext::ReadableSize; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct LocalStorageConfig { pub data_dir: String, - pub segment_size: usize, + pub segment_size: ReadableSize, pub cache_size: usize, } @@ -29,7 +30,7 @@ impl Default for LocalStorageConfig { fn default() -> Self { Self { data_dir: "/tmp/horaedb".to_string(), - segment_size: 64 * 1024 * 1024, // 64MB + segment_size: ReadableSize::mb(64), cache_size: 3, } } diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 5613021c96..8fa0915b12 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -134,6 +134,7 @@ pub enum Error { define_result!(Error); +const SEGMENT_NAME_PREFIX: &str = "seg_"; const SEGMENT_HEADER: &[u8] = b"HoraeDBWAL"; const WAL_SEGMENT_V0: u8 = 0; const NEWEST_WAL_SEGMENT_VERSION: u8 = WAL_SEGMENT_V0; @@ -653,61 +654,43 @@ impl Region { let mut all_segments = HashMap::new(); // Scan the directory for existing WAL files - let mut max_segment_id: i32 = -1; + let mut max_segment_id: u64 = 0; let mut next_sequence_num: u64 = MIN_SEQUENCE_NUMBER + 1; - // Segment file naming convention: segment_.wal + // Segment file naming convention: {SEGMENT_NAME_PREFIX}{id} for entry in fs::read_dir(®ion_dir).context(FileOpen)? { let entry = entry.context(FileOpen)?; - - let path = entry.path(); - - if !path.is_file() { - continue; - } - - match path.extension() { - Some(ext) if ext == "wal" => ext, - _ => continue, - }; - - let file_name = match path.file_name().and_then(|name| name.to_str()) { - Some(name) => name, - None => continue, - }; - - let segment_id = match file_name - .trim_start_matches("segment_") - .trim_end_matches(".wal") - .parse::() - .ok() - { + let filename = entry.file_name(); + let filename = filename.to_string_lossy(); + let segment_id = match filename.strip_prefix(SEGMENT_NAME_PREFIX) { Some(id) => id, None => continue, }; + let segment_id = segment_id + .parse::() + .map_err(anyhow::Error::new) + .context(Internal)?; - let segment = - Segment::new(path.to_string_lossy().to_string(), segment_id, segment_size)?; + let segment = Segment::new( + entry.path().to_string_lossy().to_string(), + segment_id, + segment_size, + )?; next_sequence_num = next_sequence_num.max(segment.max_seq + 1); + max_segment_id = max_segment_id.max(segment_id); let segment = Arc::new(Mutex::new(segment)); - - if segment_id as i32 > max_segment_id { - max_segment_id = segment_id as i32; - } all_segments.insert(segment_id, segment); } // If no existing segments, create a new one - if max_segment_id == -1 { - max_segment_id = 0; - let path = format!("{}/segment_{}.wal", region_dir, max_segment_id); - let new_segment = Segment::new(path, max_segment_id as u64, segment_size)?; - let new_segment = Arc::new(Mutex::new(new_segment)); - all_segments.insert(0, new_segment); + if all_segments.is_empty() { + all_segments.insert( + max_segment_id, + Self::create_new_segment(®ion_dir, max_segment_id, segment_size)?, + ); } - let latest_segment = all_segments.get(&(max_segment_id as u64)).unwrap().clone(); - + let latest_segment = all_segments.get(&max_segment_id).unwrap().clone(); let segment_manager = SegmentManager { all_segments: Mutex::new(all_segments), cache: Mutex::new(VecDeque::new()), @@ -727,17 +710,9 @@ impl Region { }) } - fn create_new_segment(&self, id: u64) -> Result>> { - // Create a new segment - let new_segment = Segment::new( - format!("{}/segment_{}.wal", self.region_dir, id), - id, - self.segment_size, - )?; - let new_segment = Arc::new(Mutex::new(new_segment)); - self.segment_manager.add_segment(id, new_segment.clone())?; - - Ok(new_segment) + fn create_new_segment(dir: &str, id: u64, size: usize) -> Result>> { + let new_segment = Segment::new(format!("{dir}/{SEGMENT_NAME_PREFIX}{id}"), id, size)?; + Ok(Arc::new(Mutex::new(new_segment))) } pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> Result { @@ -782,7 +757,10 @@ impl Region { let new_segment_id = guard.id + 1; drop(guard); - *current_segment = self.create_new_segment(new_segment_id)?; + *current_segment = + Self::create_new_segment(&self.region_dir, new_segment_id, self.segment_size)?; + self.segment_manager + .add_segment(new_segment_id, current_segment.clone())?; } } diff --git a/src/wal/src/local_storage_impl/wal_manager.rs b/src/wal/src/local_storage_impl/wal_manager.rs index 694831eae1..2c4494654d 100644 --- a/src/wal/src/local_storage_impl/wal_manager.rs +++ b/src/wal/src/local_storage_impl/wal_manager.rs @@ -57,11 +57,11 @@ impl LocalStorageImpl { segment_size, .. } = config.clone(); - let wal_path_str = wal_path.to_str().unwrap().to_string(); + let wal_path_str = wal_path.to_string_lossy().to_string(); let region_manager = RegionManager::new( wal_path_str.clone(), cache_size, - segment_size, + segment_size.as_byte() as usize, runtime.clone(), ) .box_err()